Add max execution time for Workflows (#755)

* 🎉 basic setup and execution stopping

* 🚧 soft timeout for own process executions

* 🚧 add hard timeout for subprocesses

* 🚧 add soft timeout to main thread

* 🔧 set default timeout to 5 mins --> 500s

* 💡 adding documentation to configs

* 🚧 deactivate timeout by default

* 🚧 add logic of max execution timeout

*  adding timeout to settings in frontend and server

* 🎨 improve naming

* 💡 fix change in config docs

* ✔️ fixing compilation issue

* 🎨 add format for new config variables

* 👌 type cast before checking equality

*  Improve error message if NodeType is not known

* 🐳 Tag also rpi latest image

* 🐛 Fix Postgres issue with Node.js 14 #776

* 🚧 add toggle to activate workflow timeout

* 💄 improving UX of setting a timeout and its duration

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Ben Hesseldieck
2020-07-29 14:12:54 +02:00
committed by GitHub
parent 6e06da99fb
commit 051598d30e
13 changed files with 232 additions and 25 deletions

View File

@@ -88,10 +88,11 @@ export class ActiveExecutions {
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @param {string} timeout String 'timeout' given if stop due to timeout
* @returns {(Promise<IRun | undefined>)}
* @memberof ActiveExecutions
*/
async stopExecution(executionId: string): Promise<IRun | undefined> {
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
if (this.activeExecutions[executionId] === undefined) {
// There is no execution running with that id
return;
@@ -101,17 +102,17 @@ export class ActiveExecutions {
// returned that it gets then also resolved correctly.
if (this.activeExecutions[executionId].process !== undefined) {
// Workflow is running in subprocess
setTimeout(() => {
if (this.activeExecutions[executionId].process!.connected) {
if (this.activeExecutions[executionId].process!.connected) {
setTimeout(() => {
// execute on next event loop tick;
this.activeExecutions[executionId].process!.send({
type: 'stopExecution'
type: timeout ? timeout : 'stopExecution',
});
}
}, 1);
}, 1)
}
} else {
// Workflow is running in current process
this.activeExecutions[executionId].workflowExecution!.cancel('Canceled by user');
this.activeExecutions[executionId].workflowExecution!.cancel();
}
return this.getPostExecutePromise(executionId);

View File

@@ -286,17 +286,17 @@ export interface IN8nUISettings {
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
urlBaseWebhook: string;
versionCli: string;
}
export interface IPackageVersions {
cli: string;
}
export interface IPushData {
data: IPushDataExecutionFinished | IPushDataNodeExecuteAfter | IPushDataNodeExecuteBefore | IPushDataTestWebhook;
type: IPushDataType;
@@ -304,7 +304,6 @@ export interface IPushData {
export type IPushDataType = 'executionFinished' | 'executionStarted' | 'nodeExecuteAfter' | 'nodeExecuteBefore' | 'testWebhookDeleted' | 'testWebhookReceived';
export interface IPushDataExecutionFinished {
data: IRun;
executionIdActive: string;

View File

@@ -29,6 +29,9 @@ class NodeTypesClass implements INodeTypes {
}
getByName(nodeType: string): INodeType | undefined {
if (this.nodeTypes[nodeType] === undefined) {
throw new Error(`The node-type "${nodeType}" is not known!`);
}
return this.nodeTypes[nodeType].type;
}
}

View File

@@ -113,6 +113,8 @@ class App {
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
push: Push.Push;
@@ -133,6 +135,8 @@ class App {
this.saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
this.saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
this.saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
this.executionTimeout = config.get('executions.timeout') as number;
this.maxExecutionTimeout = config.get('executions.maxTimeout') as number;
this.timezone = config.get('generic.timezone') as string;
this.restEndpoint = config.get('endpoints.rest') as string;
@@ -482,9 +486,12 @@ class App {
// Do not save when default got set
delete newWorkflowData.settings.saveManualExecutions;
}
if (parseInt(newWorkflowData.settings.executionTimeout as string) === this.executionTimeout) {
// Do not save when default got set
delete newWorkflowData.settings.executionTimeout
}
}
newWorkflowData.updatedAt = this.getCurrentDate();
await Db.collections.Workflow!.update(id, newWorkflowData);
@@ -1534,6 +1541,8 @@ class App {
saveDataErrorExecution: this.saveDataErrorExecution,
saveDataSuccessExecution: this.saveDataSuccessExecution,
saveManualExecutions: this.saveManualExecutions,
executionTimeout: this.executionTimeout,
maxExecutionTimeout: this.maxExecutionTimeout,
timezone: this.timezone,
urlBaseWebhook: WebhookHelpers.getWebhookBaseUrl(),
versionCli: this.versions!.cli,

View File

@@ -90,7 +90,6 @@ export class WorkflowRunner {
WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId);
}
/**
* Run the workflow
*
@@ -155,9 +154,27 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
// Soft timeout to stop workflow execution after current running node
let executionTimeout: NodeJS.Timeout;
let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default
if (data.workflowData.settings && data.workflowData.settings.executionTimeout) {
workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number // preference on workflow setting
}
if (workflowTimeout) {
const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds
executionTimeout = setTimeout(() => {
this.activeExecutions.stopExecution(executionId, 'timeout')
}, timeout)
}
workflowExecution.then((fullRunData) => {
clearTimeout(executionTimeout);
if (workflowExecution.isCanceled) {
fullRunData.finished = false;
}
this.activeExecutions.remove(executionId, fullRunData);
});
})
return executionId;
}
@@ -218,24 +235,54 @@ export class WorkflowRunner {
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
// Start timeout for the execution
let executionTimeout: NodeJS.Timeout;
let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default
if (data.workflowData.settings && data.workflowData.settings.executionTimeout) {
workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number // preference on workflow setting
}
if (workflowTimeout) {
const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds
executionTimeout = setTimeout(() => {
this.activeExecutions.stopExecution(executionId, 'timeout')
executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)) // minimum 5 seconds
}, timeout)
}
// Listen to data from the subprocess
subprocess.on('message', (message: IProcessMessage) => {
if (message.type === 'end') {
clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId!, message.data.runData);
} else if (message.type === 'processError') {
clearTimeout(executionTimeout);
const executionError = message.data.executionError as IExecutionError;
this.processError(executionError, startedAt, data.executionMode, executionId);
} else if (message.type === 'processHook') {
this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = { message: 'Workflow execution timed out!' } as IExecutionError;
this.processError(timeoutError, startedAt, data.executionMode, executionId);
}
});
// Also get informed when the processes does exit especially when it did crash
// Also get informed when the processes does exit especially when it did crash or timed out
subprocess.on('exit', (code, signal) => {
if (code !== 0) {
if (signal === 'SIGTERM'){
// Execution timed out and its process has been terminated
const timeoutError = {
message: 'Workflow execution timed out!',
} as IExecutionError;
this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (code !== 0) {
// Process did exit with error code, so something went wrong.
const executionError = {
message: 'Workflow execution process did crash for an unknown reason!',
@@ -243,6 +290,7 @@ export class WorkflowRunner {
this.processError(executionError, startedAt, data.executionMode, executionId);
}
clearTimeout(executionTimeout);
});
return executionId;

View File

@@ -190,17 +190,18 @@ process.on('message', async (message: IProcessMessage) => {
// Once the workflow got executed make sure the process gets killed again
process.exit();
} else if (message.type === 'stopExecution') {
} else if (message.type === 'stopExecution' || message.type === 'timeout') {
// The workflow execution should be stopped
let runData: IRun;
if (workflowRunner.workflowExecute !== undefined) {
// Workflow started already executing
runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt);
// If there is any data send it to parent process
await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!);
const timeOutError = message.type === 'timeout' ? { message: 'Workflow execution timed out!' } as IExecutionError : undefined
// If there is any data send it to parent process, if execution timedout add the error
await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!, timeOutError);
} else {
// Workflow did not get started yet
runData = {
@@ -209,7 +210,7 @@ process.on('message', async (message: IProcessMessage) => {
runData: {},
},
},
finished: true,
finished: message.type !== 'timeout',
mode: workflowRunner.data!.executionMode,
startedAt: workflowRunner.startedAt,
stoppedAt: new Date(),
@@ -218,7 +219,7 @@ process.on('message', async (message: IProcessMessage) => {
workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]);
}
await sendToParentProcess('end', {
await sendToParentProcess(message.type === 'timeout' ? message.type : 'end', {
runData,
});