refactor(core): Inject dependencies into workflow services (no-changelog) (#8066)
Inject dependencies into workflow services (no-changelog) Up next: - ~~Make workflow services injectable~~ #8033 - ~~Inject dependencies into workflow services~~ (current) - Consolidate workflow controllers into one - Make workflow controller injectable - Inject dependencies into workflow controller
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { Container, Service } from 'typedi';
|
||||
import Container, { Service } from 'typedi';
|
||||
import type { IDataObject, INode, IPinData } from 'n8n-workflow';
|
||||
import { NodeApiError, ErrorReporterProxy as ErrorReporter, Workflow } from 'n8n-workflow';
|
||||
import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm';
|
||||
@@ -43,6 +43,23 @@ export type WorkflowsGetSharedOptions =
|
||||
|
||||
@Service()
|
||||
export class WorkflowService {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly workflowTagMappingRepository: WorkflowTagMappingRepository,
|
||||
private readonly binaryDataService: BinaryDataService,
|
||||
private readonly ownershipService: OwnershipService,
|
||||
private readonly tagService: TagService,
|
||||
private readonly workflowHistoryService: WorkflowHistoryService,
|
||||
private readonly multiMainSetup: MultiMainSetup,
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
private readonly testWebhooks: TestWebhooks,
|
||||
private readonly externalHooks: ExternalHooks,
|
||||
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
|
||||
) {}
|
||||
|
||||
async getSharing(
|
||||
user: User,
|
||||
workflowId: string,
|
||||
@@ -58,7 +75,7 @@ export class WorkflowService {
|
||||
where.userId = user.id;
|
||||
}
|
||||
|
||||
return Container.get(SharedWorkflowRepository).findOne({ where, relations });
|
||||
return this.sharedWorkflowRepository.findOne({ where, relations });
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -89,7 +106,7 @@ export class WorkflowService {
|
||||
nodes: workflow.nodes,
|
||||
connections: workflow.connections,
|
||||
active: workflow.active,
|
||||
nodeTypes: Container.get(NodeTypes),
|
||||
nodeTypes: this.nodeTypes,
|
||||
}).getParentNodes(startNodeName);
|
||||
|
||||
let checkNodeName = '';
|
||||
@@ -104,7 +121,7 @@ export class WorkflowService {
|
||||
}
|
||||
|
||||
async get(workflow: FindOptionsWhere<WorkflowEntity>, options?: { relations: string[] }) {
|
||||
return Container.get(WorkflowRepository).findOne({
|
||||
return this.workflowRepository.findOne({
|
||||
where: workflow,
|
||||
relations: options?.relations,
|
||||
});
|
||||
@@ -175,15 +192,14 @@ export class WorkflowService {
|
||||
findManyOptions.take = options.take;
|
||||
}
|
||||
|
||||
const [workflows, count] = (await Container.get(WorkflowRepository).findAndCount(
|
||||
findManyOptions,
|
||||
)) as [ListQuery.Workflow.Plain[] | ListQuery.Workflow.WithSharing[], number];
|
||||
const [workflows, count] = (await this.workflowRepository.findAndCount(findManyOptions)) as [
|
||||
ListQuery.Workflow.Plain[] | ListQuery.Workflow.WithSharing[],
|
||||
number,
|
||||
];
|
||||
|
||||
return hasSharing(workflows)
|
||||
? {
|
||||
workflows: workflows.map((w) =>
|
||||
Container.get(OwnershipService).addOwnedByAndSharedWith(w),
|
||||
),
|
||||
workflows: workflows.map((w) => this.ownershipService.addOwnedByAndSharedWith(w)),
|
||||
count,
|
||||
}
|
||||
: { workflows, count };
|
||||
@@ -197,7 +213,7 @@ export class WorkflowService {
|
||||
forceSave?: boolean,
|
||||
roles?: string[],
|
||||
): Promise<WorkflowEntity> {
|
||||
const shared = await Container.get(SharedWorkflowRepository).findOne({
|
||||
const shared = await this.sharedWorkflowRepository.findOne({
|
||||
relations: ['workflow', 'role'],
|
||||
where: await whereClause({
|
||||
user,
|
||||
@@ -208,9 +224,8 @@ export class WorkflowService {
|
||||
}),
|
||||
});
|
||||
|
||||
const logger = Container.get(Logger);
|
||||
if (!shared) {
|
||||
logger.verbose('User attempted to update a workflow without permissions', {
|
||||
this.logger.verbose('User attempted to update a workflow without permissions', {
|
||||
workflowId,
|
||||
userId: user.id,
|
||||
});
|
||||
@@ -236,7 +251,7 @@ export class WorkflowService {
|
||||
// Update the workflow's version when changing properties such as
|
||||
// `name`, `pinData`, `nodes`, `connections`, `settings` or `tags`
|
||||
workflow.versionId = uuid();
|
||||
logger.verbose(
|
||||
this.logger.verbose(
|
||||
`Updating versionId for workflow ${workflowId} for user ${user.id} after saving`,
|
||||
{
|
||||
previousVersionId: shared.workflow.versionId,
|
||||
@@ -250,7 +265,7 @@ export class WorkflowService {
|
||||
|
||||
WorkflowHelpers.addNodeIds(workflow);
|
||||
|
||||
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
|
||||
await this.externalHooks.run('workflow.update', [workflow]);
|
||||
|
||||
/**
|
||||
* If the workflow being updated is stored as `active`, remove it from
|
||||
@@ -260,7 +275,7 @@ export class WorkflowService {
|
||||
* will take effect only on removing and re-adding.
|
||||
*/
|
||||
if (shared.workflow.active) {
|
||||
await Container.get(ActiveWorkflowRunner).remove(workflowId);
|
||||
await this.activeWorkflowRunner.remove(workflowId);
|
||||
}
|
||||
|
||||
const workflowSettings = workflow.settings ?? {};
|
||||
@@ -289,7 +304,7 @@ export class WorkflowService {
|
||||
await validateEntity(workflow);
|
||||
}
|
||||
|
||||
await Container.get(WorkflowRepository).update(
|
||||
await this.workflowRepository.update(
|
||||
workflowId,
|
||||
pick(workflow, [
|
||||
'name',
|
||||
@@ -304,21 +319,21 @@ export class WorkflowService {
|
||||
);
|
||||
|
||||
if (tagIds && !config.getEnv('workflowTagsDisabled')) {
|
||||
await Container.get(WorkflowTagMappingRepository).delete({ workflowId });
|
||||
await Container.get(WorkflowTagMappingRepository).insert(
|
||||
await this.workflowTagMappingRepository.delete({ workflowId });
|
||||
await this.workflowTagMappingRepository.insert(
|
||||
tagIds.map((tagId) => ({ tagId, workflowId })),
|
||||
);
|
||||
}
|
||||
|
||||
if (workflow.versionId !== shared.workflow.versionId) {
|
||||
await Container.get(WorkflowHistoryService).saveVersion(user, workflow, workflowId);
|
||||
await this.workflowHistoryService.saveVersion(user, workflow, workflowId);
|
||||
}
|
||||
|
||||
const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags'];
|
||||
|
||||
// We sadly get nothing back from "update". Neither if it updated a record
|
||||
// nor the new value. So query now the hopefully updated entry.
|
||||
const updatedWorkflow = await Container.get(WorkflowRepository).findOne({
|
||||
const updatedWorkflow = await this.workflowRepository.findOne({
|
||||
where: { id: workflowId },
|
||||
relations,
|
||||
});
|
||||
@@ -330,26 +345,26 @@ export class WorkflowService {
|
||||
}
|
||||
|
||||
if (updatedWorkflow.tags?.length && tagIds?.length) {
|
||||
updatedWorkflow.tags = Container.get(TagService).sortByRequestOrder(updatedWorkflow.tags, {
|
||||
updatedWorkflow.tags = this.tagService.sortByRequestOrder(updatedWorkflow.tags, {
|
||||
requestOrder: tagIds,
|
||||
});
|
||||
}
|
||||
|
||||
await Container.get(ExternalHooks).run('workflow.afterUpdate', [updatedWorkflow]);
|
||||
await this.externalHooks.run('workflow.afterUpdate', [updatedWorkflow]);
|
||||
void Container.get(InternalHooks).onWorkflowSaved(user, updatedWorkflow, false);
|
||||
|
||||
if (updatedWorkflow.active) {
|
||||
// When the workflow is supposed to be active add it again
|
||||
try {
|
||||
await Container.get(ExternalHooks).run('workflow.activate', [updatedWorkflow]);
|
||||
await Container.get(ActiveWorkflowRunner).add(
|
||||
await this.externalHooks.run('workflow.activate', [updatedWorkflow]);
|
||||
await this.activeWorkflowRunner.add(
|
||||
workflowId,
|
||||
shared.workflow.active ? 'update' : 'activate',
|
||||
);
|
||||
} catch (error) {
|
||||
// If workflow could not be activated set it again to inactive
|
||||
// and revert the versionId change so UI remains consistent
|
||||
await Container.get(WorkflowRepository).update(workflowId, {
|
||||
await this.workflowRepository.update(workflowId, {
|
||||
active: false,
|
||||
versionId: shared.workflow.versionId,
|
||||
});
|
||||
@@ -366,12 +381,10 @@ export class WorkflowService {
|
||||
}
|
||||
}
|
||||
|
||||
const multiMainSetup = Container.get(MultiMainSetup);
|
||||
await this.multiMainSetup.init();
|
||||
|
||||
await multiMainSetup.init();
|
||||
|
||||
if (multiMainSetup.isEnabled) {
|
||||
await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({
|
||||
if (this.multiMainSetup.isEnabled) {
|
||||
await this.multiMainSetup.broadcastWorkflowActiveStateChanged({
|
||||
workflowId,
|
||||
oldState,
|
||||
newState: updatedWorkflow.active,
|
||||
@@ -412,14 +425,14 @@ export class WorkflowService {
|
||||
nodes: workflowData.nodes,
|
||||
connections: workflowData.connections,
|
||||
active: false,
|
||||
nodeTypes: Container.get(NodeTypes),
|
||||
nodeTypes: this.nodeTypes,
|
||||
staticData: undefined,
|
||||
settings: workflowData.settings,
|
||||
});
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(user.id);
|
||||
|
||||
const needsWebhook = await Container.get(TestWebhooks).needsWebhookData(
|
||||
const needsWebhook = await this.testWebhooks.needsWebhookData(
|
||||
workflowData,
|
||||
workflow,
|
||||
additionalData,
|
||||
@@ -465,9 +478,9 @@ export class WorkflowService {
|
||||
}
|
||||
|
||||
async delete(user: User, workflowId: string): Promise<WorkflowEntity | undefined> {
|
||||
await Container.get(ExternalHooks).run('workflow.delete', [workflowId]);
|
||||
await this.externalHooks.run('workflow.delete', [workflowId]);
|
||||
|
||||
const sharedWorkflow = await Container.get(SharedWorkflowRepository).findOne({
|
||||
const sharedWorkflow = await this.sharedWorkflowRepository.findOne({
|
||||
relations: ['workflow', 'role'],
|
||||
where: await whereClause({
|
||||
user,
|
||||
@@ -484,27 +497,27 @@ export class WorkflowService {
|
||||
|
||||
if (sharedWorkflow.workflow.active) {
|
||||
// deactivate before deleting
|
||||
await Container.get(ActiveWorkflowRunner).remove(workflowId);
|
||||
await this.activeWorkflowRunner.remove(workflowId);
|
||||
}
|
||||
|
||||
const idsForDeletion = await Container.get(ExecutionRepository)
|
||||
const idsForDeletion = await this.executionRepository
|
||||
.find({
|
||||
select: ['id'],
|
||||
where: { workflowId },
|
||||
})
|
||||
.then((rows) => rows.map(({ id: executionId }) => ({ workflowId, executionId })));
|
||||
|
||||
await Container.get(WorkflowRepository).delete(workflowId);
|
||||
await Container.get(BinaryDataService).deleteMany(idsForDeletion);
|
||||
await this.workflowRepository.delete(workflowId);
|
||||
await this.binaryDataService.deleteMany(idsForDeletion);
|
||||
|
||||
void Container.get(InternalHooks).onWorkflowDeleted(user, workflowId, false);
|
||||
await Container.get(ExternalHooks).run('workflow.afterDelete', [workflowId]);
|
||||
await this.externalHooks.run('workflow.afterDelete', [workflowId]);
|
||||
|
||||
return sharedWorkflow.workflow;
|
||||
}
|
||||
|
||||
async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise<UpdateResult> {
|
||||
const qb = Container.get(WorkflowRepository).createQueryBuilder('workflow');
|
||||
const qb = this.workflowRepository.createQueryBuilder('workflow');
|
||||
return qb
|
||||
.update()
|
||||
.set({
|
||||
@@ -533,7 +546,7 @@ export class WorkflowService {
|
||||
workflow.staticData.__dataChanged = false;
|
||||
} catch (error) {
|
||||
ErrorReporter.error(error);
|
||||
Container.get(Logger).error(
|
||||
this.logger.error(
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
`There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`,
|
||||
{ workflowId: workflow.id },
|
||||
@@ -550,7 +563,7 @@ export class WorkflowService {
|
||||
* @param {IDataObject} newStaticData The static data to save
|
||||
*/
|
||||
async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise<void> {
|
||||
await Container.get(WorkflowRepository).update(workflowId, {
|
||||
await this.workflowRepository.update(workflowId, {
|
||||
staticData: newStaticData,
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user