refactor(core): Use DI for eventBus code - Part 1 (no-changelog) (#8434)
This commit is contained in:
committed by
GitHub
parent
3cc0f81c02
commit
7c49004018
@@ -1,7 +1,18 @@
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
|
||||
import { Service } from 'typedi';
|
||||
import type { DeleteResult } from 'typeorm';
|
||||
import { In } from 'typeorm';
|
||||
import EventEmitter from 'events';
|
||||
import uniqby from 'lodash/uniqBy';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
|
||||
|
||||
import config from '@/config';
|
||||
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
import { Logger } from '@/Logger';
|
||||
|
||||
import type {
|
||||
EventMessageTypes,
|
||||
EventNamesTypes,
|
||||
@@ -9,10 +20,7 @@ import type {
|
||||
} from '../EventMessageClasses/';
|
||||
import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee';
|
||||
import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter';
|
||||
import EventEmitter from 'events';
|
||||
import config from '@/config';
|
||||
import { messageEventBusDestinationFromDb } from '../MessageEventBusDestination/MessageEventBusDestinationFromDb';
|
||||
import uniqby from 'lodash/uniqBy';
|
||||
import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm';
|
||||
import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessageAudit';
|
||||
import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit';
|
||||
@@ -25,16 +33,10 @@ import {
|
||||
EventMessageGeneric,
|
||||
eventMessageGenericDestinationTestEvent,
|
||||
} from '../EventMessageClasses/EventMessageGeneric';
|
||||
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
|
||||
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
|
||||
import { Container, Service } from 'typedi';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
import { Logger } from '@/Logger';
|
||||
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
||||
import { ExecutionDataRecoveryService } from '../executionDataRecovery.service';
|
||||
|
||||
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
||||
|
||||
@@ -50,7 +52,7 @@ export interface MessageEventBusInitializeOptions {
|
||||
|
||||
@Service()
|
||||
export class MessageEventBus extends EventEmitter {
|
||||
isInitialized: boolean;
|
||||
private isInitialized = false;
|
||||
|
||||
logWriter: MessageEventBusLogWriter;
|
||||
|
||||
@@ -60,9 +62,15 @@ export class MessageEventBus extends EventEmitter {
|
||||
|
||||
private pushIntervalTimer: NodeJS.Timer;
|
||||
|
||||
constructor(private readonly logger: Logger) {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly eventDestinationsRepository: EventDestinationsRepository,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly orchestrationService: OrchestrationService,
|
||||
private readonly recoveryService: ExecutionDataRecoveryService,
|
||||
) {
|
||||
super();
|
||||
this.isInitialized = false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -80,7 +88,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
|
||||
this.logger.debug('Initializing event bus...');
|
||||
|
||||
const savedEventDestinations = await Container.get(EventDestinationsRepository).find({});
|
||||
const savedEventDestinations = await this.eventDestinationsRepository.find({});
|
||||
if (savedEventDestinations.length > 0) {
|
||||
for (const destinationData of savedEventDestinations) {
|
||||
try {
|
||||
@@ -132,7 +140,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
// crashing, so we can't just mark them as crashed
|
||||
if (config.get('executions.mode') !== 'queue') {
|
||||
const dbUnfinishedExecutionIds = (
|
||||
await Container.get(ExecutionRepository).find({
|
||||
await this.executionRepository.find({
|
||||
where: {
|
||||
status: In(['running', 'new', 'unknown']),
|
||||
},
|
||||
@@ -147,7 +155,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
if (unfinishedExecutionIds.length > 0) {
|
||||
this.logger.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
|
||||
this.logger.info('This could be due to a crash of an active workflow or a restart of n8n.');
|
||||
const activeWorkflows = await Container.get(WorkflowRepository).find({
|
||||
const activeWorkflows = await this.workflowRepository.find({
|
||||
where: { active: true },
|
||||
select: ['id', 'name'],
|
||||
});
|
||||
@@ -159,7 +167,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
}
|
||||
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
|
||||
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
|
||||
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
|
||||
await this.executionRepository.markAsCrashed(unfinishedExecutionIds);
|
||||
// if we end up here, it means that the previous recovery process did not finish
|
||||
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
|
||||
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
|
||||
@@ -174,9 +182,9 @@ export class MessageEventBus extends EventEmitter {
|
||||
this.logger.debug(
|
||||
`No event messages found, marking execution ${executionId} as 'crashed'`,
|
||||
);
|
||||
await Container.get(ExecutionRepository).markAsCrashed([executionId]);
|
||||
await this.executionRepository.markAsCrashed([executionId]);
|
||||
} else {
|
||||
await recoverExecutionDataFromEventLogMessages(
|
||||
await this.recoveryService.recoverExecutionData(
|
||||
executionId,
|
||||
unsentAndUnfinished.unfinishedExecutions[executionId],
|
||||
true,
|
||||
@@ -207,7 +215,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
this.destinations[destination.getId()] = destination;
|
||||
this.destinations[destination.getId()].startListening();
|
||||
if (notifyWorkers) {
|
||||
await Container.get(OrchestrationService).publish('restartEventBus');
|
||||
await this.orchestrationService.publish('restartEventBus');
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
@@ -233,7 +241,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
delete this.destinations[id];
|
||||
}
|
||||
if (notifyWorkers) {
|
||||
await Container.get(OrchestrationService).publish('restartEventBus');
|
||||
await this.orchestrationService.publish('restartEventBus');
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -243,7 +251,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
if (eventData) {
|
||||
const eventMessage = getEventMessageObjectByType(eventData);
|
||||
if (eventMessage) {
|
||||
await Container.get(MessageEventBus).send(eventMessage);
|
||||
await this.send(eventMessage);
|
||||
}
|
||||
}
|
||||
return eventData;
|
||||
@@ -370,7 +378,7 @@ export class MessageEventBus extends EventEmitter {
|
||||
.slice(-amount);
|
||||
|
||||
for (const execution of filteredExecutionIds) {
|
||||
const data = await recoverExecutionDataFromEventLogMessages(
|
||||
const data = await this.recoveryService.recoverExecutionData(
|
||||
execution.executionId,
|
||||
queryResult,
|
||||
false,
|
||||
@@ -450,5 +458,3 @@ export class MessageEventBus extends EventEmitter {
|
||||
await this.send(new EventMessageNode(options));
|
||||
}
|
||||
}
|
||||
|
||||
export const eventBus = Container.get(MessageEventBus);
|
||||
|
||||
@@ -1,205 +0,0 @@
|
||||
import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow';
|
||||
import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow';
|
||||
import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses';
|
||||
import type { DateTime } from 'luxon';
|
||||
import { Push } from '@/push';
|
||||
import { Container } from 'typedi';
|
||||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
|
||||
export async function recoverExecutionDataFromEventLogMessages(
|
||||
executionId: string,
|
||||
messages: EventMessageTypes[],
|
||||
applyToDb: boolean,
|
||||
): Promise<IRunExecutionData | undefined> {
|
||||
const executionEntry = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
|
||||
includeData: true,
|
||||
unflattenData: true,
|
||||
});
|
||||
|
||||
if (executionEntry && messages) {
|
||||
let executionData = executionEntry.data;
|
||||
let workflowError: WorkflowOperationError | undefined;
|
||||
if (!executionData) {
|
||||
executionData = { resultData: { runData: {} } };
|
||||
}
|
||||
let nodeNames: string[] = [];
|
||||
if (
|
||||
executionData?.resultData?.runData &&
|
||||
Object.keys(executionData.resultData.runData).length > 0
|
||||
) {
|
||||
} else {
|
||||
if (!executionData.resultData) {
|
||||
executionData.resultData = {
|
||||
runData: {},
|
||||
};
|
||||
} else {
|
||||
if (!executionData.resultData.runData) {
|
||||
executionData.resultData.runData = {};
|
||||
}
|
||||
}
|
||||
}
|
||||
nodeNames = executionEntry.workflowData.nodes.map((n) => n.name);
|
||||
|
||||
let lastNodeRunTimestamp: DateTime | undefined = undefined;
|
||||
|
||||
for (const nodeName of nodeNames) {
|
||||
const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName);
|
||||
|
||||
if (!nodeByName) continue;
|
||||
|
||||
const nodeStartedMessage = messages.find(
|
||||
(message) =>
|
||||
message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName,
|
||||
);
|
||||
const nodeFinishedMessage = messages.find(
|
||||
(message) =>
|
||||
message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName,
|
||||
);
|
||||
|
||||
const executionTime =
|
||||
nodeStartedMessage && nodeFinishedMessage
|
||||
? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis()
|
||||
: 0;
|
||||
|
||||
let taskData: ITaskData;
|
||||
if (executionData.resultData.runData[nodeName]?.length > 0) {
|
||||
taskData = executionData.resultData.runData[nodeName][0];
|
||||
} else {
|
||||
taskData = {
|
||||
startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0,
|
||||
executionTime,
|
||||
source: [null],
|
||||
executionStatus: 'unknown',
|
||||
};
|
||||
}
|
||||
|
||||
if (nodeStartedMessage && !nodeFinishedMessage) {
|
||||
const nodeError = new NodeOperationError(
|
||||
nodeByName,
|
||||
'Node crashed, possible out-of-memory issue',
|
||||
{
|
||||
message: 'Execution stopped at this node',
|
||||
description:
|
||||
"n8n may have run out of memory while executing it. More context and tips on how to avoid this <a href='https://docs.n8n.io/flow-logic/error-handling/memory-errors' target='_blank'>in the docs</a>",
|
||||
},
|
||||
);
|
||||
workflowError = new WorkflowOperationError(
|
||||
'Workflow did not finish, possible out-of-memory issue',
|
||||
);
|
||||
taskData.error = nodeError;
|
||||
taskData.executionStatus = 'crashed';
|
||||
executionData.resultData.lastNodeExecuted = nodeName;
|
||||
if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts;
|
||||
} else if (nodeStartedMessage && nodeFinishedMessage) {
|
||||
taskData.executionStatus = 'success';
|
||||
if (taskData.data === undefined) {
|
||||
taskData.data = {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {
|
||||
isArtificialRecoveredEventItem: true,
|
||||
},
|
||||
pairedItem: undefined,
|
||||
},
|
||||
],
|
||||
],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (!executionData.resultData.runData[nodeName]) {
|
||||
executionData.resultData.runData[nodeName] = [taskData];
|
||||
}
|
||||
}
|
||||
|
||||
if (!lastNodeRunTimestamp) {
|
||||
const workflowEndedMessage = messages.find((message) =>
|
||||
(
|
||||
[
|
||||
'n8n.workflow.success',
|
||||
'n8n.workflow.crashed',
|
||||
'n8n.workflow.failed',
|
||||
] as EventNamesTypes[]
|
||||
).includes(message.eventName),
|
||||
);
|
||||
if (workflowEndedMessage) {
|
||||
lastNodeRunTimestamp = workflowEndedMessage.ts;
|
||||
} else {
|
||||
if (!workflowError) {
|
||||
workflowError = new WorkflowOperationError(
|
||||
'Workflow did not finish, possible out-of-memory issue',
|
||||
);
|
||||
}
|
||||
const workflowStartedMessage = messages.find(
|
||||
(message) => message.eventName === 'n8n.workflow.started',
|
||||
);
|
||||
if (workflowStartedMessage) {
|
||||
lastNodeRunTimestamp = workflowStartedMessage.ts;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!executionData.resultData.error && workflowError) {
|
||||
executionData.resultData.error = workflowError;
|
||||
}
|
||||
|
||||
if (applyToDb) {
|
||||
const newStatus = executionEntry.status === 'failed' ? 'failed' : 'crashed';
|
||||
await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
|
||||
data: executionData,
|
||||
status: newStatus,
|
||||
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
|
||||
});
|
||||
await Container.get(InternalHooks).onWorkflowPostExecute(
|
||||
executionId,
|
||||
executionEntry.workflowData,
|
||||
{
|
||||
data: executionData,
|
||||
finished: false,
|
||||
mode: executionEntry.mode,
|
||||
waitTill: executionEntry.waitTill ?? undefined,
|
||||
startedAt: executionEntry.startedAt,
|
||||
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
|
||||
status: newStatus,
|
||||
},
|
||||
);
|
||||
const iRunData: IRun = {
|
||||
data: executionData,
|
||||
finished: false,
|
||||
mode: executionEntry.mode,
|
||||
waitTill: executionEntry.waitTill ?? undefined,
|
||||
startedAt: executionEntry.startedAt,
|
||||
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
|
||||
status: newStatus,
|
||||
};
|
||||
const workflowHooks = getWorkflowHooksMain(
|
||||
{
|
||||
userId: '',
|
||||
workflowData: executionEntry.workflowData,
|
||||
executionMode: executionEntry.mode,
|
||||
executionData,
|
||||
runData: executionData.resultData.runData,
|
||||
retryOf: executionEntry.retryOf,
|
||||
},
|
||||
executionId,
|
||||
);
|
||||
|
||||
// execute workflowExecuteAfter hook to trigger error workflow
|
||||
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]);
|
||||
|
||||
const push = Container.get(Push);
|
||||
// wait for UI to be back up and send the execution data
|
||||
push.once('editorUiConnected', function handleUiBackUp() {
|
||||
// add a small timeout to make sure the UI is back up
|
||||
setTimeout(() => {
|
||||
push.broadcast('executionRecovered', { executionId });
|
||||
}, 1000);
|
||||
});
|
||||
}
|
||||
return executionData;
|
||||
}
|
||||
return;
|
||||
}
|
||||
Reference in New Issue
Block a user