diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 09be1d79c..60da7fb29 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -27,6 +27,8 @@ import { } from '../EventMessageClasses/EventMessageGeneric'; import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; +import Container from 'typedi'; +import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -93,6 +95,10 @@ export class MessageEventBus extends EventEmitter { LoggerProxy.debug('Initializing event writer'); this.logWriter = await MessageEventBusLogWriter.getInstance(); + if (!this.logWriter) { + LoggerProxy.warn('Could not initialize event writer'); + } + // unsent event check: // - find unsent messages in current event log(s) // - cycle event logs and start the logging to a fresh file @@ -105,14 +111,47 @@ export class MessageEventBus extends EventEmitter { this.logWriter?.startLogging(); await this.send(unsentAndUnfinished.unsentMessages); - if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) { - for (const executionId of Object.keys(unsentAndUnfinished.unfinishedExecutions)) { - await recoverExecutionDataFromEventLogMessages( - executionId, - unsentAndUnfinished.unfinishedExecutions[executionId], - true, - ); + const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions); + + if (unfinishedExecutionIds.length > 0) { + LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); + LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.'); + const activeWorkflows = await Container.get(WorkflowRepository).find({ + where: { active: true }, + select: ['id', 'name'], + }); + if (activeWorkflows.length > 0) { + LoggerProxy.info('Currently active workflows:'); + for (const workflowData of activeWorkflows) { + LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`); + } } + if (this.logWriter?.isRecoveryProcessRunning()) { + // if we end up here, it means that the previous recovery process did not finish + // a possible reason would be that recreating the workflow data itself caused e.g an OOM error + // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed + LoggerProxy.warn('Skipping recover process since it previously failed.'); + for (const executionId of unfinishedExecutionIds) { + LoggerProxy.info(`Setting status of execution ${executionId} to crashed`); + await Container.get(ExecutionRepository).updateExistingExecution(executionId, { + status: 'crashed', + stoppedAt: new Date(), + }); + } + } else { + // start actual recovery process and write recovery process flag file + this.logWriter?.startRecoveryProcess(); + for (const executionId of unfinishedExecutionIds) { + LoggerProxy.warn(`Attempting to recover execution ${executionId}`); + await recoverExecutionDataFromEventLogMessages( + executionId, + unsentAndUnfinished.unfinishedExecutions[executionId], + true, + ); + } + } + // remove the recovery process flag file + this.logWriter?.endRecoveryProcess(); } // if configured, run this test every n ms diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 80837609a..470e832c6 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -98,6 +98,22 @@ export class MessageEventBusLogWriter { } } + startRecoveryProcess() { + if (this.worker) { + this.worker.postMessage({ command: 'startRecoveryProcess', data: {} }); + } + } + + isRecoveryProcessRunning(): boolean { + return existsSync(this.getRecoveryInProgressFileName()); + } + + endRecoveryProcess() { + if (this.worker) { + this.worker.postMessage({ command: 'endRecoveryProcess', data: {} }); + } + } + private async startThread() { if (this.worker) { await this.close(); @@ -240,6 +256,10 @@ export class MessageEventBusLogWriter { } } + getRecoveryInProgressFileName(): string { + return `${MessageEventBusLogWriter.options.logFullBasePath}.recoveryInProgress`; + } + cleanAllLogs() { for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) { if (existsSync(this.getLogFileName(i))) { diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts index 06d95a3bb..5e2d77118 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts @@ -25,6 +25,25 @@ function setKeepFiles(keepNumberOfFiles: number) { keepFiles = keepNumberOfFiles; } +function buildRecoveryInProgressFileName(): string { + return `${logFileBasePath}.recoveryInProgress`; +} + +function startRecoveryProcess() { + if (existsSync(buildRecoveryInProgressFileName())) { + return false; + } + const fileHandle = openSync(buildRecoveryInProgressFileName(), 'a'); + closeSync(fileHandle); + return true; +} + +function endRecoveryProcess() { + if (existsSync(buildRecoveryInProgressFileName())) { + rmSync(buildRecoveryInProgressFileName()); + } +} + function buildLogFileNameWithCounter(counter?: number): string { if (counter) { return `${logFileBasePath}-${counter}.log`; @@ -112,6 +131,14 @@ if (!isMainThread) { cleanAllLogs(); parentPort?.postMessage('cleanedAllLogs'); break; + case 'startRecoveryProcess': + const recoveryStarted = startRecoveryProcess(); + parentPort?.postMessage({ command, data: recoveryStarted }); + break; + case 'endRecoveryProcess': + endRecoveryProcess(); + parentPort?.postMessage({ command, data: true }); + break; default: break; }