feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario (#7643)

Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926

### Manual workflow activation and deactivation

In a multi-main scenario, if the user manually activates or deactivates
a workflow, the process (whether leader or follower) that handles the
PATCH request and updates its internal state should send a message into
the command channel, so that all other main processes update their
internal state accordingly:

- Add to `ActiveWorkflows` if activating
- Remove from `ActiveWorkflows` if deactivating
- Remove and re-add to `ActiveWorkflows` if the update did not change
activation status.

After updating their internal state, if activating or deactivating, the
recipient main processes should push a message to all connected
frontends so that these can update their stores and so reflect the value
in the UI.

### Workflow activation errors

On failure to activate a workflow, the main instance should record the
error in Redis - main instances should always pull activation errors
from Redis in a multi-main scenario.

### Leadership change

On leadership change...

- The old leader should stop pruning and the new leader should start
pruning.
- The old leader should remove trigger- and poller-based workflows and
the new leader should add them.
This commit is contained in:
Iván Ovejero
2023-11-17 15:58:50 +01:00
committed by GitHub
parent b3a3f16bc2
commit 4c4082503c
33 changed files with 639 additions and 336 deletions

View File

@@ -80,21 +80,21 @@ export class CacheService extends EventEmitter {
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
*/
async get(
async get<T = unknown>(
key: string,
options: {
fallbackValue?: unknown;
refreshFunction?: (key: string) => Promise<unknown>;
fallbackValue?: T;
refreshFunction?: (key: string) => Promise<T>;
refreshTtl?: number;
} = {},
): Promise<unknown> {
): Promise<T | undefined> {
if (!key || key.length === 0) {
return;
}
const value = await this.cache?.store.get(key);
if (value !== undefined) {
this.emit(this.metricsCounterEvents.cacheHit);
return value;
return value as T;
}
this.emit(this.metricsCounterEvents.cacheMiss);
if (options.refreshFunction) {

View File

@@ -5,7 +5,7 @@ import config from '@/config';
import { EventEmitter } from 'node:events';
export abstract class OrchestrationService extends EventEmitter {
protected initialized = false;
protected isInitialized = false;
protected queueModeId: string;
@@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter {
}
sanityCheck(): boolean {
return this.initialized && this.isQueueMode;
return this.isInitialized && this.isQueueMode;
}
async init() {
await this.initPublisher();
this.initialized = true;
this.isInitialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
this.initialized = false;
this.isInitialized = false;
}
protected async initPublisher() {

View File

@@ -1,50 +1,61 @@
import config from '@/config';
import { Service } from 'typedi';
import { TIME } from '@/constants';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
/**
* For use in main instance, in multiple main instances cluster.
*/
@Service()
export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
export class MultiMainSetup extends SingleMainSetup {
private id = this.queueModeId;
private leaderId: string | undefined;
private isLicensed = false;
get isEnabled() {
return (
config.getEnv('executions.mode') === 'queue' &&
config.getEnv('multiMainSetup.enabled') &&
this.isLicensed
);
}
get isLeader() {
return this.id === this.leaderId;
return config.getEnv('multiMainSetup.instanceType') === 'leader';
}
get isFollower() {
return !this.isLeader;
}
setLicensed(newState: boolean) {
this.isLicensed = newState;
}
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
private leaderCheckInterval: NodeJS.Timer | undefined;
async init() {
if (this.initialized) return;
if (this.isInitialized) return;
await this.initPublisher();
this.initialized = true;
this.isInitialized = true;
await this.tryBecomeLeader();
await this.tryBecomeLeader(); // prevent initial wait
this.leaderCheckInterval = setInterval(
async () => {
await this.checkLeader();
},
config.getEnv('leaderSelection.interval') * TIME.SECOND,
config.getEnv('multiMainSetup.interval') * TIME.SECOND,
);
}
async destroy() {
async shutdown() {
if (!this.isInitialized) return;
clearInterval(this.leaderCheckInterval);
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
@@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
} else {
this.logger.debug(`Leader is other instance "${leaderId}"`);
this.leaderId = leaderId;
config.set('multiMainSetup.instanceType', 'follower');
}
}
private async tryBecomeLeader() {
if (this.isLeader || !this.redisPublisher.redisClient) return;
if (
config.getEnv('multiMainSetup.instanceType') === 'leader' ||
!this.redisPublisher.redisClient
) {
return;
}
// this can only succeed if leadership is currently vacant
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
@@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
if (keySetSuccessfully) {
this.logger.debug(`Leader is now this instance "${this.id}"`);
this.leaderId = this.id;
this.emit('leadershipChange', this.id);
config.set('multiMainSetup.instanceType', 'leader');
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
this.emit('leadershipChange', this.id);
} else {
config.set('multiMainSetup.instanceType', 'follower');
}
}
async broadcastWorkflowActiveStateChanged(payload: {
workflowId: string;
oldState: boolean;
newState: boolean;
versionId: string;
}) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowActiveStateChanged',
payload,
});
}
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowFailedToActivate',
payload,
});
}
}

View File

@@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service';
* For use in main instance, in single main instance scenario.
*/
@Service()
export class SingleMainInstancePublisher extends OrchestrationService {
export class SingleMainSetup extends OrchestrationService {
constructor(protected readonly logger: Logger) {
super();
}
sanityCheck() {
return this.initialized && this.isQueueMode && this.isMainInstance;
return this.isInitialized && this.isQueueMode && this.isMainInstance;
}
async getWorkerStatus(id?: string) {

View File

@@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { Push } from '@/push';
import { MultiMainSetup } from './MultiMainSetup.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
export async function handleCommandMessageMain(messageString: string) {
const queueModeId = config.get('redis.queueModeId');
const isMainInstance = config.get('generic.instanceType') === 'main';
const queueModeId = config.getEnv('redis.queueModeId');
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
const message = messageToRedisServiceCommandObject(messageString);
const logger = Container.get(Logger);
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
if (message) {
logger.debug(
@@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) {
return message;
}
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) {
if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
logger.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
@@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) {
return message;
}
await Container.get(ExternalSecretsManager).reloadAllProviders();
break;
case 'workflowActiveStateChanged': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
if (
typeof workflowId !== 'string' ||
typeof oldState !== 'boolean' ||
typeof newState !== 'boolean' ||
typeof versionId !== 'string'
) {
break;
}
const push = Container.get(Push);
if (!oldState && newState) {
try {
await activeWorkflowRunner.add(workflowId, 'activate');
push.broadcast('workflowActivated', { workflowId });
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await Container.get(WorkflowRepository).update(workflowId, {
active: false,
versionId,
});
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
workflowId,
errorMessage: error.message,
});
}
} else if (oldState && !newState) {
await activeWorkflowRunner.remove(workflowId);
push.broadcast('workflowDeactivated', { workflowId });
} else {
await activeWorkflowRunner.remove(workflowId);
await activeWorkflowRunner.add(workflowId, 'update');
}
await activeWorkflowRunner.removeActivationError(workflowId);
}
case 'workflowFailedToActivate': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, errorMessage } = message.payload ?? {};
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
}
default:
break;
}

