fix(core): Make senderId required for all command messages (#7252)

all commands sent between main instance and workers need to contain a
server id to prevent senders from reacting to their own messages,
causing loops

this PR makes sure all sent messages contain a sender id by default as
part of constructing a sending redis client.

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Michael Auerswald
2023-09-26 13:58:06 +02:00
committed by GitHub
parent 77d6e3fc07
commit 4b014286cf
23 changed files with 231 additions and 203 deletions

View File

@@ -32,13 +32,9 @@ import { ExecutionRepository, WorkflowRepository } from '@/databases/repositorie
import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
} from '@/services/redis/RedisServiceHelper';
import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@@ -50,7 +46,6 @@ export interface MessageWithCallback {
export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean;
workerId?: string;
uniqueInstanceId?: string;
}
@Service()
@@ -59,8 +54,6 @@ export class MessageEventBus extends EventEmitter {
isInitialized: boolean;
uniqueInstanceId: string;
redisPublisher: RedisServicePubSubPublisher;
redisSubscriber: RedisServicePubSubSubscriber;
@@ -93,25 +86,20 @@ export class MessageEventBus extends EventEmitter {
*
* Sets `isInitialized` to `true` once finished.
*/
async initialize(options: MessageEventBusInitializeOptions): Promise<void> {
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) {
return;
}
this.uniqueInstanceId = options?.uniqueInstanceId ?? '';
if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleRedisCommandMessage(messageString);
}
},
);
@@ -265,33 +253,9 @@ export class MessageEventBus extends EventEmitter {
return eventData;
}
async handleRedisCommandMessage(messageString: string) {
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await this.restart();
default:
break;
}
return message;
}
return;
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'restartEventBus',
});
}
@@ -317,7 +281,6 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
await this.redisSubscriber?.unSubscribeFromCommandChannel();
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');