fix(core): Fix ignoring crashed executions without event msgs (#7368)
when the event logs do not contain messages for running executions, the recovery/crash detection on startup would skip these. this PR fixes that.
This commit is contained in:
committed by
GitHub
parent
c77042f2bb
commit
2f4d91b2cd
@@ -1,6 +1,7 @@
|
||||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
|
||||
import type { DeleteResult } from 'typeorm';
|
||||
import { In } from 'typeorm';
|
||||
import type {
|
||||
EventMessageTypes,
|
||||
EventNamesTypes,
|
||||
@@ -132,7 +133,23 @@ export class MessageEventBus extends EventEmitter {
|
||||
this.logWriter?.startLogging();
|
||||
await this.send(unsentAndUnfinished.unsentMessages);
|
||||
|
||||
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
|
||||
let unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
|
||||
|
||||
// if we are in queue mode, running jobs may still be running on a worker despite the main process
|
||||
// crashing, so we can't just mark them as crashed
|
||||
if (config.get('executions.mode') !== 'queue') {
|
||||
const dbUnfinishedExecutionIds = (
|
||||
await Container.get(ExecutionRepository).find({
|
||||
where: {
|
||||
status: In(['running', 'new', 'unknown']),
|
||||
},
|
||||
select: ['id'],
|
||||
})
|
||||
).map((e) => e.id);
|
||||
unfinishedExecutionIds = Array.from(
|
||||
new Set<string>([...unfinishedExecutionIds, ...dbUnfinishedExecutionIds]),
|
||||
);
|
||||
}
|
||||
|
||||
if (unfinishedExecutionIds.length > 0) {
|
||||
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
|
||||
@@ -160,11 +177,18 @@ export class MessageEventBus extends EventEmitter {
|
||||
this.logWriter?.startRecoveryProcess();
|
||||
for (const executionId of unfinishedExecutionIds) {
|
||||
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
|
||||
await recoverExecutionDataFromEventLogMessages(
|
||||
executionId,
|
||||
unsentAndUnfinished.unfinishedExecutions[executionId],
|
||||
true,
|
||||
);
|
||||
if (!unsentAndUnfinished.unfinishedExecutions[executionId]?.length) {
|
||||
LoggerProxy.debug(
|
||||
`No event messages found, marking execution ${executionId} as 'crashed'`,
|
||||
);
|
||||
await Container.get(ExecutionRepository).markAsCrashed([executionId]);
|
||||
} else {
|
||||
await recoverExecutionDataFromEventLogMessages(
|
||||
executionId,
|
||||
unsentAndUnfinished.unfinishedExecutions[executionId],
|
||||
true,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove the recovery process flag file
|
||||
|
||||
@@ -11,7 +11,7 @@ import { ExecutionRepository } from '@db/repositories';
|
||||
export async function recoverExecutionDataFromEventLogMessages(
|
||||
executionId: string,
|
||||
messages: EventMessageTypes[],
|
||||
applyToDb = true,
|
||||
applyToDb: boolean,
|
||||
): Promise<IRunExecutionData | undefined> {
|
||||
const executionEntry = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
|
||||
includeData: true,
|
||||
|
||||
Reference in New Issue
Block a user