feat(core): Add commands to workers to respond with current state (#7029)

This PR adds new endpoints to the REST API:
`/orchestration/worker/status` and `/orchestration/worker/id`

Currently these just trigger the return of status / ids from the workers
via the redis back channel, this still needs to be handled and passed
through to the frontend.

It also adds the eventbus to each worker, and triggers a reload of those
eventbus instances when the configuration changes on the main instances.
This commit is contained in:
Michael Auerswald
2023-09-07 14:44:19 +02:00
committed by GitHub
parent 0a35025e5e
commit 7b49cf2a2c
14 changed files with 794 additions and 225 deletions

View File

@@ -29,6 +29,7 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { OrchestrationService } from '../../services/orchestration.service';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@@ -37,6 +38,11 @@ export interface MessageWithCallback {
confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void;
}
export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean;
workerId?: string;
}
export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus;
@@ -70,7 +76,7 @@ export class MessageEventBus extends EventEmitter {
*
* Sets `isInitialized` to `true` once finished.
*/
async initialize() {
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) {
return;
}
@@ -93,64 +99,75 @@ export class MessageEventBus extends EventEmitter {
}
LoggerProxy.debug('Initializing event writer');
this.logWriter = await MessageEventBusLogWriter.getInstance();
if (options?.workerId) {
// only add 'worker' to log file name since the ID changes on every start and we
// would not be able to recover the log files from the previous run not knowing it
const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker';
this.logWriter = await MessageEventBusLogWriter.getInstance({
logBaseName,
});
} else {
this.logWriter = await MessageEventBusLogWriter.getInstance();
}
if (!this.logWriter) {
LoggerProxy.warn('Could not initialize event writer');
}
// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
// - retry sending events
LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `,
);
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);
if (options?.skipRecoveryPass) {
LoggerProxy.debug('Skipping unsent event check');
} else {
// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
// - retry sending events
LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `,
);
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
}
}
}
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
if (recoveryAlreadyAttempted)
LoggerProxy.warn('Skipped recovery process since it previously failed.');
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
if (recoveryAlreadyAttempted)
LoggerProxy.warn('Skipped recovery process since it previously failed.');
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
}
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}
// if configured, run this test every n ms
if (config.getEnv('eventBus.checkUnsentInterval') > 0) {
if (this.pushIntervalTimer) {
@@ -192,6 +209,12 @@ export class MessageEventBus extends EventEmitter {
return result;
}
async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await Container.get(OrchestrationService).restartEventBus();
}
}
private async trySendingUnsent(msgs?: EventMessageTypes[]) {
const unsentMessages = msgs ?? (await this.getEventsUnsent());
if (unsentMessages.length > 0) {
@@ -212,9 +235,15 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}
async restart() {
await this.close();
await this.initialize({ skipRecoveryPass: true });
}
async send(msgs: EventMessageTypes | EventMessageTypes[]) {
if (!Array.isArray(msgs)) {
msgs = [msgs];