🐛 Fix "unknown", never-end workflow and not displaying error message (#1978)

* Added try catch blocks to avoid endlessly running workflows

* Added handling for subworkflows

*  Fix one cause of "unkown" status of worklows with "main" mode

*  Fix one cause of "unkown" status of worklows with "own" mode

*  Fix one cause of "unkown" status of worklows with "queue" mode

* Saving database recovery

* 🐛 Fix issue that errors did not get saved correctly and also not
displayed

*  Save workflow timeout correctly as error

* Adding error capture to queued jobs

*  Mark canceled executions as not finished consistently across all
modes

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue
2021-07-10 11:34:41 +02:00
committed by GitHub
parent abc2f2a515
commit d3da5023f0
7 changed files with 232 additions and 97 deletions

View File

@@ -8,6 +8,7 @@ import {
IBullJobResponse,
ICredentialsOverwrite,
ICredentialsTypeData,
IExecutionDb,
IExecutionFlattedDb,
IExecutionResponse,
IProcessMessageDataHook,
@@ -29,6 +30,7 @@ import {
import {
ExecutionError,
IRun,
IWorkflowBase,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
@@ -85,11 +87,15 @@ export class WorkflowRunner {
* @param {string} executionId
* @memberof WorkflowRunner
*/
processError(error: ExecutionError, startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string) {
async processError(error: ExecutionError, startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string, hooks?: WorkflowHooks) {
const fullRunData: IRun = {
data: {
resultData: {
error,
error: {
...error,
message: error.message,
stack: error.stack,
},
runData: {},
},
},
@@ -102,6 +108,10 @@ export class WorkflowRunner {
// Remove from active execution with empty data. That will
// set the execution to failed.
this.activeExecutions.remove(executionId, fullRunData);
if (hooks) {
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
}
}
/**
@@ -179,28 +189,34 @@ export class WorkflowRunner {
// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined);
Logger.verbose(`Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`, {executionId});
let workflowExecution: PCancelable<IRun>;
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId, true);
additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({sessionId: data.sessionId});
try {
additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({sessionId: data.sessionId});
let workflowExecution: PCancelable<IRun>;
if (data.executionData !== undefined) {
Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {executionId});
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
Logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {executionId});
// Execute all nodes
if (data.executionData !== undefined) {
Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {executionId});
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
Logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {executionId});
// Execute all nodes
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, {executionId});
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode);
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, {executionId});
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode);
}
} catch (error) {
await this.processError(error, new Date(), data.executionMode, executionId, additionalData.hooks);
throw error;
}
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
@@ -247,7 +263,17 @@ export class WorkflowRunner {
removeOnComplete: true,
removeOnFail: true,
};
const job = await this.jobQueue.add(jobData, jobOptions);
let job: Bull.Job;
try {
job = await this.jobQueue.add(jobData, jobOptions);
} catch (error) {
// We use "getWorkflowHooksIntegrated" here as we are just integrated in the "workflowExecuteAfter"
// hook anyway and other get so ignored
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksIntegrated(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined });
await this.processError(error, new Date(), data.executionMode, executionId, hooks);
return executionId;
}
console.log('Started with ID: ' + job.id.toString());
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined });
@@ -264,7 +290,7 @@ export class WorkflowRunner {
const fullRunData :IRun = {
data: {
resultData: {
error: new WorkflowOperationError('Workflow has been canceled!'),
error: new WorkflowOperationError('Workflow-Execution has been canceled!'),
runData: {},
},
},
@@ -280,6 +306,9 @@ export class WorkflowRunner {
const queueRecoveryInterval = config.get('queue.bull.queueRecoveryInterval') as number;
const racingPromises: Array<Promise<IBullJobResponse | object>> = [jobData];
let clearWatchdogInterval;
if (queueRecoveryInterval > 0) {
/*************************************************
* Long explanation about what this solves: *
@@ -295,7 +324,7 @@ export class WorkflowRunner {
*************************************************/
let watchDogInterval: NodeJS.Timeout | undefined;
const watchDog = new Promise((res) => {
const watchDog: Promise<object> = new Promise((res) => {
watchDogInterval = setInterval(async () => {
const currentJob = await this.jobQueue.getJob(job.id);
// When null means job is finished (not found in queue)
@@ -306,19 +335,43 @@ export class WorkflowRunner {
}, queueRecoveryInterval * 1000);
});
racingPromises.push(watchDog);
const clearWatchdogInterval = () => {
clearWatchdogInterval = () => {
if (watchDogInterval) {
clearInterval(watchDogInterval);
watchDogInterval = undefined;
}
};
}
await Promise.race([jobData, watchDog]);
clearWatchdogInterval();
try {
await Promise.race(racingPromises);
if (clearWatchdogInterval !== undefined) {
clearWatchdogInterval();
}
} catch (error) {
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined });
Logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
if (clearWatchdogInterval !== undefined) {
clearWatchdogInterval();
}
await this.processError(error, new Date(), data.executionMode, executionId, hooks);
} else {
await jobData;
const fullRunData :IRun = {
data: {
resultData: {
error,
runData: {},
},
},
mode: data.executionMode,
startedAt: new Date(),
stoppedAt: new Date(),
};
this.activeExecutions.remove(executionId, fullRunData);
resolve(fullRunData);
return;
}
@@ -427,8 +480,13 @@ export class WorkflowRunner {
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
try {
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
} catch (error) {
await this.processError(error, new Date(), data.executionMode, executionId, workflowHooks);
return executionId;
}
// Start timeout for the execution
let executionTimeout: NodeJS.Timeout;
@@ -476,14 +534,14 @@ export class WorkflowRunner {
} else if (message.type === 'processError') {
clearTimeout(executionTimeout);
const executionError = message.data.executionError as ExecutionError;
this.processError(executionError, startedAt, data.executionMode, executionId);
await this.processError(executionError, startedAt, data.executionMode, executionId, workflowHooks);
} else if (message.type === 'processHook') {
this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
// No need to add hook here as the subprocess takes care of calling the hooks
this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (message.type === 'startExecution') {
const executionId = await this.activeExecutions.add(message.data.runData);
@@ -506,13 +564,13 @@ export class WorkflowRunner {
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
this.processError(timeoutError, startedAt, data.executionMode, executionId);
await this.processError(timeoutError, startedAt, data.executionMode, executionId, workflowHooks);
} else if (code !== 0) {
Logger.debug(`Subprocess for execution ID ${executionId} finished with error code ${code}.`, {executionId});
// Process did exit with error code, so something went wrong.
const executionError = new WorkflowOperationError('Workflow execution process did crash for an unknown reason!');
this.processError(executionError, startedAt, data.executionMode, executionId);
await this.processError(executionError, startedAt, data.executionMode, executionId, workflowHooks);
}
for(const executionId of childExecutionIds) {