From 7b8532d3a3da70295c201e61e5a884a8c41f2fef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 27 Nov 2023 13:10:43 +0100 Subject: [PATCH] refactor(core): Move execution progress saving to standalone utility (no-changelog) (#7770) This PR continues the effort of moving logic inside execution lifecycle hooks into standalone testable functions, as a stepping stone to refactoring the hooks themselves. --- .../cli/src/WorkflowExecuteAdditionalData.ts | 92 ++------------- .../saveExecutionProgress.ts | 105 +++++++++++++++++ .../saveExecutionProgress.test.ts | 106 ++++++++++++++++++ 3 files changed, 220 insertions(+), 83 deletions(-) create mode 100644 packages/cli/src/executionLifecycleHooks/saveExecutionProgress.ts create mode 100644 packages/cli/test/unit/execution-lifecyle/saveExecutionProgress.test.ts diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 556b92cd2..91e220121 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -65,6 +65,7 @@ import { import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryDataId'; import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings'; import { Logger } from './Logger'; +import { saveExecutionProgress } from './executionLifecycleHooks/saveExecutionProgress'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -358,89 +359,14 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx data: ITaskData, executionData: IRunExecutionData, ): Promise { - const saveSettings = toSaveSettings(this.workflowData.settings); - - if (!saveSettings.progress) return; - - try { - logger.debug( - `Save execution progress to database for execution ID ${this.executionId} `, - { executionId: this.executionId, nodeName }, - ); - - const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( - this.executionId, - { - includeData: true, - unflattenData: true, - }, - ); - - if (!fullExecutionData) { - // Something went badly wrong if this happens. - // This check is here mostly to make typescript happy. - return; - } - - if (fullExecutionData.finished) { - // We already received ´workflowExecuteAfter´ webhook, so this is just an async call - // that was left behind. We skip saving because the other call should have saved everything - // so this one is safe to ignore - return; - } - - if (fullExecutionData.data === undefined) { - fullExecutionData.data = { - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - metadata: {}, - nodeExecutionStack: [], - waitingExecution: {}, - waitingExecutionSource: {}, - }, - }; - } - - if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) { - // Append data if array exists - fullExecutionData.data.resultData.runData[nodeName].push(data); - } else { - // Initialize array and save data - fullExecutionData.data.resultData.runData[nodeName] = [data]; - } - - fullExecutionData.data.executionData = executionData.executionData; - - // Set last executed node so that it may resume on failure - fullExecutionData.data.resultData.lastNodeExecuted = nodeName; - - fullExecutionData.status = 'running'; - - await Container.get(ExecutionRepository).updateExistingExecution( - this.executionId, - fullExecutionData, - ); - } catch (err) { - ErrorReporter.error(err); - // TODO: Improve in the future! - // Errors here might happen because of database access - // For busy machines, we may get "Database is locked" errors. - - // We do this to prevent crashes and executions ending in `unknown` state. - logger.error( - `Failed saving execution progress to database for execution ID ${this.executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`, - { - ...err, - executionId: this.executionId, - sessionId: this.sessionId, - workflowId: this.workflowData.id, - }, - ); - } + await saveExecutionProgress( + this.workflowData, + this.executionId, + nodeName, + data, + executionData, + this.sessionId, + ); }, ], }; diff --git a/packages/cli/src/executionLifecycleHooks/saveExecutionProgress.ts b/packages/cli/src/executionLifecycleHooks/saveExecutionProgress.ts new file mode 100644 index 000000000..d5f0d7237 --- /dev/null +++ b/packages/cli/src/executionLifecycleHooks/saveExecutionProgress.ts @@ -0,0 +1,105 @@ +import { Container } from 'typedi'; + +import type { IRunExecutionData, ITaskData, IWorkflowBase } from 'n8n-workflow'; +import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; + +import { Logger } from '@/Logger'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { toSaveSettings } from '@/executionLifecycleHooks/toSaveSettings'; + +export async function saveExecutionProgress( + workflowData: IWorkflowBase, + executionId: string, + nodeName: string, + data: ITaskData, + executionData: IRunExecutionData, + sessionId?: string, +) { + const saveSettings = toSaveSettings(workflowData.settings); + + if (!saveSettings.progress) return; + + const logger = Container.get(Logger); + + try { + logger.debug(`Save execution progress to database for execution ID ${executionId} `, { + executionId, + nodeName, + }); + + const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( + executionId, + { + includeData: true, + unflattenData: true, + }, + ); + + if (!fullExecutionData) { + // Something went badly wrong if this happens. + // This check is here mostly to make typescript happy. + return; + } + + if (fullExecutionData.finished) { + // We already received ´workflowExecuteAfter´ webhook, so this is just an async call + // that was left behind. We skip saving because the other call should have saved everything + // so this one is safe to ignore + return; + } + + if (fullExecutionData.data === undefined) { + fullExecutionData.data = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack: [], + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + } + + if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) { + // Append data if array exists + fullExecutionData.data.resultData.runData[nodeName].push(data); + } else { + // Initialize array and save data + fullExecutionData.data.resultData.runData[nodeName] = [data]; + } + + fullExecutionData.data.executionData = executionData.executionData; + + // Set last executed node so that it may resume on failure + fullExecutionData.data.resultData.lastNodeExecuted = nodeName; + + fullExecutionData.status = 'running'; + + await Container.get(ExecutionRepository).updateExistingExecution( + executionId, + fullExecutionData, + ); + } catch (e) { + const error = e instanceof Error ? e : new Error(`${e}`); + + ErrorReporter.error(error); + // TODO: Improve in the future! + // Errors here might happen because of database access + // For busy machines, we may get "Database is locked" errors. + + // We do this to prevent crashes and executions ending in `unknown` state. + logger.error( + `Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`, + { + ...error, + executionId, + sessionId, + workflowId: workflowData.id, + }, + ); + } +} diff --git a/packages/cli/test/unit/execution-lifecyle/saveExecutionProgress.test.ts b/packages/cli/test/unit/execution-lifecyle/saveExecutionProgress.test.ts new file mode 100644 index 000000000..4235e56ec --- /dev/null +++ b/packages/cli/test/unit/execution-lifecyle/saveExecutionProgress.test.ts @@ -0,0 +1,106 @@ +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { mockInstance } from '../../shared/mocking'; +import { Logger } from '@/Logger'; +import { saveExecutionProgress } from '@/executionLifecycleHooks/saveExecutionProgress'; +import * as fnModule from '@/executionLifecycleHooks/toSaveSettings'; +import { + ErrorReporterProxy, + type IRunExecutionData, + type ITaskData, + type IWorkflowBase, +} from 'n8n-workflow'; +import type { IExecutionResponse } from '@/Interfaces'; + +mockInstance(Logger); + +const executionRepository = mockInstance(ExecutionRepository); + +afterEach(() => { + jest.clearAllMocks(); +}); + +const commonArgs: [IWorkflowBase, string, string, ITaskData, IRunExecutionData, string] = [ + {} as IWorkflowBase, + 'some-execution-id', + 'My Node', + {} as ITaskData, + {} as IRunExecutionData, + 'some-session-id', +]; + +const commonSettings = { error: true, success: true, manual: true }; + +test('should ignore if save settings say so', async () => { + jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ + ...commonSettings, + progress: false, + }); + + await saveExecutionProgress(...commonArgs); + + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); +}); + +test('should ignore on leftover async call', async () => { + jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ + ...commonSettings, + progress: true, + }); + + executionRepository.findSingleExecution.mockResolvedValue({ + finished: true, + } as IExecutionResponse); + + await saveExecutionProgress(...commonArgs); + + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); +}); + +test('should update execution', async () => { + jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ + ...commonSettings, + progress: true, + }); + + const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error'); + + executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse); + + await saveExecutionProgress(...commonArgs); + + expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', { + data: { + executionData: undefined, + resultData: { + lastNodeExecuted: 'My Node', + runData: { + 'My Node': [{}], + }, + }, + startData: {}, + }, + status: 'running', + }); + + expect(reporterSpy).not.toHaveBeenCalled(); +}); + +test('should report error on failure', async () => { + jest.spyOn(fnModule, 'toSaveSettings').mockReturnValue({ + ...commonSettings, + progress: true, + }); + + const reporterSpy = jest.spyOn(ErrorReporterProxy, 'error'); + + const error = new Error('Something went wrong'); + + executionRepository.findSingleExecution.mockImplementation(() => { + throw error; + }); + + await saveExecutionProgress(...commonArgs); + + expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled(); + expect(reporterSpy).toHaveBeenCalledWith(error); +});