diff --git a/packages/cli/package.json b/packages/cli/package.json index d87035ec7..0e7a11957 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -102,6 +102,7 @@ "@n8n/client-oauth2": "workspace:*", "@n8n_io/license-sdk": "~2.6.0", "@oclif/command": "^1.8.16", + "@oclif/config": "^1.18.17", "@oclif/core": "^1.16.4", "@oclif/errors": "^1.3.6", "@rudderstack/rudder-sdk-node": "1.0.6", diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 8c3d5b5bb..092b1a95a 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -117,7 +117,7 @@ export abstract class AbstractServer { if (config.getEnv('executions.mode') === 'queue') { // will start the redis connections - await Container.get(OrchestrationService).init(this.uniqueInstanceId); + await Container.get(OrchestrationService).init(); } } diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index bf384d024..672a8959e 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -1,4 +1,4 @@ -import type { TEntitlement, TLicenseBlock } from '@n8n_io/license-sdk'; +import type { TEntitlement, TFeatures, TLicenseBlock } from '@n8n_io/license-sdk'; import { LicenseManager } from '@n8n_io/license-sdk'; import type { ILogger } from 'n8n-workflow'; import { getLogger } from './Logger'; @@ -50,6 +50,9 @@ export class License { const saveCertStr = isMainInstance ? async (value: TLicenseBlock) => this.saveCertStr(value) : async () => {}; + const onFeatureChange = isMainInstance + ? async (features: TFeatures) => this.onFeatureChange(features) + : async () => {}; try { this.manager = new LicenseManager({ @@ -64,6 +67,7 @@ export class License { loadCertStr: async () => this.loadCertStr(), saveCertStr, deviceFingerprint: () => instanceId, + onFeatureChange, }); await this.manager.initialize(); @@ -89,6 +93,18 @@ export class License { return databaseSettings?.value ?? ''; } + async onFeatureChange(_features: TFeatures): Promise { + if (config.getEnv('executions.mode') === 'queue') { + if (!this.redisPublisher) { + this.logger.debug('Initializing Redis publisher for License Service'); + this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadLicense', + }); + } + } + async saveCertStr(value: TLicenseBlock): Promise { // if we have an ephemeral license, we don't want to save it to the database if (config.get('license.cert')) return; @@ -100,15 +116,6 @@ export class License { }, ['key'], ); - if (config.getEnv('executions.mode') === 'queue') { - if (!this.redisPublisher) { - this.logger.debug('Initializing Redis publisher for License Service'); - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'reloadLicense', - }); - } } async activate(activationKey: string): Promise { diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 70bc4b701..152b1d389 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1474,9 +1474,7 @@ export class Server extends AbstractServer { // ---------------------------------------- if (!eventBus.isInitialized) { - await eventBus.initialize({ - uniqueInstanceId: this.uniqueInstanceId, - }); + await eventBus.initialize(); } if (this.endpointPresetCredentials !== '') { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 922634b3f..75446f511 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -22,6 +22,7 @@ import { PostHogClient } from '@/posthog'; import { License } from '@/License'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { initExpressionEvaluator } from '@/ExpressionEvalator'; +import { generateHostInstanceId } from '../databases/utils/generators'; export abstract class BaseCommand extends Command { protected logger = LoggerProxy.init(getLogger()); @@ -36,6 +37,10 @@ export abstract class BaseCommand extends Command { protected instanceId: string; + instanceType: N8nInstanceType = 'main'; + + queueModeId: string; + protected server?: AbstractServer; async init(): Promise { @@ -83,6 +88,22 @@ export abstract class BaseCommand extends Command { await Container.get(InternalHooks).init(this.instanceId); } + protected setInstanceType(instanceType: N8nInstanceType) { + this.instanceType = instanceType; + config.set('generic.instanceType', instanceType); + } + + protected setInstanceQueueModeId() { + if (config.getEnv('executions.mode') === 'queue') { + if (config.get('redis.queueModeId')) { + this.queueModeId = config.get('redis.queueModeId'); + return; + } + this.queueModeId = generateHostInstanceId(this.instanceType); + config.set('redis.queueModeId', this.queueModeId); + } + } + protected async stopProcess() { // This needs to be overridden } @@ -115,11 +136,9 @@ export abstract class BaseCommand extends Command { await this.externalHooks.init(); } - async initLicense(instanceType: N8nInstanceType = 'main'): Promise { - config.set('generic.instanceType', instanceType); - + async initLicense(): Promise { const license = Container.get(License); - await license.init(this.instanceId, instanceType); + await license.init(this.instanceId, this.instanceType ?? 'main'); const activationKey = config.getEnv('license.activationKey'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 235c0c814..676f8844f 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -30,6 +30,7 @@ import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { IConfig } from '@oclif/config'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -65,6 +66,12 @@ export class Start extends BaseCommand { protected server = new Server(); + constructor(argv: string[], cmdConfig: IConfig) { + super(argv, cmdConfig); + this.setInstanceType('main'); + this.setInstanceQueueModeId(); + } + /** * Opens the UI in browser */ @@ -196,11 +203,16 @@ export class Start extends BaseCommand { async init() { await this.initCrashJournal(); - await super.init(); this.logger.info('Initializing n8n process'); + if (config.getEnv('executions.mode') === 'queue') { + this.logger.debug('Main Instance running in queue mode'); + this.logger.debug(`Queue mode id: ${this.queueModeId}`); + } + + await super.init(); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - await this.initLicense('main'); + await this.initLicense(); await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 1a681be8b..2c3ee3589 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -6,6 +6,7 @@ import { WebhookServer } from '@/WebhookServer'; import { Queue } from '@/Queue'; import { BaseCommand } from './BaseCommand'; import { Container } from 'typedi'; +import { IConfig } from '@oclif/config'; export class Webhook extends BaseCommand { static description = 'Starts n8n webhook process. Intercepts only production URLs.'; @@ -18,6 +19,15 @@ export class Webhook extends BaseCommand { protected server = new WebhookServer(); + constructor(argv: string[], cmdConfig: IConfig) { + super(argv, cmdConfig); + this.setInstanceType('webhook'); + if (this.queueModeId) { + this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`); + } + this.setInstanceQueueModeId(); + } + /** * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -75,9 +85,13 @@ export class Webhook extends BaseCommand { } await this.initCrashJournal(); + + this.logger.info('Initializing n8n webhook process'); + this.logger.debug(`Queue mode id: ${this.queueModeId}`); + await super.init(); - await this.initLicense('webhook'); + await this.initLicense(); await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 28501a90c..5445b85f1 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -29,7 +29,6 @@ import { N8N_VERSION } from '@/constants'; import { BaseCommand } from './BaseCommand'; import { ExecutionRepository } from '@db/repositories'; import { OwnershipService } from '@/services/ownership.service'; -import { generateHostInstanceId } from '@/databases/utils/generators'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; @@ -38,6 +37,7 @@ import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSu import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; import { getWorkerCommandReceivedHandler } from '../worker/workerCommandHandler'; +import { IConfig } from '@oclif/config'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -58,8 +58,6 @@ export class Worker extends BaseCommand { static jobQueue: JobQueue; - readonly uniqueInstanceId = generateHostInstanceId('worker'); - redisPublisher: RedisServicePubSubPublisher; redisSubscriber: RedisServicePubSubSubscriber; @@ -250,13 +248,22 @@ export class Worker extends BaseCommand { }; } + constructor(argv: string[], cmdConfig: IConfig) { + super(argv, cmdConfig); + this.setInstanceType('worker'); + this.setInstanceQueueModeId(); + } + async init() { await this.initCrashJournal(); - await super.init(); - this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`); - this.logger.debug('Starting n8n worker...'); - await this.initLicense('worker'); + this.logger.debug('Starting n8n worker...'); + this.logger.debug(`Queue mode id: ${this.queueModeId}`); + + await super.init(); + + await this.initLicense(); + await this.initBinaryDataService(); await this.initExternalHooks(); await this.initExternalSecrets(); @@ -267,8 +274,7 @@ export class Worker extends BaseCommand { async initEventBus() { await eventBus.initialize({ - workerId: this.uniqueInstanceId, - uniqueInstanceId: this.uniqueInstanceId, + workerId: this.queueModeId, }); } @@ -286,7 +292,7 @@ export class Worker extends BaseCommand { new EventMessageGeneric({ eventName: 'n8n.worker.started', payload: { - workerId: this.uniqueInstanceId, + workerId: this.queueModeId, }, }), ); @@ -295,7 +301,7 @@ export class Worker extends BaseCommand { 'WorkerCommandReceivedHandler', // eslint-disable-next-line @typescript-eslint/no-unsafe-argument getWorkerCommandReceivedHandler({ - uniqueInstanceId: this.uniqueInstanceId, + queueModeId: this.queueModeId, instanceId: this.instanceId, redisPublisher: this.redisPublisher, getRunningJobIds: () => Object.keys(Worker.runningJobs), diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 65c17fd3c..ec4f2e556 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -1138,6 +1138,11 @@ export const schema = { default: 'n8n', env: 'N8N_REDIS_KEY_PREFIX', }, + queueModeId: { + doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup', + format: String, + default: '', + }, }, cache: { diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 5a6738c5f..7d674379d 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,13 +32,9 @@ import { ExecutionRepository, WorkflowRepository } from '@/databases/repositorie import { RedisService } from '@/services/redis.service'; import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, -} from '@/services/redis/RedisServiceHelper'; +import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -50,7 +46,6 @@ export interface MessageWithCallback { export interface MessageEventBusInitializeOptions { skipRecoveryPass?: boolean; workerId?: string; - uniqueInstanceId?: string; } @Service() @@ -59,8 +54,6 @@ export class MessageEventBus extends EventEmitter { isInitialized: boolean; - uniqueInstanceId: string; - redisPublisher: RedisServicePubSubPublisher; redisSubscriber: RedisServicePubSubSubscriber; @@ -93,25 +86,20 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. */ - async initialize(options: MessageEventBusInitializeOptions): Promise { + async initialize(options?: MessageEventBusInitializeOptions): Promise { if (this.isInitialized) { return; } - this.uniqueInstanceId = options?.uniqueInstanceId ?? ''; - if (config.getEnv('executions.mode') === 'queue') { this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); await this.redisSubscriber.subscribeToEventLog(); - await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'MessageEventBusMessageReceiver', async (channel: string, messageString: string) => { if (channel === EVENT_BUS_REDIS_CHANNEL) { await this.handleRedisEventBusMessage(messageString); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await this.handleRedisCommandMessage(messageString); } }, ); @@ -265,33 +253,9 @@ export class MessageEventBus extends EventEmitter { return eventData; } - async handleRedisCommandMessage(messageString: string) { - const message = messageToRedisServiceCommandObject(messageString); - if (message) { - if ( - message.senderId === this.uniqueInstanceId || - (message.targets && !message.targets.includes(this.uniqueInstanceId)) - ) { - LoggerProxy.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - switch (message.command) { - case 'restartEventBus': - await this.restart(); - default: - break; - } - return message; - } - return; - } - async broadcastRestartEventbusAfterDestinationUpdate() { if (config.getEnv('executions.mode') === 'queue') { await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, command: 'restartEventBus', }); } @@ -317,7 +281,6 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } - await this.redisSubscriber?.unSubscribeFromCommandChannel(); await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 470e832c6..d004b061a 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -18,7 +18,7 @@ import { isEventMessageConfirm, } from '../EventMessageClasses/EventMessageConfirm'; import { once as eventOnce } from 'events'; -import { inTest } from '../../constants'; +import { inTest } from '@/constants'; interface MessageEventBusLogWriterConstructorOptions { logBaseName?: string; diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 17fcab79d..0cd240235 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -10,20 +10,13 @@ import { handleCommandMessage } from './orchestration/handleCommandMessage'; export class OrchestrationService { private initialized = false; - private _uniqueInstanceId = ''; - - get uniqueInstanceId(): string { - return this._uniqueInstanceId; - } - redisPublisher: RedisServicePubSubPublisher; redisSubscriber: RedisServicePubSubSubscriber; constructor(readonly redisService: RedisService) {} - async init(uniqueInstanceId: string) { - this._uniqueInstanceId = uniqueInstanceId; + async init() { await this.initPublisher(); await this.initSubscriber(); this.initialized = true; @@ -50,7 +43,7 @@ export class OrchestrationService { if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { await handleWorkerResponseMessage(messageString); } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessage(messageString, this.uniqueInstanceId); + await handleCommandMessage(messageString); } }, ); @@ -61,7 +54,6 @@ export class OrchestrationService { throw new Error('OrchestrationService not initialized'); } await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, command: 'getStatus', targets: id ? [id] : undefined, }); @@ -72,32 +64,7 @@ export class OrchestrationService { throw new Error('OrchestrationService not initialized'); } await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, command: 'getId', }); } - - // TODO: not implemented yet on worker side - async stopWorker(id?: string) { - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, - command: 'stopWorker', - targets: id ? [id] : undefined, - }); - } - - // reload the license on workers after it was changed on the main instance - async reloadLicense(id?: string) { - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, - command: 'reloadLicense', - targets: id ? [id] : undefined, - }); - } } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts index 8a04cb3ba..06e08977e 100644 --- a/packages/cli/src/services/orchestration/handleCommandMessage.ts +++ b/packages/cli/src/services/orchestration/handleCommandMessage.ts @@ -1,16 +1,19 @@ import { LoggerProxy } from 'n8n-workflow'; import { messageToRedisServiceCommandObject } from './helpers'; +import config from '@/config'; +import { MessageEventBus } from '../../eventbus/MessageEventBus/MessageEventBus'; import Container from 'typedi'; -import { License } from '@/License'; // this function handles commands sent to the MAIN instance. the workers handle their own commands -export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) { +export async function handleCommandMessage(messageString: string) { + const queueModeId = config.get('redis.queueModeId'); const message = messageToRedisServiceCommandObject(messageString); if (message) { if ( - message.senderId === uniqueInstanceId || - (message.targets && !message.targets.includes(uniqueInstanceId)) + message.senderId === queueModeId || + (message.targets && !message.targets.includes(queueModeId)) ) { + // Skipping command message because it's not for this instance LoggerProxy.debug( `Skipping command message ${message.command} because it's not for this instance.`, ); @@ -18,8 +21,16 @@ export async function handleCommandMessage(messageString: string, uniqueInstance } switch (message.command) { case 'reloadLicense': - await Container.get(License).reload(); + // at this point in time, only a single main instance is supported, thus this + // command _should_ never be caught currently (which is why we log a warning) + LoggerProxy.warn( + 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', + ); + // once multiple main instances are supported, this command should be handled + // await Container.get(License).reload(); break; + case 'restartEventBus': + await Container.get(MessageEventBus).restart(); default: break; } diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index da16aa25e..5f5e3854d 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -2,6 +2,7 @@ import type Redis from 'ioredis'; import type { Cluster } from 'ioredis'; import { getDefaultRedisClient } from './RedisServiceHelper'; import { LoggerProxy } from 'n8n-workflow'; +import config from '@/config'; export type RedisClientType = | 'subscriber' @@ -57,8 +58,9 @@ class RedisServiceBase { export abstract class RedisServiceBaseSender extends RedisServiceBase { senderId: string; - setSenderId(senderId?: string): void { - this.senderId = senderId ?? ''; + async init(type: RedisClientType = 'client'): Promise { + await super.init(type); + this.senderId = config.get('redis.queueModeId'); } } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a57e19004..634450b2e 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -12,7 +12,7 @@ export type RedisServiceCommand = * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { - senderId?: string; + senderId: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; diff --git a/packages/cli/src/services/redis/RedisServiceListSender.ts b/packages/cli/src/services/redis/RedisServiceListSender.ts index bb91c1325..93dcd1599 100644 --- a/packages/cli/src/services/redis/RedisServiceListSender.ts +++ b/packages/cli/src/services/redis/RedisServiceListSender.ts @@ -5,9 +5,8 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; @Service() export class RedisServiceListSender extends RedisServiceBaseSender { - async init(senderId?: string): Promise { + async init(): Promise { await super.init('list-sender'); - this.setSenderId(senderId); } async prepend(list: string, message: string): Promise { diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index a0a850133..3825cc2cc 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -13,9 +13,8 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; @Service() export class RedisServicePubSubPublisher extends RedisServiceBaseSender { - async init(senderId?: string): Promise { + async init(): Promise { await super.init('publisher'); - this.setSenderId(senderId); } async publish(channel: string, message: string): Promise { @@ -29,8 +28,12 @@ export class RedisServicePubSubPublisher extends RedisServiceBaseSender { await this.publish(EVENT_BUS_REDIS_CHANNEL, message.toString()); } - async publishToCommandChannel(message: RedisServiceCommandObject): Promise { - await this.publish(COMMAND_REDIS_CHANNEL, JSON.stringify(message)); + async publishToCommandChannel( + message: Omit, + ): Promise { + const messageWithSenderId = message as RedisServiceCommandObject; + messageWithSenderId.senderId = this.senderId; + await this.publish(COMMAND_REDIS_CHANNEL, JSON.stringify(messageWithSenderId)); } async publishToWorkerChannel(message: RedisServiceWorkerResponseObject): Promise { diff --git a/packages/cli/src/services/redis/RedisServiceStreamProducer.ts b/packages/cli/src/services/redis/RedisServiceStreamProducer.ts index 51578d5d8..6fe03208e 100644 --- a/packages/cli/src/services/redis/RedisServiceStreamProducer.ts +++ b/packages/cli/src/services/redis/RedisServiceStreamProducer.ts @@ -14,9 +14,8 @@ import { RedisServiceBaseSender } from './RedisServiceBaseClasses'; @Service() export class RedisServiceStreamProducer extends RedisServiceBaseSender { - async init(senderId?: string): Promise { + async init(): Promise { await super.init('producer'); - this.setSenderId(senderId); } async add(streamName: string, values: RedisValue[]): Promise { diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index acd488624..285a22258 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -5,9 +5,10 @@ import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServiceP import * as os from 'os'; import Container from 'typedi'; import { License } from '@/License'; +import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; export function getWorkerCommandReceivedHandler(options: { - uniqueInstanceId: string; + queueModeId: string; instanceId: string; redisPublisher: RedisServicePubSubPublisher; getRunningJobIds: () => string[]; @@ -25,16 +26,16 @@ export function getWorkerCommandReceivedHandler(options: { return; } if (message) { - if (message.targets && !message.targets.includes(options.uniqueInstanceId)) { + if (message.targets && !message.targets.includes(options.queueModeId)) { return; // early return if the message is not for this worker } switch (message.command) { case 'getStatus': await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, + workerId: options.queueModeId, command: message.command, payload: { - workerId: options.uniqueInstanceId, + workerId: options.queueModeId, runningJobs: options.getRunningJobIds(), freeMem: os.freemem(), totalMem: os.totalmem(), @@ -53,13 +54,14 @@ export function getWorkerCommandReceivedHandler(options: { break; case 'getId': await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, + workerId: options.queueModeId, command: message.command, }); break; case 'restartEventBus': + await Container.get(MessageEventBus).restart(); await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, + workerId: options.queueModeId, command: message.command, payload: { result: 'success', diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 2d9191c12..41b016198 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -1,6 +1,7 @@ import { mockInstance } from '../shared/utils/'; import { Worker } from '@/commands/worker'; import * as Config from '@oclif/config'; +import config from '@/config'; import { LoggerProxy } from 'n8n-workflow'; import { Telemetry } from '@/telemetry'; import { getLogger } from '@/Logger'; @@ -17,10 +18,11 @@ import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { RedisService } from '@/services/redis.service'; -const config: Config.IConfig = new Config.Config({ root: __dirname }); +const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname }); beforeAll(async () => { LoggerProxy.init(getLogger()); + config.set('executions.mode', 'queue'); mockInstance(Telemetry); mockInstance(PostHogClient); mockInstance(InternalHooks); @@ -37,7 +39,7 @@ beforeAll(async () => { }); test('worker initializes all its components', async () => { - const worker = new Worker([], config); + const worker = new Worker([], oclifConfig); jest.spyOn(worker, 'init'); jest.spyOn(worker, 'initLicense').mockImplementation(async () => {}); @@ -60,9 +62,9 @@ test('worker initializes all its components', async () => { await worker.init(); - expect(worker.uniqueInstanceId).toBeDefined(); - expect(worker.uniqueInstanceId).toContain('worker'); - expect(worker.uniqueInstanceId.length).toBeGreaterThan(15); + expect(worker.queueModeId).toBeDefined(); + expect(worker.queueModeId).toContain('worker'); + expect(worker.queueModeId.length).toBeGreaterThan(15); expect(worker.initLicense).toHaveBeenCalled(); expect(worker.initBinaryDataService).toHaveBeenCalled(); expect(worker.initExternalHooks).toHaveBeenCalled(); diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index b9f942e36..99d90daac 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -38,6 +38,7 @@ describe('License', () => { logger: expect.anything(), loadCertStr: expect.any(Function), saveCertStr: expect.any(Function), + onFeatureChange: expect.any(Function), server: MOCK_SERVER_URL, tenantId: 1, }); @@ -56,6 +57,7 @@ describe('License', () => { logger: expect.anything(), loadCertStr: expect.any(Function), saveCertStr: expect.any(Function), + onFeatureChange: expect.any(Function), server: MOCK_SERVER_URL, tenantId: 1, }); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 127aac690..9367e4da7 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -4,16 +4,16 @@ import { LoggerProxy } from 'n8n-workflow'; import { getLogger } from '@/Logger'; import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; -import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { eventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; -import { License } from '../../../src/License'; const os = Container.get(OrchestrationService); +let queueModeId: string; + function setDefaultConfig() { config.set('executions.mode', 'queue'); } @@ -27,15 +27,6 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { }, }; -const eventBusMessage = new EventMessageWorkflow({ - eventName: 'n8n.workflow.success', - id: 'test', - message: 'test', - payload: { - test: 'test', - }, -}); - describe('Orchestration Service', () => { beforeAll(async () => { mockInstance(RedisService); @@ -74,6 +65,7 @@ describe('Orchestration Service', () => { }); }); setDefaultConfig(); + queueModeId = config.get('redis.queueModeId'); }); afterAll(async () => { @@ -83,10 +75,10 @@ describe('Orchestration Service', () => { }); test('should initialize', async () => { - await os.init('test-orchestration-service'); + await os.init(); expect(os.redisPublisher).toBeDefined(); expect(os.redisSubscriber).toBeDefined(); - expect(os.uniqueInstanceId).toBeDefined(); + expect(queueModeId).toBeDefined(); }); test('should handle worker responses', async () => { @@ -97,32 +89,28 @@ describe('Orchestration Service', () => { }); test('should handle command messages from others', async () => { - const license = Container.get(License); - license.instanceId = 'test'; - jest.spyOn(license, 'reload'); + jest.spyOn(LoggerProxy, 'warn'); const responseFalseId = await handleCommandMessage( JSON.stringify({ senderId: 'test', command: 'reloadLicense', }), - os.uniqueInstanceId, ); expect(responseFalseId).toBeDefined(); expect(responseFalseId!.command).toEqual('reloadLicense'); expect(responseFalseId!.senderId).toEqual('test'); - expect(license.reload).toHaveBeenCalled(); - jest.spyOn(license, 'reload').mockRestore(); + expect(LoggerProxy.warn).toHaveBeenCalled(); + jest.spyOn(LoggerProxy, 'warn').mockRestore(); }); test('should reject command messages from iteslf', async () => { jest.spyOn(eventBus, 'restart'); const response = await handleCommandMessage( - JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), - os.uniqueInstanceId, + JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }), ); expect(response).toBeDefined(); expect(response!.command).toEqual('restartEventBus'); - expect(response!.senderId).toEqual(os.uniqueInstanceId); + expect(response!.senderId).toEqual(queueModeId); expect(eventBus.restart).not.toHaveBeenCalled(); jest.spyOn(eventBus, 'restart').mockRestore(); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d88bff582..d41577917 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -139,7 +139,7 @@ importers: dependencies: axios: specifier: ^0.21.1 - version: 0.21.4(debug@4.3.2) + version: 0.21.4 packages/@n8n_io/eslint-config: devDependencies: @@ -199,7 +199,10 @@ importers: version: 2.6.0 '@oclif/command': specifier: ^1.8.16 - version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) + version: 1.8.18(@oclif/config@1.18.17)(supports-color@8.1.1) + '@oclif/config': + specifier: ^1.18.17 + version: 1.18.17 '@oclif/core': specifier: ^1.16.4 version: 1.16.6 @@ -217,7 +220,7 @@ importers: version: 7.28.1 axios: specifier: ^0.21.1 - version: 0.21.4(debug@4.3.2) + version: 0.21.4 basic-auth: specifier: ^2.0.1 version: 2.0.1 @@ -572,7 +575,7 @@ importers: version: link:../@n8n/client-oauth2 axios: specifier: ^0.21.1 - version: 0.21.4(debug@4.3.2) + version: 0.21.4 concat-stream: specifier: ^2.0.0 version: 2.0.0 @@ -838,7 +841,7 @@ importers: version: 10.2.0(vue@3.3.4) axios: specifier: ^0.21.1 - version: 0.21.4(debug@4.3.2) + version: 0.21.4 codemirror-lang-html-n8n: specifier: ^1.0.0 version: 1.0.0 @@ -965,7 +968,7 @@ importers: dependencies: '@oclif/command': specifier: ^1.5.18 - version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) + version: 1.8.18(@oclif/config@1.18.17)(supports-color@8.1.1) '@oclif/errors': specifier: ^1.2.2 version: 1.3.6 @@ -4736,6 +4739,37 @@ packages: dev: false optional: true + /@oclif/command@1.8.18(@oclif/config@1.18.17): + resolution: {integrity: sha512-qTad+jtiriMMbkw6ArtcUY89cwLwmwDnD4KSGT+OQiZKYtegp3NUCM9JN8lfj/aKC+0kvSitJM4ULzbgiVTKQQ==} + engines: {node: '>=12.0.0'} + peerDependencies: + '@oclif/config': ^1 + dependencies: + '@oclif/config': 1.18.17 + '@oclif/errors': 1.3.6 + '@oclif/help': 1.0.3(supports-color@8.1.1) + '@oclif/parser': 3.8.8 + debug: 4.3.4(supports-color@8.1.1) + semver: 7.5.4 + transitivePeerDependencies: + - supports-color + dev: true + + /@oclif/command@1.8.18(@oclif/config@1.18.17)(supports-color@8.1.1): + resolution: {integrity: sha512-qTad+jtiriMMbkw6ArtcUY89cwLwmwDnD4KSGT+OQiZKYtegp3NUCM9JN8lfj/aKC+0kvSitJM4ULzbgiVTKQQ==} + engines: {node: '>=12.0.0'} + peerDependencies: + '@oclif/config': ^1 + dependencies: + '@oclif/config': 1.18.17 + '@oclif/errors': 1.3.6 + '@oclif/help': 1.0.3(supports-color@8.1.1) + '@oclif/parser': 3.8.8 + debug: 4.3.4(supports-color@8.1.1) + semver: 7.5.4 + transitivePeerDependencies: + - supports-color + /@oclif/command@1.8.18(@oclif/config@1.18.2): resolution: {integrity: sha512-qTad+jtiriMMbkw6ArtcUY89cwLwmwDnD4KSGT+OQiZKYtegp3NUCM9JN8lfj/aKC+0kvSitJM4ULzbgiVTKQQ==} engines: {node: '>=12.0.0'} @@ -4752,40 +4786,24 @@ packages: - supports-color dev: true - /@oclif/command@1.8.18(@oclif/config@1.18.5): - resolution: {integrity: sha512-qTad+jtiriMMbkw6ArtcUY89cwLwmwDnD4KSGT+OQiZKYtegp3NUCM9JN8lfj/aKC+0kvSitJM4ULzbgiVTKQQ==} - engines: {node: '>=12.0.0'} - peerDependencies: - '@oclif/config': ^1 + /@oclif/config@1.18.17: + resolution: {integrity: sha512-k77qyeUvjU8qAJ3XK3fr/QVAqsZO8QOBuESnfeM5HHtPNLSyfVcwiMM2zveSW5xRdLSG3MfV8QnLVkuyCL2ENg==} + engines: {node: '>=8.0.0'} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dependencies: - '@oclif/config': 1.18.5(supports-color@8.1.1) '@oclif/errors': 1.3.6 - '@oclif/help': 1.0.3(supports-color@8.1.1) - '@oclif/parser': 3.8.8 + '@oclif/parser': 3.8.17 debug: 4.3.4(supports-color@8.1.1) - semver: 7.5.4 - transitivePeerDependencies: - - supports-color - dev: true - - /@oclif/command@1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1): - resolution: {integrity: sha512-qTad+jtiriMMbkw6ArtcUY89cwLwmwDnD4KSGT+OQiZKYtegp3NUCM9JN8lfj/aKC+0kvSitJM4ULzbgiVTKQQ==} - engines: {node: '>=12.0.0'} - peerDependencies: - '@oclif/config': ^1 - dependencies: - '@oclif/config': 1.18.5(supports-color@8.1.1) - '@oclif/errors': 1.3.6 - '@oclif/help': 1.0.3(supports-color@8.1.1) - '@oclif/parser': 3.8.8 - debug: 4.3.4(supports-color@8.1.1) - semver: 7.5.4 + globby: 11.1.0 + is-wsl: 2.2.0 + tslib: 2.6.1 transitivePeerDependencies: - supports-color /@oclif/config@1.18.2: resolution: {integrity: sha512-cE3qfHWv8hGRCP31j7fIS7BfCflm/BNZ2HNqHexH+fDrdF2f1D5S8VmXWLC77ffv3oDvWyvE9AZeR0RfmHCCaA==} engines: {node: '>=8.0.0'} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dependencies: '@oclif/errors': 1.3.6 '@oclif/parser': 3.8.8 @@ -4800,6 +4818,7 @@ packages: /@oclif/config@1.18.5(supports-color@8.1.1): resolution: {integrity: sha512-R6dBedaUVn5jtAh79aaRm7jezx4l3V7Im9NORlLmudz5BL1foMeuXEvnqm+bMiejyexVA+oi9mto6YKZPzo/5Q==} engines: {node: '>=8.0.0'} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dependencies: '@oclif/errors': 1.3.6 '@oclif/parser': 3.8.8 @@ -4849,11 +4868,11 @@ packages: engines: {node: '>=8.10.0'} hasBin: true dependencies: - '@oclif/command': 1.8.18(@oclif/config@1.18.5) - '@oclif/config': 1.18.5(supports-color@8.1.1) + '@oclif/command': 1.8.18(@oclif/config@1.18.17) + '@oclif/config': 1.18.17 '@oclif/errors': 1.3.6 '@oclif/plugin-help': 3.2.18 - cli-ux: 5.6.7(@oclif/config@1.18.5) + cli-ux: 5.6.7(@oclif/config@1.18.17) debug: 4.3.4(supports-color@8.1.1) find-yarn-workspace-root: 2.0.0 fs-extra: 8.1.0 @@ -4906,6 +4925,16 @@ packages: /@oclif/linewrap@1.0.0: resolution: {integrity: sha512-Ups2dShK52xXa8w6iBWLgcjPJWjais6KPJQq3gQ/88AY6BXoTX+MIGFPrWQO1KLMiQfoTpcLnUwloN4brrVUHw==} + /@oclif/parser@3.8.17: + resolution: {integrity: sha512-l04iSd0xoh/16TGVpXb81Gg3z7tlQGrEup16BrVLsZBK6SEYpYHRJZnM32BwZrHI97ZSFfuSwVlzoo6HdsaK8A==} + engines: {node: '>=8.0.0'} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. + dependencies: + '@oclif/errors': 1.3.6 + '@oclif/linewrap': 1.0.0 + chalk: 4.1.2 + tslib: 2.6.1 + /@oclif/parser@3.8.8: resolution: {integrity: sha512-OgqQAtpyq1XFJG3dvLl9aqiO+F5pubkzt7AivUDkNoa6/hNgVZ79vvTO8sqo5XAAhOm/fcTSerZ35OTnTJb1ng==} engines: {node: '>=8.0.0'} @@ -5022,7 +5051,7 @@ packages: dependencies: '@segment/loosely-validate-event': 2.0.0 auto-changelog: 1.16.4 - axios: 0.21.4(debug@4.3.2) + axios: 0.21.4 axios-retry: 3.3.1 bull: 3.29.3 lodash.clonedeep: 4.5.0 @@ -6775,7 +6804,7 @@ packages: ts-dedent: 2.2.0 type-fest: 3.13.1 vue: 3.3.4 - vue-component-type-helpers: 1.8.11 + vue-component-type-helpers: 1.8.14 transitivePeerDependencies: - encoding - supports-color @@ -9072,19 +9101,18 @@ packages: is-retry-allowed: 2.2.0 dev: false - /axios@0.21.4(debug@4.3.2): + /axios@0.21.4: resolution: {integrity: sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg==} dependencies: - follow-redirects: 1.15.2(debug@4.3.2) + follow-redirects: 1.15.2(debug@3.2.7) transitivePeerDependencies: - debug dev: false - /axios@0.27.2: - resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} + /axios@0.21.4(debug@4.3.2): + resolution: {integrity: sha512-ut5vewkiu8jjGBdqpM44XxjuCjq9LAKeHVmoVfHVzy8eHgxxq8SbAVQNovDA8mVi05kP0Ea/n/UzcSHcTJQfNg==} dependencies: follow-redirects: 1.15.2(debug@4.3.2) - form-data: 4.0.0 transitivePeerDependencies: - debug dev: false @@ -9110,7 +9138,7 @@ packages: /axios@1.4.0: resolution: {integrity: sha512-S4XCWMEmzvo64T9GfvQDOXgYRDJ/wsSZc7Jvdgx5u1sd0JwsuPLqb3SYmusag+edF6ziyMensPVqLTSc1PiSEA==} dependencies: - follow-redirects: 1.15.2(debug@4.3.2) + follow-redirects: 1.15.2(debug@3.2.7) form-data: 4.0.0 proxy-from-env: 1.1.0 transitivePeerDependencies: @@ -9938,12 +9966,12 @@ packages: string-width: 4.2.3 dev: true - /cli-ux@5.6.7(@oclif/config@1.18.5): + /cli-ux@5.6.7(@oclif/config@1.18.17): resolution: {integrity: sha512-dsKAurMNyFDnO6X1TiiRNiVbL90XReLKcvIq4H777NMqXGBxBws23ag8ubCJE97vVZEgWG2eSUhsyLf63Jv8+g==} engines: {node: '>=8.0.0'} deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dependencies: - '@oclif/command': 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) + '@oclif/command': 1.8.18(@oclif/config@1.18.17)(supports-color@8.1.1) '@oclif/errors': 1.3.6 '@oclif/linewrap': 1.0.0 '@oclif/screen': 1.0.4 @@ -18006,7 +18034,7 @@ packages: resolution: {integrity: sha512-aXYe/D+28kF63W8Cz53t09ypEORz+ULeDCahdAqhVrRm2scbOXFbtnn0GGhvMpYe45grepLKuwui9KxrZ2ZuMw==} engines: {node: '>=14.17.0'} dependencies: - axios: 0.27.2 + axios: 0.27.2(debug@3.2.7) transitivePeerDependencies: - debug dev: false @@ -21736,8 +21764,8 @@ packages: vue: 3.3.4 dev: false - /vue-component-type-helpers@1.8.11: - resolution: {integrity: sha512-CWItFzuEWjkSXDeMGwQEc5cFH4FaueyPQHfi1mBDe+wA2JABqNjFxFUtmZJ9WHkb0HpEwqgBg1umiXrWYXkXHw==} + /vue-component-type-helpers@1.8.14: + resolution: {integrity: sha512-veuaNIJas+dkRflRumpnY0e0HWqrUrqg5CdWxK/CbQvJ96V4uVOM5eJbj6cJX3rFNmc7+LO3ySHwvKVS8DjG5w==} dev: true /vue-component-type-helpers@1.8.4: