refactor(core): Use DI for WorkflowRunner (no-changelog) (#8372)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-01-26 13:49:39 +01:00
committed by GitHub
parent bf11c7c1bd
commit c70fa66e76
21 changed files with 215 additions and 258 deletions

View File

@@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container } from 'typedi';
import { Container, Service } from 'typedi';
import type { IProcessMessage } from 'n8n-core';
import { WorkflowExecute } from 'n8n-core';
@@ -46,36 +46,37 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { Push } from '@/push';
import { InternalHooks } from '@/InternalHooks';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
@Service()
export class WorkflowRunner {
logger: Logger;
private jobQueue: Queue;
activeExecutions: ActiveExecutions;
private executionsMode = config.getEnv('executions.mode');
push: Push;
private executionsProcess = config.getEnv('executions.process');
jobQueue: Queue;
constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
private readonly executionRepository: ExecutionRepository,
private readonly externalHooks: ExternalHooks,
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker,
) {}
constructor() {
this.logger = Container.get(Logger);
this.push = Container.get(Push);
this.activeExecutions = Container.get(ActiveExecutions);
}
/**
* The process did send a hook message so execute the appropriate hook
*/
async processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
/** The process did send a hook message so execute the appropriate hook */
private async processHookMessage(
workflowHooks: WorkflowHooks,
hookData: IProcessMessageDataHook,
) {
await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
}
/**
* The process did error
*/
/** The process did error */
async processError(
error: ExecutionError,
startedAt: Date,
@@ -91,12 +92,9 @@ export class WorkflowRunner {
// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415
if (isQueueMode && executionMode !== 'manual') {
const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: false,
},
);
const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, {
includeData: false,
});
if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') {
// false positive, execution was successful
return;
@@ -140,12 +138,9 @@ export class WorkflowRunner {
}
}
const executionFlattedData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: true,
},
);
const executionFlattedData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
});
if (executionFlattedData) {
void Container.get(InternalHooks).onWorkflowCrashed(
@@ -169,12 +164,7 @@ export class WorkflowRunner {
}
}
/**
* Run the workflow
*
* @param {boolean} [loadStaticData] If set will the static data be loaded from
* the workflow and added to input data
*/
/** Run the workflow */
async run(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
@@ -182,16 +172,13 @@ export class WorkflowRunner {
executionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const executionsMode = config.getEnv('executions.mode');
const executionsProcess = config.getEnv('executions.process');
await initErrorHandling();
if (executionsMode === 'queue') {
if (this.executionsMode === 'queue') {
this.jobQueue = Container.get(Queue);
}
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
if (this.executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
executionId = await this.enqueueExecution(
@@ -202,7 +189,7 @@ export class WorkflowRunner {
responsePromise,
);
} else {
if (executionsProcess === 'main') {
if (this.executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
} else {
executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise);
@@ -213,12 +200,11 @@ export class WorkflowRunner {
// only run these when not in queue mode or when the execution is manual,
// since these calls are now done by the worker directly
if (
executionsMode !== 'queue' ||
this.executionsMode !== 'queue' ||
config.getEnv('generic.instanceType') === 'worker' ||
data.executionMode === 'manual'
) {
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
const externalHooks = Container.get(ExternalHooks);
postExecutePromise
.then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute(
@@ -227,9 +213,9 @@ export class WorkflowRunner {
executionData,
data.userId,
);
if (externalHooks.exists('workflow.postExecute')) {
if (this.externalHooks.exists('workflow.postExecute')) {
try {
await externalHooks.run('workflow.postExecute', [
await this.externalHooks.run('workflow.postExecute', [
executionData,
data.workflowData,
executionId,
@@ -249,13 +235,8 @@ export class WorkflowRunner {
return executionId;
}
/**
* Run the workflow in current process
*
* @param {boolean} [loadStaticData] If set will the static data be loaded from
* the workflow and added to input data
*/
async runMainProcess(
/** Run the workflow in current process */
private async runMainProcess(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
restartExecutionId?: string,
@@ -264,11 +245,9 @@ export class WorkflowRunner {
const workflowId = data.workflowData.id;
if (loadStaticData === true && workflowId) {
data.workflowData.staticData =
await Container.get(WorkflowStaticDataService).getStaticDataById(workflowId);
await this.workflowStaticDataService.getStaticDataById(workflowId);
}
const nodeTypes = Container.get(NodeTypes);
// Soft timeout to stop workflow execution after current running node
// Changes were made by adding the `workflowTimeout` to the `additionalData`
// So that the timeout will also work for executions with nested workflows.
@@ -291,7 +270,7 @@ export class WorkflowRunner {
nodes: data.workflowData.nodes,
connections: data.workflowData.connections,
active: data.workflowData.active,
nodeTypes,
nodeTypes: this.nodeTypes,
staticData: data.workflowData.staticData,
settings: workflowSettings,
pinData,
@@ -312,7 +291,7 @@ export class WorkflowRunner {
{ executionId },
);
let workflowExecution: PCancelable<IRun>;
await Container.get(ExecutionRepository).updateStatus(executionId, 'running');
await this.executionRepository.updateStatus(executionId, 'running');
try {
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(
@@ -322,7 +301,7 @@ export class WorkflowRunner {
);
try {
await Container.get(PermissionChecker).check(workflow, data.userId);
await this.permissionChecker.check(workflow, data.userId);
} catch (error) {
ErrorReporter.error(error);
// Create a failed execution with the data for the node
@@ -439,7 +418,7 @@ export class WorkflowRunner {
return executionId;
}
async enqueueExecution(
private async enqueueExecution(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
@@ -604,13 +583,10 @@ export class WorkflowRunner {
);
}
const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: executionHasPostExecutionPromises,
unflattenData: executionHasPostExecutionPromises,
},
);
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: executionHasPostExecutionPromises,
unflattenData: executionHasPostExecutionPromises,
});
if (!fullExecutionData) {
return reject(new Error(`Could not find execution with id "${executionId}"`));
}
@@ -651,13 +627,8 @@ export class WorkflowRunner {
return executionId;
}
/**
* Run the workflow
*
* @param {boolean} [loadStaticData] If set will the static data be loaded from
* the workflow and added to input data
*/
async runSubprocess(
/** Run the workflow in a child-process */
private async runSubprocess(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
restartExecutionId?: string,
@@ -669,7 +640,7 @@ export class WorkflowRunner {
if (loadStaticData === true && workflowId) {
data.workflowData.staticData =
await Container.get(WorkflowStaticDataService).getStaticDataById(workflowId);
await this.workflowStaticDataService.getStaticDataById(workflowId);
}
data.restartExecutionId = restartExecutionId;
@@ -678,7 +649,7 @@ export class WorkflowRunner {
const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId);
(data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId;
await Container.get(ExecutionRepository).updateStatus(executionId, 'running');
await this.executionRepository.updateStatus(executionId, 'running');
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);