✨ Push active executions to clients to remove manual reload
This commit is contained in:
@@ -153,7 +153,8 @@ export interface IExecutionsStopData {
|
||||
}
|
||||
|
||||
export interface IExecutionsSummary {
|
||||
id: string;
|
||||
id?: string; // executionIdDb
|
||||
idActive?: string; // executionIdActive
|
||||
mode: WorkflowExecuteMode;
|
||||
finished?: boolean;
|
||||
retryOf?: string;
|
||||
@@ -238,14 +239,23 @@ export interface IPushData {
|
||||
type: IPushDataType;
|
||||
}
|
||||
|
||||
export type IPushDataType = 'executionFinished' | 'nodeExecuteAfter' | 'nodeExecuteBefore' | 'testWebhookDeleted' | 'testWebhookReceived';
|
||||
export type IPushDataType = 'executionFinished' | 'executionStarted' | 'nodeExecuteAfter' | 'nodeExecuteBefore' | 'testWebhookDeleted' | 'testWebhookReceived';
|
||||
|
||||
|
||||
export interface IPushDataExecutionFinished {
|
||||
data: IRun;
|
||||
executionId: string;
|
||||
executionIdActive: string;
|
||||
executionIdDb?: string;
|
||||
}
|
||||
|
||||
export interface IPushDataExecutionStarted {
|
||||
executionId: string;
|
||||
mode: WorkflowExecuteMode;
|
||||
startedAt: Date;
|
||||
retryOf?: string;
|
||||
workflowId: string;
|
||||
workflowName?: string;
|
||||
}
|
||||
|
||||
export interface IPushDataNodeExecuteAfter {
|
||||
data: ITaskData;
|
||||
|
||||
@@ -59,8 +59,11 @@ export class Push {
|
||||
* @param {*} data
|
||||
* @memberof Push
|
||||
*/
|
||||
send(sessionId: string, type: IPushDataType, data: any) { // tslint:disable-line:no-any
|
||||
if (this.connections[sessionId] === undefined) {
|
||||
|
||||
|
||||
|
||||
send(type: IPushDataType, data: any, sessionId?: string) { // tslint:disable-line:no-any
|
||||
if (sessionId !== undefined && this.connections[sessionId] === undefined) {
|
||||
// TODO: Log that properly!
|
||||
console.error(`The session "${sessionId}" is not registred.`);
|
||||
return;
|
||||
@@ -71,7 +74,14 @@ export class Push {
|
||||
data,
|
||||
};
|
||||
|
||||
this.channel.send(JSON.stringify(sendData), [this.connections[sessionId]]);
|
||||
if (sessionId === undefined) {
|
||||
// Send to all connected clients
|
||||
this.channel.send(JSON.stringify(sendData));
|
||||
} else {
|
||||
// Send only to a specific client
|
||||
this.channel.send(JSON.stringify(sendData), [this.connections[sessionId]]);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -848,7 +848,7 @@ class App {
|
||||
}
|
||||
returnData.push(
|
||||
{
|
||||
id: data.id.toString(),
|
||||
idActive: data.id.toString(),
|
||||
workflowId: data.workflowId,
|
||||
mode:data.mode,
|
||||
startedAt: new Date(data.startedAt),
|
||||
|
||||
@@ -91,7 +91,7 @@ export class TestWebhooks {
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
|
||||
pushInstance.send(this.testWebhookData[webhookKey].sessionId!, 'testWebhookReceived', { workflowId: webhookData.workflow.id });
|
||||
pushInstance.send('testWebhookReceived', { workflowId: webhookData.workflow.id }, this.testWebhookData[webhookKey].sessionId!);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
@@ -167,7 +167,7 @@ export class TestWebhooks {
|
||||
// Inform editor-ui that webhook got received
|
||||
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
|
||||
try {
|
||||
pushInstance.send(this.testWebhookData[webhookKey].sessionId!, 'testWebhookDeleted', { workflowId });
|
||||
pushInstance.send('testWebhookDeleted', { workflowId }, this.testWebhookData[webhookKey].sessionId!);
|
||||
} catch (error) {
|
||||
// Could not inform editor, probably is not connected anymore. So sipmly go on.
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
IExecutionDb,
|
||||
IExecutionFlattedDb,
|
||||
IPushDataExecutionFinished,
|
||||
IPushDataExecutionStarted,
|
||||
IPushDataNodeExecuteAfter,
|
||||
IPushDataNodeExecuteBefore,
|
||||
IWorkflowBase,
|
||||
@@ -60,11 +61,45 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Pushes the execution out to all connected clients
|
||||
*
|
||||
* @param {IRun} fullRunData The RunData of the finished execution
|
||||
* @param {string} executionIdActive The id of the finished execution
|
||||
* @param {string} [executionIdDb] The database id of finished execution
|
||||
*/
|
||||
function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string) {
|
||||
// Clone the object except the runData. That one is not supposed
|
||||
// to be send. Because that data got send piece by piece after
|
||||
// each node which finished executing
|
||||
const pushRunData = {
|
||||
...fullRunData,
|
||||
data: {
|
||||
...fullRunData.data,
|
||||
resultData: {
|
||||
...fullRunData.data.resultData,
|
||||
runData: {},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Push data to editor-ui once workflow finished
|
||||
const sendData: IPushDataExecutionFinished = {
|
||||
executionIdActive,
|
||||
executionIdDb,
|
||||
data: pushRunData,
|
||||
};
|
||||
|
||||
pushInstance.send('executionFinished', sendData);
|
||||
}
|
||||
|
||||
|
||||
const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowInstance: Workflow, sessionId?: string, retryOf?: string) => {
|
||||
return {
|
||||
nodeExecuteBefore: [
|
||||
async (executionId: string, nodeName: string): Promise<void> => {
|
||||
if (sessionId === undefined) {
|
||||
// Only push data to the session which started it
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -73,7 +108,7 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI
|
||||
nodeName,
|
||||
};
|
||||
|
||||
pushInstance.send(sessionId, 'nodeExecuteBefore', sendData);
|
||||
pushInstance.send('nodeExecuteBefore', sendData, sessionId);
|
||||
},
|
||||
],
|
||||
nodeExecuteAfter: [
|
||||
@@ -88,36 +123,27 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI
|
||||
data,
|
||||
};
|
||||
|
||||
pushInstance.send(sessionId, 'nodeExecuteAfter', sendData);
|
||||
pushInstance.send('nodeExecuteAfter', sendData, sessionId);
|
||||
},
|
||||
],
|
||||
workflowExecuteBefore: [
|
||||
async (executionId: string): Promise<void> => {
|
||||
// Push data to editor-ui once workflow finished
|
||||
const sendData: IPushDataExecutionStarted = {
|
||||
executionId,
|
||||
mode,
|
||||
startedAt: new Date(),
|
||||
retryOf,
|
||||
workflowId: workflowData.id as string,
|
||||
workflowName: workflowData.name,
|
||||
};
|
||||
|
||||
pushInstance.send('executionStarted', sendData);
|
||||
}
|
||||
],
|
||||
workflowExecuteAfter: [
|
||||
async (fullRunData: IRun, executionId: string): Promise<void> => {
|
||||
try {
|
||||
if (sessionId !== undefined) {
|
||||
// Clone the object except the runData. That one is not supposed
|
||||
// to be send. Because that data got send piece by piece after
|
||||
// each node which finished executing
|
||||
const pushRunData = {
|
||||
...fullRunData,
|
||||
data: {
|
||||
...fullRunData.data,
|
||||
resultData: {
|
||||
...fullRunData.data.resultData,
|
||||
runData: {},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Push data to editor-ui once workflow finished
|
||||
const sendData: IPushDataExecutionFinished = {
|
||||
executionId,
|
||||
data: pushRunData,
|
||||
};
|
||||
|
||||
pushInstance.send(sessionId, 'executionFinished', sendData);
|
||||
}
|
||||
|
||||
const workflowSavePromise = WorkflowHelpers.saveStaticData(workflowInstance);
|
||||
|
||||
let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
|
||||
@@ -132,6 +158,7 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI
|
||||
await workflowSavePromise;
|
||||
}
|
||||
|
||||
pushExecutionFinished(fullRunData, executionId);
|
||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||
return;
|
||||
}
|
||||
@@ -148,6 +175,7 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI
|
||||
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
|
||||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
|
||||
) {
|
||||
pushExecutionFinished(fullRunData, executionId);
|
||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||
return;
|
||||
}
|
||||
@@ -185,8 +213,10 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI
|
||||
await workflowSavePromise;
|
||||
}
|
||||
|
||||
pushExecutionFinished(fullRunData, executionId, executionResult.id as string);
|
||||
executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined);
|
||||
} catch (error) {
|
||||
pushExecutionFinished(fullRunData, executionId);
|
||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user