refactor(core): Simplify OrchestrationService (no-changelog) (#8364)
This commit is contained in:
@@ -1,55 +0,0 @@
|
||||
import Container from 'typedi';
|
||||
import { RedisService } from './redis.service';
|
||||
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
||||
import config from '@/config';
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
export abstract class OrchestrationService extends EventEmitter {
|
||||
protected isInitialized = false;
|
||||
|
||||
protected queueModeId: string;
|
||||
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
|
||||
readonly redisService: RedisService;
|
||||
|
||||
get isQueueMode(): boolean {
|
||||
return config.get('executions.mode') === 'queue';
|
||||
}
|
||||
|
||||
get isMainInstance(): boolean {
|
||||
return config.get('generic.instanceType') === 'main';
|
||||
}
|
||||
|
||||
get isWebhookInstance(): boolean {
|
||||
return config.get('generic.instanceType') === 'webhook';
|
||||
}
|
||||
|
||||
get isWorkerInstance(): boolean {
|
||||
return config.get('generic.instanceType') === 'worker';
|
||||
}
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
this.redisService = Container.get(RedisService);
|
||||
this.queueModeId = config.getEnv('redis.queueModeId');
|
||||
}
|
||||
|
||||
sanityCheck(): boolean {
|
||||
return this.isInitialized && this.isQueueMode;
|
||||
}
|
||||
|
||||
async init() {
|
||||
await this.initPublisher();
|
||||
this.isInitialized = true;
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
await this.redisPublisher?.destroy();
|
||||
this.isInitialized = false;
|
||||
}
|
||||
|
||||
protected async initPublisher() {
|
||||
this.redisPublisher = await this.redisService.getPubSubPublisher();
|
||||
}
|
||||
}
|
||||
121
packages/cli/src/services/orchestration.service.ts
Normal file
121
packages/cli/src/services/orchestration.service.ts
Normal file
@@ -0,0 +1,121 @@
|
||||
import { Service } from 'typedi';
|
||||
import { Logger } from '@/Logger';
|
||||
import config from '@/config';
|
||||
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
||||
import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/RedisServiceCommands';
|
||||
|
||||
import { RedisService } from './redis.service';
|
||||
import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationService {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly redisService: RedisService,
|
||||
readonly multiMainSetup: MultiMainSetup,
|
||||
) {}
|
||||
|
||||
protected isInitialized = false;
|
||||
|
||||
private isMultiMainSetupLicensed = false;
|
||||
|
||||
setMultiMainSetupLicensed(newState: boolean) {
|
||||
this.isMultiMainSetupLicensed = newState;
|
||||
}
|
||||
|
||||
get isMultiMainSetupEnabled() {
|
||||
return (
|
||||
config.getEnv('executions.mode') === 'queue' &&
|
||||
config.getEnv('multiMainSetup.enabled') &&
|
||||
config.getEnv('generic.instanceType') === 'main' &&
|
||||
this.isMultiMainSetupLicensed
|
||||
);
|
||||
}
|
||||
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
|
||||
get instanceId() {
|
||||
return config.getEnv('redis.queueModeId');
|
||||
}
|
||||
|
||||
get isLeader() {
|
||||
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||
}
|
||||
|
||||
get isFollower() {
|
||||
return config.getEnv('multiMainSetup.instanceType') !== 'leader';
|
||||
}
|
||||
|
||||
sanityCheck() {
|
||||
return this.isInitialized && config.get('executions.mode') === 'queue';
|
||||
}
|
||||
|
||||
async init() {
|
||||
if (this.isInitialized) return;
|
||||
|
||||
if (config.get('executions.mode') === 'queue') await this.initPublisher();
|
||||
|
||||
if (this.isMultiMainSetupEnabled) {
|
||||
await this.multiMainSetup.init();
|
||||
} else {
|
||||
config.set('multiMainSetup.instanceType', 'leader');
|
||||
}
|
||||
|
||||
this.isInitialized = true;
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
if (!this.isInitialized) return;
|
||||
|
||||
if (this.isMultiMainSetupEnabled) await this.multiMainSetup.shutdown();
|
||||
|
||||
await this.redisPublisher.destroy();
|
||||
|
||||
this.isInitialized = false;
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// pubsub
|
||||
// ----------------------------------
|
||||
|
||||
protected async initPublisher() {
|
||||
this.redisPublisher = await this.redisService.getPubSubPublisher();
|
||||
}
|
||||
|
||||
async publish(command: RedisServiceCommand, data?: unknown) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const payload = data as RedisServiceBaseCommand['payload'];
|
||||
|
||||
this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command, payload });
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// workers status
|
||||
// ----------------------------------
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getStatus';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command,
|
||||
targets: id ? [id] : undefined,
|
||||
});
|
||||
}
|
||||
|
||||
async getWorkerIds() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getId';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
}
|
||||
@@ -1,43 +1,23 @@
|
||||
import { EventEmitter } from 'node:events';
|
||||
import config from '@/config';
|
||||
import { Service } from 'typedi';
|
||||
import { TIME } from '@/constants';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
|
||||
import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
|
||||
import type {
|
||||
RedisServiceBaseCommand,
|
||||
RedisServiceCommand,
|
||||
} from '@/services/redis/RedisServiceCommands';
|
||||
import { Logger } from '@/Logger';
|
||||
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
||||
|
||||
@Service()
|
||||
export class MultiMainSetup extends SingleMainSetup {
|
||||
private id = this.queueModeId;
|
||||
|
||||
private isLicensed = false;
|
||||
|
||||
get isEnabled() {
|
||||
return (
|
||||
config.getEnv('executions.mode') === 'queue' &&
|
||||
config.getEnv('multiMainSetup.enabled') &&
|
||||
config.getEnv('generic.instanceType') === 'main' &&
|
||||
this.isLicensed
|
||||
);
|
||||
}
|
||||
|
||||
get isLeader() {
|
||||
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||
}
|
||||
|
||||
get isFollower() {
|
||||
return !this.isLeader;
|
||||
export class MultiMainSetup extends EventEmitter {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly redisPublisher: RedisServicePubSubPublisher,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
get instanceId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
setLicensed(newState: boolean) {
|
||||
this.isLicensed = newState;
|
||||
return config.getEnv('redis.queueModeId');
|
||||
}
|
||||
|
||||
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
||||
@@ -47,12 +27,6 @@ export class MultiMainSetup extends SingleMainSetup {
|
||||
private leaderCheckInterval: NodeJS.Timer | undefined;
|
||||
|
||||
async init() {
|
||||
if (!this.isEnabled || this.isInitialized) return;
|
||||
|
||||
await this.initPublisher();
|
||||
|
||||
this.isInitialized = true;
|
||||
|
||||
await this.tryBecomeLeader(); // prevent initial wait
|
||||
|
||||
this.leaderCheckInterval = setInterval(
|
||||
@@ -64,35 +38,35 @@ export class MultiMainSetup extends SingleMainSetup {
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
if (!this.isInitialized) return;
|
||||
|
||||
clearInterval(this.leaderCheckInterval);
|
||||
|
||||
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
|
||||
const isLeader = config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||
|
||||
if (isLeader) await this.redisPublisher.clear(this.leaderKey);
|
||||
}
|
||||
|
||||
private async checkLeader() {
|
||||
const leaderId = await this.redisPublisher.get(this.leaderKey);
|
||||
|
||||
if (leaderId === this.id) {
|
||||
this.logger.debug(`[Instance ID ${this.id}] Leader is this instance`);
|
||||
if (leaderId === this.instanceId) {
|
||||
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`);
|
||||
|
||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (leaderId && leaderId !== this.id) {
|
||||
this.logger.debug(`[Instance ID ${this.id}] Leader is other instance "${leaderId}"`);
|
||||
if (leaderId && leaderId !== this.instanceId) {
|
||||
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`);
|
||||
|
||||
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
|
||||
this.emit('leadershipChange', leaderId); // stop triggers, pruning, etc.
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
|
||||
this.emit('leadershipChange'); // stop triggers, pollers, pruning
|
||||
|
||||
EventReporter.report('[Multi-main setup] Leader failed to renew leader key', {
|
||||
level: 'info',
|
||||
});
|
||||
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
}
|
||||
|
||||
return;
|
||||
@@ -100,42 +74,37 @@ export class MultiMainSetup extends SingleMainSetup {
|
||||
|
||||
if (!leaderId) {
|
||||
this.logger.debug(
|
||||
`[Instance ID ${this.id}] Leadership vacant, attempting to become leader...`,
|
||||
`[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`,
|
||||
);
|
||||
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
|
||||
this.emit('leadershipVacant'); // stop triggers, pollers, pruning
|
||||
|
||||
await this.tryBecomeLeader();
|
||||
}
|
||||
}
|
||||
|
||||
private async tryBecomeLeader() {
|
||||
// this can only succeed if leadership is currently vacant
|
||||
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
|
||||
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(
|
||||
this.leaderKey,
|
||||
this.instanceId,
|
||||
);
|
||||
|
||||
if (keySetSuccessfully) {
|
||||
this.logger.debug(`[Instance ID ${this.id}] Leader is now this instance`);
|
||||
this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`);
|
||||
|
||||
config.set('multiMainSetup.instanceType', 'leader');
|
||||
|
||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||
|
||||
this.emit('leadershipChange', this.id);
|
||||
this.emit('leadershipChange'); // start triggers, pollers, pruning
|
||||
} else {
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
}
|
||||
}
|
||||
|
||||
async publish(command: RedisServiceCommand, data: unknown) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const payload = data as RedisServiceBaseCommand['payload'];
|
||||
|
||||
this.logger.debug(`[Instance ID ${this.id}] Publishing command "${command}"`, payload);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command, payload });
|
||||
}
|
||||
|
||||
async fetchLeaderKey() {
|
||||
return await this.redisPublisher.get(this.leaderKey);
|
||||
}
|
||||
|
||||
@@ -1,60 +0,0 @@
|
||||
import { Logger } from '@/Logger';
|
||||
import { Service } from 'typedi';
|
||||
import { OrchestrationService } from '@/services/orchestration.base.service';
|
||||
|
||||
/**
|
||||
* For use in main instance, in single main instance scenario.
|
||||
*/
|
||||
@Service()
|
||||
export class SingleMainSetup extends OrchestrationService {
|
||||
constructor(protected readonly logger: Logger) {
|
||||
super();
|
||||
}
|
||||
|
||||
sanityCheck() {
|
||||
return this.isInitialized && this.isQueueMode && this.isMainInstance;
|
||||
}
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getStatus';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command,
|
||||
targets: id ? [id] : undefined,
|
||||
});
|
||||
}
|
||||
|
||||
async getWorkerIds() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'getId';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
|
||||
async broadcastRestartEventbusAfterDestinationUpdate() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'restartEventBus';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
|
||||
async broadcastReloadExternalSecretsProviders() {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
const command = 'reloadExternalSecretsProviders';
|
||||
|
||||
this.logger.debug(`Sending "${command}" to command channel`);
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({ command });
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,7 @@ import { License } from '@/License';
|
||||
import { Logger } from '@/Logger';
|
||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import { Push } from '@/push';
|
||||
import { MultiMainSetup } from './MultiMainSetup.ee';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||
import { TestWebhooks } from '@/TestWebhooks';
|
||||
|
||||
@@ -100,7 +100,7 @@ export async function handleCommandMessageMain(messageString: string) {
|
||||
versionId,
|
||||
});
|
||||
|
||||
await Container.get(MultiMainSetup).publish('workflowFailedToActivate', {
|
||||
await Container.get(OrchestrationService).publish('workflowFailedToActivate', {
|
||||
workflowId,
|
||||
errorMessage: error.message,
|
||||
});
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import { Service } from 'typedi';
|
||||
import { OrchestrationService } from '../../orchestration.base.service';
|
||||
import { OrchestrationService } from '../../orchestration.service';
|
||||
import config from '@/config';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationWebhookService extends OrchestrationService {
|
||||
sanityCheck(): boolean {
|
||||
return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
|
||||
return (
|
||||
this.isInitialized &&
|
||||
config.get('executions.mode') === 'queue' &&
|
||||
config.get('generic.instanceType') === 'webhook'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
import { Service } from 'typedi';
|
||||
import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage';
|
||||
import { OrchestrationService } from '../../orchestration.base.service';
|
||||
import { OrchestrationService } from '../../orchestration.service';
|
||||
import config from '@/config';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationWorkerService extends OrchestrationService {
|
||||
sanityCheck(): boolean {
|
||||
return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
|
||||
return (
|
||||
this.isInitialized &&
|
||||
config.get('executions.mode') === 'queue' &&
|
||||
config.get('generic.instanceType') === 'worker'
|
||||
);
|
||||
}
|
||||
|
||||
async publishToEventLog(message: AbstractEventMessage) {
|
||||
|
||||
@@ -28,7 +28,7 @@ export class PruningService {
|
||||
private readonly binaryDataService: BinaryDataService,
|
||||
) {}
|
||||
|
||||
isPruningEnabled() {
|
||||
private isPruningEnabled() {
|
||||
if (
|
||||
!config.getEnv('executions.pruneData') ||
|
||||
inTest ||
|
||||
@@ -52,6 +52,8 @@ export class PruningService {
|
||||
* @important Call this method only after DB migrations have completed.
|
||||
*/
|
||||
startPruning() {
|
||||
if (!this.isPruningEnabled()) return;
|
||||
|
||||
if (this.isShuttingDown) {
|
||||
this.logger.warn('[Pruning] Cannot start pruning while shutting down');
|
||||
return;
|
||||
@@ -64,6 +66,8 @@ export class PruningService {
|
||||
}
|
||||
|
||||
stopPruning() {
|
||||
if (!this.isPruningEnabled()) return;
|
||||
|
||||
this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
|
||||
|
||||
clearInterval(this.softDeletionInterval);
|
||||
|
||||
Reference in New Issue
Block a user