refactor(core): Revamp crash recovery mechanism for main mode (#9613)

This commit is contained in:
Iván Ovejero
2024-06-07 16:19:59 +02:00
committed by GitHub
parent 291d46af15
commit b8338e3610
10 changed files with 648 additions and 223 deletions

View File

@@ -0,0 +1,420 @@
import Container from 'typedi';
import { stringify } from 'flatted';
import { mockInstance } from '@test/mocking';
import { randomInteger } from '@test-integration/random';
import { createWorkflow } from '@test-integration/db/workflows';
import { createExecution } from '@test-integration/db/executions';
import * as testDb from '@test-integration/testDb';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { InternalHooks } from '@/InternalHooks';
import { Push } from '@/push';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { NodeConnectionType } from 'n8n-workflow';
/**
* Workflow producing an execution whose data will be truncated by an instance crash.
*/
export const OOM_WORKFLOW: Partial<WorkflowEntity> = {
nodes: [
{
parameters: {},
id: '48ce17fe-9651-42ae-910c-48602a00f0bb',
name: 'When clicking "Test workflow"',
type: 'n8n-nodes-base.manualTrigger',
typeVersion: 1,
position: [640, 260],
},
{
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
],
connections: {
'When clicking "Test workflow"': {
main: [
[
{
node: 'DebugHelper',
type: NodeConnectionType.Main,
index: 0,
},
],
],
},
},
pinData: {},
};
/**
* Snapshot of an execution that will be truncated by an instance crash.
*/
export const IN_PROGRESS_EXECUTION_DATA = {
startData: {},
resultData: {
runData: {
'When clicking "Test workflow"': [
{
hints: [],
startTime: 1716138610153,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
},
],
},
lastNodeExecuted: 'When clicking "Test workflow"',
},
executionData: {
contextData: {},
nodeExecutionStack: [
{
node: {
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
source: {
main: [
{
previousNode: 'When clicking "Test workflow"',
},
],
},
},
],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
};
export const setupMessages = (executionId: string, workflowName: string): EventMessage[] => {
return [
new EventMessageWorkflow({
eventName: 'n8n.workflow.started',
payload: { executionId },
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
},
}),
];
};
describe('ExecutionRecoveryService', () => {
let executionRecoveryService: ExecutionRecoveryService;
let push: Push;
beforeAll(async () => {
await testDb.init();
mockInstance(InternalHooks);
push = mockInstance(Push);
executionRecoveryService = new ExecutionRecoveryService(
push,
Container.get(ExecutionRepository),
);
});
afterEach(async () => {
await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
});
afterAll(async () => {
await testDb.terminate();
});
describe('recover', () => {
it('should amend, persist, run hooks, broadcast', async () => {
/**
* Arrange
*/
// @ts-expect-error Private method
const amendSpy = jest.spyOn(executionRecoveryService, 'amend');
const executionRepository = Container.get(ExecutionRepository);
const dbUpdateSpy = jest.spyOn(executionRepository, 'update');
// @ts-expect-error Private method
const runHooksSpy = jest.spyOn(executionRecoveryService, 'runHooks');
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
/**
* Act
*/
await executionRecoveryService.recover(execution.id, messages);
/**
* Assert
*/
expect(amendSpy).toHaveBeenCalledTimes(1);
expect(amendSpy).toHaveBeenCalledWith(execution.id, messages);
expect(dbUpdateSpy).toHaveBeenCalledTimes(1);
expect(runHooksSpy).toHaveBeenCalledTimes(1);
expect(push.once).toHaveBeenCalledTimes(1);
});
test('should amend a truncated execution where last node did not finish', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recover(execution.id, messages);
/**
* Assert
*/
const startOfLastNodeRun = messages
.find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: startOfLastNodeRun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeInstanceOf(WorkflowCrashedError);
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA);
expect(debugHelperTaskData?.executionStatus).toBe('crashed');
expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError);
});
test('should amend a truncated execution where last node finished', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
messages.push(
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId: execution.id,
workflowName: workflow.name,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
},
}),
);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recover(execution.id, messages);
/**
* Assert
*/
const endOfLastNoderun = messages
.find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: endOfLastNoderun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeUndefined();
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.executionStatus).toBe('success');
expect(debugHelperTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA);
});
test('should return `null` if no messages', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const noMessages: EventMessage[] = [];
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recover(execution.id, noMessages);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('should return `null` if no execution', async () => {
/**
* Arrange
*/
const inexistentExecutionId = randomInteger(100).toString();
const messages = setupMessages(inexistentExecutionId, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recover(
inexistentExecutionId,
messages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
});
});

View File

@@ -0,0 +1,189 @@
import Container, { Service } from 'typedi';
import { Push } from '@/push';
import { sleep } from 'n8n-workflow';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected
import type { DateTime } from 'luxon';
import type { IRun, ITaskData } from 'n8n-workflow';
import type { EventMessageTypes } from '../eventbus/EventMessageClasses';
import type { IExecutionResponse } from '@/Interfaces';
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
/**
* Service for recovering executions truncated by an instance crash.
*/
@Service()
export class ExecutionRecoveryService {
constructor(
private readonly push: Push,
private readonly executionRepository: ExecutionRepository,
) {}
/**
* "Recovery" means (1) amending key properties of a truncated execution,
* (2) running post-execution hooks, and (3) returning the amended execution
* so the UI can reflect the error. "Recovery" does **not** mean injecting
* execution data from the logs (they hold none), or resuming the execution
* from the point of truncation, or re-running the whole execution.
*
* Recovery is only possible if event logs are available in the container.
* In regular mode, logs should but might not be available, e.g. due to container
* being recycled, max log size causing rotation, etc. In queue mode, as workers
* log to their own filesystems, only manual exections can be recovered.
*/
async recover(executionId: string, messages: EventMessageTypes[]) {
if (messages.length === 0) return null;
const amendedExecution = await this.amend(executionId, messages);
if (!amendedExecution) return null;
await this.executionRepository.updateExistingExecution(executionId, amendedExecution);
await this.runHooks(amendedExecution);
this.push.once('editorUiConnected', async () => {
await sleep(1000);
this.push.broadcast('executionRecovered', { executionId });
});
return amendedExecution;
}
/**
* Amend `status`, `stoppedAt`, and `data` of an execution using event log messages.
*/
private async amend(executionId: string, messages: EventMessageTypes[]) {
const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages);
if (nodeMessages.length === 0) return null;
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!execution) return null;
const runExecutionData = execution.data ?? { resultData: { runData: {} } };
let lastNodeRunTimestamp: DateTime | undefined;
for (const node of execution.workflowData.nodes) {
const nodeStartedMessage = nodeMessages.find(
(m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.started',
);
if (!nodeStartedMessage) continue;
const nodeFinishedMessage = nodeMessages.find(
(m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished',
);
const taskData: ITaskData = {
startTime: nodeStartedMessage.ts.toUnixInteger(),
executionTime: -1,
source: [null],
};
if (nodeFinishedMessage) {
taskData.executionStatus = 'success';
taskData.data ??= ARTIFICIAL_TASK_DATA;
taskData.executionTime = nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis();
lastNodeRunTimestamp = nodeFinishedMessage.ts;
} else {
taskData.executionStatus = 'crashed';
taskData.error = new NodeCrashedError(node);
taskData.executionTime = 0;
runExecutionData.resultData.error = new WorkflowCrashedError();
lastNodeRunTimestamp = nodeStartedMessage.ts;
}
runExecutionData.resultData.lastNodeExecuted = node.name;
runExecutionData.resultData.runData[node.name] = [taskData];
}
return {
...execution,
status: execution.status === 'error' ? 'error' : 'crashed',
stoppedAt: this.toStoppedAt(lastNodeRunTimestamp, workflowMessages),
data: runExecutionData,
} as IExecutionResponse;
}
// ----------------------------------
// private
// ----------------------------------
private toRelevantMessages(messages: EventMessageTypes[]) {
return messages.reduce<{
nodeMessages: EventMessageTypes[];
workflowMessages: EventMessageTypes[];
}>(
(acc, cur) => {
if (cur.eventName.startsWith('n8n.node.')) {
acc.nodeMessages.push(cur);
} else if (cur.eventName.startsWith('n8n.workflow.')) {
acc.workflowMessages.push(cur);
}
return acc;
},
{ nodeMessages: [], workflowMessages: [] },
);
}
private toStoppedAt(timestamp: DateTime | undefined, messages: EventMessageTypes[]) {
if (timestamp) return timestamp.toJSDate();
const WORKFLOW_END_EVENTS = new Set([
'n8n.workflow.success',
'n8n.workflow.crashed',
'n8n.workflow.failed',
]);
return (
messages.find((m) => WORKFLOW_END_EVENTS.has(m.eventName)) ??
messages.find((m) => m.eventName === 'n8n.workflow.started')
)?.ts.toJSDate();
}
private async runHooks(execution: IExecutionResponse) {
await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, {
data: execution.data,
finished: false,
mode: execution.mode,
waitTill: execution.waitTill,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
status: execution.status,
});
const externalHooks = getWorkflowHooksMain(
{
userId: '',
workflowData: execution.workflowData,
executionMode: execution.mode,
executionData: execution.data,
runData: execution.data.resultData.runData,
retryOf: execution.retryOf,
},
execution.id,
);
const run: IRun = {
data: execution.data,
finished: false,
mode: execution.mode,
waitTill: execution.waitTill ?? undefined,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
status: execution.status,
};
await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]);
}
}