From bf174a40999411de5809b6a4e156d090446ea7a0 Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Thu, 25 Jul 2019 08:30:37 +0200 Subject: [PATCH] :bug: Fix bug that some nodes got executed twice --- packages/core/src/WorkflowExecute.ts | 343 ++++++++++++++++----------- 1 file changed, 210 insertions(+), 133 deletions(-) diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 1b4d159cc..7fb705ff6 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -241,6 +241,211 @@ export class WorkflowExecute { } + addNodeToBeExecuted(workflow: Workflow, runExecutionData: IRunExecutionData, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void { + let stillDataMissing = false; + + // Check if node has multiple inputs as then we have to wait for all input data + // to be present before we can add it to the node-execution-stack + if (workflow.connectionsByDestinationNode[connectionData.node]['main'].length > 1) { + // Node has multiple inputs + let nodeWasWaiting = true; + + // Check if there is already data for the node + if (runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) { + // Node does not have data yet so create a new empty one + runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; + nodeWasWaiting = false; + } + if (runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) { + // Node does not have data for runIndex yet so create also empty one and init it + runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { + main: [] + }; + for (let i = 0; i < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; i++) { + runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null); + } + } + + // Add the new data + if (nodeSuccessData === null) { + runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null; + } else { + runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex]; + } + + // Check if all data exists now + let thisExecutionData: INodeExecutionData[] | null; + let allDataFound = true; + for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) { + thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i]; + if (thisExecutionData === null) { + allDataFound = false; + break; + } + } + + if (allDataFound === true) { + // All data exists for node to be executed + // So add it to the execution stack + runExecutionData.executionData!.nodeExecutionStack.push({ + node: workflow.nodes[connectionData.node], + data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] + }); + + // Remove the data from waiting + delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]; + + if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) { + // No more data left for the node so also delete that one + delete runExecutionData.executionData!.waitingExecution[connectionData.node]; + } + return; + } else { + stillDataMissing = true; + } + + if (nodeWasWaiting === false) { + + // Get a list of all the output nodes that we can check for siblings eaiser + const checkOutputNodes = []; + for (const outputIndexParent in workflow.connectionsBySourceNode[parentNodeName].main) { + if (!workflow.connectionsBySourceNode[parentNodeName].main.hasOwnProperty(outputIndexParent)) { + continue; + } + for (const connectionDataCheck of workflow.connectionsBySourceNode[parentNodeName].main[outputIndexParent]) { + checkOutputNodes.push(connectionDataCheck.node); + } + } + + // Node was not on "waitingExecution" so it is the first time it gets + // checked. So we have to go through all the inputs and check if they + // are already on the list to be processed. + // If that is not the case add it. + for (let inputIndex = 0; inputIndex < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; inputIndex++) { + for (const inputData of workflow.connectionsByDestinationNode[connectionData.node]['main'][inputIndex]) { + if (inputData.node === parentNodeName) { + // Is the node we come from so its data will be available for sure + continue; + } + + const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name); + + // Check if that node is also an output connection of the + // previously processed one + if (inputData.node !== parentNodeName && checkOutputNodes.includes(inputData.node)) { + // So the parent node will be added anyway which + // will then process this node next. So nothing to do. + continue; + } + + // Check if it is already in the execution stack + if (executionStackNodes.includes(inputData.node)) { + // Node is already on the list to be executed + // so there is nothing to do + continue; + } + + // Check if node got processed already + if (runExecutionData.resultData.runData[inputData.node] !== undefined) { + // Node got processed already so no need to add it + continue; + } + + // Check if any of the parent nodes does not have any inputs. That + // would mean that it has to get added to the list of nodes to process. + const parentNodes = workflow.getParentNodes(inputData.node, 'main', -1); + let nodeToAdd: string | undefined = inputData.node; + parentNodes.push(inputData.node); + parentNodes.reverse(); + + for (const parentNode of parentNodes) { + // Check if that node is also an output connection of the + // previously processed one + if (inputData.node !== parentNode && checkOutputNodes.includes(parentNode)) { + // So the parent node will be added anyway which + // will then process this node next. So nothing to do. + nodeToAdd = undefined; + break; + } + + // Check if it is already in the execution stack + if (executionStackNodes.includes(parentNode)) { + // Node is already on the list to be executed + // so there is nothing to do + nodeToAdd = undefined; + break; + } + + // Check if node got processed already + if (runExecutionData.resultData.runData[parentNode] !== undefined) { + // Node got processed already so we can use the + // output data as input of this node + break; + } + + nodeToAdd = parentNode; + } + + if (nodeToAdd === undefined) { + // No node has to get added so process + continue; + } + + if (workflow.connectionsByDestinationNode[nodeToAdd] === undefined) { + // Add only node if it does not have any inputs becuase else it will + // be added by its input node later anyway. + runExecutionData.executionData!.nodeExecutionStack.push( + { + node: workflow.getNode(nodeToAdd) as INode, + data: { + main: [ + [ + { + json: {}, + }, + ], + ], + }, + }, + ); + } + } + } + } + } + + // Make sure the array has all the values + const connectionDataArray: Array = []; + for (let i: number = connectionData.index; i >= 0; i--) { + connectionDataArray[i] = null; + } + + // Add the data of the current execution + if (nodeSuccessData === null) { + connectionDataArray[connectionData.index] = null; + } else { + connectionDataArray[connectionData.index] = nodeSuccessData[outputIndex]; + } + + if (stillDataMissing === true) { + // Additional data is needed to run node so add it to waiting + if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) { + runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; + } + runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { + main: connectionDataArray + }; + } else { + // All data is there so add it directly to stack + runExecutionData.executionData!.nodeExecutionStack.push({ + node: workflow.nodes[connectionData.node], + data: { + main: connectionDataArray + } + }); + } + } + /** * Runs the given execution data. @@ -353,8 +558,6 @@ export class WorkflowExecute { } } - // TODO Has to check if node is disabled - // Clone input data that nodes can not mess up data of parallel nodes which receive the same data // TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned // is very slow so only do if needed @@ -471,152 +674,26 @@ export class WorkflowExecute { if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) { let outputIndex: string, connectionData: IConnection; // Go over all the different + + // Add the nodes to be executed for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) { if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) { continue; } + // Go through all the different outputs of this connection for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) { if (!workflow.nodes.hasOwnProperty(connectionData.node)) { return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`)); } - let stillDataMissing = false; - - // Check if node has multiple inputs as then we have to wait for all input data - // to be present before we can add it to the node-execution-stack - if (workflow.connectionsByDestinationNode[connectionData.node]['main'].length > 1) { - // Node has multiple inputs - - // Check if there is already data for the node - if (runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node) && runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] !== undefined) { - // There is already data for the node and the current run so - // add the new data - if (nodeSuccessData === null) { - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null; - } else { - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex]; - } - - // Check if all data exists now - let thisExecutionData: INodeExecutionData[] | null; - let allDataFound = true; - for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) { - thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i]; - if (thisExecutionData === null) { - allDataFound = false; - break; - } - } - - if (allDataFound === true) { - // All data exists for node to be executed - // So add it to the execution stack - runExecutionData.executionData!.nodeExecutionStack.push({ - node: workflow.nodes[connectionData.node], - data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] - }); - - // Remove the data from waiting - delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]; - - if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) { - // No more data left for the node so also delete that one - delete runExecutionData.executionData!.waitingExecution[connectionData.node]; - } - continue; - } else { - stillDataMissing = true; - } - } else { - stillDataMissing = true; - - // Node was not on "waitingExecution" so it is the first time it gets - // checked. So we have to go through all the inputs and check if they - // are already on the list to be processed. - // If that is not the case add it. - for (let inputIndex = 0; inputIndex < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; inputIndex++) { - for (const inputData of workflow.connectionsByDestinationNode[connectionData.node]['main'][inputIndex]) { - if (inputData.node === executionNode.name) { - // Is the node we come from so its data is available for sure - continue; - } - - // Get the most top nodes to know where to start to process from - const inputStartNodes = workflow.getStartNodes(inputData.node); - - for (const startNode of inputStartNodes) { - // Check if the node has to be added to be processed - - // Check if node got processed already - if (runExecutionData.resultData.runData[startNode.name] !== undefined) { - continue; - } - - // Check if it is already in the execution stack - const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name); - if (executionStackNodes.includes(startNode.name)) { - continue; - } - - // Add is currently missing so add it - runExecutionData.executionData!.nodeExecutionStack.push( - { - node: startNode, - data: { - main: [ - [ - { - json: {}, - }, - ], - ], - }, - }, - ); - } - - } - } - } - } - - // Make sure the array has all the values - const connectionDataArray: Array = []; - for (let i: number = connectionData.index; i >= 0; i--) { - connectionDataArray[i] = null; - } - - // Add the data of the current execution - if (nodeSuccessData === null) { - connectionDataArray[connectionData.index] = null; - } else { - connectionDataArray[connectionData.index] = nodeSuccessData[outputIndex]; - } - - if (stillDataMissing === true) { - // Additional data is needed to run node so add it to waiting - if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) { - runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; - } - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { - main: connectionDataArray - }; - } else { - // All data is there so add it directly to stack - runExecutionData.executionData!.nodeExecutionStack.push({ - node: workflow.nodes[connectionData.node], - data: { - main: connectionDataArray - } - }); - } - + this.addNodeToBeExecuted(workflow, runExecutionData, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex); } } } } } + return Promise.resolve(); })() .then(async () => {