View File

@@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWebhookInstance;
return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
}
}

View File

@@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWorkerInstance;
return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
}
async publishToEventLog(message: AbstractEventMessage) {

View File

@@ -1,4 +1,4 @@
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
@@ -23,16 +23,13 @@ export class PruningService {
public hardDeletionTimeout: NodeJS.Timeout | undefined;
private isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
) {}
async isPruningEnabled() {
isPruningEnabled() {
if (
!config.getEnv('executions.pruneData') ||
inTest ||
@@ -41,75 +38,60 @@ export class PruningService {
return false;
}
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
return multiMainInstancePublisher.isLeader;
if (
config.getEnv('multiMainSetup.enabled') &&
config.getEnv('multiMainSetup.instanceType') === 'follower'
) {
return false;
}
return true;
}
/**
* @important Call only after DB connection is established and migrations have completed.
* @important Call this method only after DB migrations have completed.
*/
startPruning() {
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval();
this.scheduleHardDeletion();
}
async stopPruning() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) return;
}
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
stopPruning() {
this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
clearInterval(this.softDeletionInterval);
clearTimeout(this.hardDeletionTimeout);
}
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.softDeletionInterval = setInterval(
async () => this.softDeleteOnPruningCycle(),
this.rates.softDeletion,
);
this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
}
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.hardDeletionTimeout = setTimeout(
async () => this.hardDeleteOnPruningCycle(),
this.rates.hardDeletion,
);
this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
}
/**
* Mark executions as deleted based on age and count, in a pruning cycle.
*/
async softDeleteOnPruningCycle() {
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
this.logger.debug('[Pruning] Starting soft-deletion of executions');
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
@@ -157,8 +139,11 @@ export class PruningService {
.execute();
if (result.affected === 0) {
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
this.logger.debug('[Pruning] Found no executions to soft-delete');
return;
}
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
}
/**
@@ -187,21 +172,23 @@ export class PruningService {
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
if (executionIds.length === 0) {
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
this.logger.debug('[Pruning] Found no executions to hard-delete');
this.scheduleHardDeletion();
return;
}
try {
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
executionIds,
});
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
await this.executionRepository.delete({ id: In(executionIds) });
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
} catch (error) {
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
this.logger.error('[Pruning] Failed to hard-delete executions', {
executionIds,
error: error instanceof Error ? error.message : `${error}`,
});

View File

@@ -6,7 +6,9 @@ export type RedisServiceCommand =
| 'restartEventBus'
| 'stopWorker'
| 'reloadLicense'
| 'reloadExternalSecretsProviders';
| 'reloadExternalSecretsProviders'
| 'workflowActiveStateChanged' // multi-main only
| 'workflowFailedToActivate'; // multi-main only
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
@@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = {
| {
command: 'stopWorker';
}
| {
command: 'workflowActiveStateChanged';
payload: {
oldState: boolean;
newState: boolean;
workflowId: string;
};
}
);
export type RedisServiceCommandObject = {