refactor(core): Bring active executions into executions controller (no-changelog) (#8371)
This commit is contained in:
134
packages/cli/src/executions/active-execution.service.ts
Normal file
134
packages/cli/src/executions/active-execution.service.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import { Service } from 'typedi';
|
||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||
import { Logger } from '@/Logger';
|
||||
import { Queue } from '@/Queue';
|
||||
import { WaitTracker } from '@/WaitTracker';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
import { getStatusUsingPreviousExecutionStatusMethod } from '@/executions/executionHelpers';
|
||||
import config from '@/config';
|
||||
|
||||
import type { ExecutionSummary } from 'n8n-workflow';
|
||||
import type { IExecutionBase, IExecutionsCurrentSummary } from '@/Interfaces';
|
||||
import type { GetManyActiveFilter } from './execution.types';
|
||||
|
||||
@Service()
|
||||
export class ActiveExecutionService {
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly queue: Queue,
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly waitTracker: WaitTracker,
|
||||
) {}
|
||||
|
||||
private readonly isRegularMode = config.getEnv('executions.mode') === 'regular';
|
||||
|
||||
async findOne(executionId: string, accessibleWorkflowIds: string[]) {
|
||||
return await this.executionRepository.findIfAccessible(executionId, accessibleWorkflowIds);
|
||||
}
|
||||
|
||||
private toSummary(execution: IExecutionsCurrentSummary | IExecutionBase): ExecutionSummary {
|
||||
return {
|
||||
id: execution.id,
|
||||
workflowId: execution.workflowId ?? '',
|
||||
mode: execution.mode,
|
||||
retryOf: execution.retryOf !== null ? execution.retryOf : undefined,
|
||||
startedAt: new Date(execution.startedAt),
|
||||
status: execution.status,
|
||||
stoppedAt: 'stoppedAt' in execution ? execution.stoppedAt : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// regular mode
|
||||
// ----------------------------------
|
||||
|
||||
async findManyInRegularMode(
|
||||
filter: GetManyActiveFilter,
|
||||
accessibleWorkflowIds: string[],
|
||||
): Promise<ExecutionSummary[]> {
|
||||
return this.activeExecutions
|
||||
.getActiveExecutions()
|
||||
.filter(({ workflowId }) => {
|
||||
if (filter.workflowId && filter.workflowId !== workflowId) return false;
|
||||
if (workflowId && !accessibleWorkflowIds.includes(workflowId)) return false;
|
||||
return true;
|
||||
})
|
||||
.map((execution) => this.toSummary(execution))
|
||||
.sort((a, b) => Number(b.id) - Number(a.id));
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// queue mode
|
||||
// ----------------------------------
|
||||
|
||||
async findManyInQueueMode(filter: GetManyActiveFilter, accessibleWorkflowIds: string[]) {
|
||||
const activeManualExecutionIds = this.activeExecutions
|
||||
.getActiveExecutions()
|
||||
.map((execution) => execution.id);
|
||||
|
||||
const activeJobs = await this.queue.getJobs(['active', 'waiting']);
|
||||
|
||||
const activeProductionExecutionIds = activeJobs.map((job) => job.data.executionId);
|
||||
|
||||
const activeExecutionIds = activeProductionExecutionIds.concat(activeManualExecutionIds);
|
||||
|
||||
if (activeExecutionIds.length === 0) return [];
|
||||
|
||||
const activeExecutions = await this.executionRepository.getManyActive(
|
||||
activeExecutionIds,
|
||||
accessibleWorkflowIds,
|
||||
filter,
|
||||
);
|
||||
|
||||
return activeExecutions.map((execution) => {
|
||||
if (!execution.status) {
|
||||
// @tech-debt Status should never be nullish
|
||||
execution.status = getStatusUsingPreviousExecutionStatusMethod(execution);
|
||||
}
|
||||
|
||||
return this.toSummary(execution);
|
||||
});
|
||||
}
|
||||
|
||||
async stop(execution: IExecutionBase) {
|
||||
const result = await this.activeExecutions.stopExecution(execution.id);
|
||||
|
||||
if (result) {
|
||||
return {
|
||||
mode: result.mode,
|
||||
startedAt: new Date(result.startedAt),
|
||||
stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined,
|
||||
finished: result.finished,
|
||||
status: result.status,
|
||||
};
|
||||
}
|
||||
|
||||
if (!this.isRegularMode) return await this.waitTracker.stopExecution(execution.id);
|
||||
|
||||
// queue mode
|
||||
|
||||
try {
|
||||
return await this.waitTracker.stopExecution(execution.id);
|
||||
} catch {}
|
||||
|
||||
const activeJobs = await this.queue.getJobs(['active', 'waiting']);
|
||||
const job = activeJobs.find(({ data }) => data.executionId === execution.id);
|
||||
|
||||
if (!job) {
|
||||
this.logger.debug('Could not stop job because it is no longer in queue', {
|
||||
jobId: execution.id,
|
||||
});
|
||||
} else {
|
||||
await this.queue.stopJob(job);
|
||||
}
|
||||
|
||||
return {
|
||||
mode: execution.mode,
|
||||
startedAt: new Date(execution.startedAt),
|
||||
stoppedAt: execution.stoppedAt ? new Date(execution.stoppedAt) : undefined,
|
||||
finished: execution.finished,
|
||||
status: execution.status,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
import type { IExecutionDeleteFilter } from '@/Interfaces';
|
||||
import type { AuthenticatedRequest } from '@/requests';
|
||||
|
||||
export declare namespace ExecutionRequest {
|
||||
namespace QueryParam {
|
||||
type GetAll = {
|
||||
filter: string; // '{ waitTill: string; finished: boolean, [other: string]: string }'
|
||||
limit: string;
|
||||
lastId: string;
|
||||
firstId: string;
|
||||
};
|
||||
|
||||
type GetAllCurrent = {
|
||||
filter: string; // '{ workflowId: string }'
|
||||
};
|
||||
}
|
||||
|
||||
type GetAll = AuthenticatedRequest<{}, {}, {}, QueryParam.GetAll>;
|
||||
|
||||
type Get = AuthenticatedRequest<{ id: string }, {}, {}, { unflattedResponse: 'true' | 'false' }>;
|
||||
|
||||
type Delete = AuthenticatedRequest<{}, {}, IExecutionDeleteFilter>;
|
||||
|
||||
type Retry = AuthenticatedRequest<{ id: string }, {}, { loadWorkflow: boolean }, {}>;
|
||||
|
||||
type Stop = AuthenticatedRequest<{ id: string }>;
|
||||
|
||||
type GetAllCurrent = AuthenticatedRequest<{}, {}, {}, QueryParam.GetAllCurrent>;
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
import { ExecutionService } from './execution.service';
|
||||
import type { ExecutionRequest } from './execution.request';
|
||||
import type { ExecutionRequest } from './execution.types';
|
||||
import type { IExecutionResponse, IExecutionFlattedResponse } from '@/Interfaces';
|
||||
import { EnterpriseWorkflowService } from '../workflows/workflow.service.ee';
|
||||
import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types';
|
||||
@@ -14,11 +14,11 @@ export class EnterpriseExecutionsService {
|
||||
private readonly enterpriseWorkflowService: EnterpriseWorkflowService,
|
||||
) {}
|
||||
|
||||
async getExecution(
|
||||
req: ExecutionRequest.Get,
|
||||
async findOne(
|
||||
req: ExecutionRequest.GetOne,
|
||||
sharedWorkflowIds: string[],
|
||||
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
|
||||
const execution = await this.executionService.getExecution(req, sharedWorkflowIds);
|
||||
const execution = await this.executionService.findOne(req, sharedWorkflowIds);
|
||||
|
||||
if (!execution) return;
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ import type {
|
||||
} from '@/Interfaces';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { Queue } from '@/Queue';
|
||||
import type { ExecutionRequest } from './execution.request';
|
||||
import type { ExecutionRequest } from './execution.types';
|
||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
|
||||
@@ -78,15 +78,7 @@ export class ExecutionService {
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
) {}
|
||||
|
||||
async getExecutionsList(req: ExecutionRequest.GetAll, sharedWorkflowIds: string[]) {
|
||||
if (sharedWorkflowIds.length === 0) {
|
||||
return {
|
||||
count: 0,
|
||||
estimated: false,
|
||||
results: [],
|
||||
};
|
||||
}
|
||||
|
||||
async findMany(req: ExecutionRequest.GetMany, sharedWorkflowIds: string[]) {
|
||||
// parse incoming filter object and remove non-valid fields
|
||||
let filter: IGetExecutionsQueryFilter | undefined = undefined;
|
||||
if (req.query.filter) {
|
||||
@@ -160,8 +152,8 @@ export class ExecutionService {
|
||||
};
|
||||
}
|
||||
|
||||
async getExecution(
|
||||
req: ExecutionRequest.Get,
|
||||
async findOne(
|
||||
req: ExecutionRequest.GetOne,
|
||||
sharedWorkflowIds: string[],
|
||||
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
|
||||
if (!sharedWorkflowIds.length) return undefined;
|
||||
@@ -184,9 +176,7 @@ export class ExecutionService {
|
||||
return execution;
|
||||
}
|
||||
|
||||
async retryExecution(req: ExecutionRequest.Retry, sharedWorkflowIds: string[]) {
|
||||
if (!sharedWorkflowIds.length) return false;
|
||||
|
||||
async retry(req: ExecutionRequest.Retry, sharedWorkflowIds: string[]) {
|
||||
const { id: executionId } = req.params;
|
||||
const execution = (await this.executionRepository.findIfShared(
|
||||
executionId,
|
||||
@@ -298,12 +288,7 @@ export class ExecutionService {
|
||||
return !!executionData.finished;
|
||||
}
|
||||
|
||||
async deleteExecutions(req: ExecutionRequest.Delete, sharedWorkflowIds: string[]) {
|
||||
if (sharedWorkflowIds.length === 0) {
|
||||
// return early since without shared workflows there can be no hits
|
||||
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
|
||||
return;
|
||||
}
|
||||
async delete(req: ExecutionRequest.Delete, sharedWorkflowIds: string[]) {
|
||||
const { deleteBefore, ids, filters: requestFiltersRaw } = req.body;
|
||||
let requestFilters;
|
||||
if (requestFiltersRaw) {
|
||||
|
||||
48
packages/cli/src/executions/execution.types.ts
Normal file
48
packages/cli/src/executions/execution.types.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
|
||||
import type { AuthenticatedRequest } from '@/requests';
|
||||
import type { ExecutionStatus, IDataObject } from 'n8n-workflow';
|
||||
|
||||
export declare namespace ExecutionRequest {
|
||||
namespace QueryParams {
|
||||
type GetMany = {
|
||||
filter: string; // '{ waitTill: string; finished: boolean, [other: string]: string }'
|
||||
limit: string;
|
||||
lastId: string;
|
||||
firstId: string;
|
||||
};
|
||||
|
||||
type GetOne = { unflattedResponse: 'true' | 'false' };
|
||||
}
|
||||
|
||||
namespace BodyParams {
|
||||
type DeleteFilter = {
|
||||
deleteBefore?: Date;
|
||||
filters?: IDataObject;
|
||||
ids?: string[];
|
||||
};
|
||||
}
|
||||
|
||||
namespace RouteParams {
|
||||
type ExecutionId = {
|
||||
id: ExecutionEntity['id'];
|
||||
};
|
||||
}
|
||||
|
||||
type GetMany = AuthenticatedRequest<{}, {}, {}, QueryParams.GetMany>;
|
||||
|
||||
type GetOne = AuthenticatedRequest<RouteParams.ExecutionId, {}, {}, QueryParams.GetOne>;
|
||||
|
||||
type Delete = AuthenticatedRequest<{}, {}, BodyParams.DeleteFilter>;
|
||||
|
||||
type Retry = AuthenticatedRequest<RouteParams.ExecutionId, {}, { loadWorkflow: boolean }, {}>;
|
||||
|
||||
type Stop = AuthenticatedRequest<RouteParams.ExecutionId>;
|
||||
|
||||
type GetManyActive = AuthenticatedRequest<{}, {}, {}, { filter?: string }>;
|
||||
}
|
||||
|
||||
export type GetManyActiveFilter = {
|
||||
workflowId?: string;
|
||||
status?: ExecutionStatus;
|
||||
finished?: boolean;
|
||||
};
|
||||
@@ -1,18 +1,26 @@
|
||||
import { ExecutionRequest } from './execution.request';
|
||||
import type { GetManyActiveFilter } from './execution.types';
|
||||
import { ExecutionRequest } from './execution.types';
|
||||
import { ExecutionService } from './execution.service';
|
||||
import { Authorized, Get, Post, RestController } from '@/decorators';
|
||||
import { EnterpriseExecutionsService } from './execution.service.ee';
|
||||
import { isSharingEnabled } from '@/UserManagement/UserManagementHelper';
|
||||
import { WorkflowSharingService } from '@/workflows/workflowSharing.service';
|
||||
import type { User } from '@/databases/entities/User';
|
||||
import config from '@/config';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||
import { ActiveExecutionService } from './active-execution.service';
|
||||
|
||||
@Authorized()
|
||||
@RestController('/executions')
|
||||
export class ExecutionsController {
|
||||
private readonly isQueueMode = config.getEnv('executions.mode') === 'queue';
|
||||
|
||||
constructor(
|
||||
private readonly executionService: ExecutionService,
|
||||
private readonly enterpriseExecutionService: EnterpriseExecutionsService,
|
||||
private readonly workflowSharingService: WorkflowSharingService,
|
||||
private readonly activeExecutionService: ActiveExecutionService,
|
||||
) {}
|
||||
|
||||
private async getAccessibleWorkflowIds(user: User) {
|
||||
@@ -22,32 +30,64 @@ export class ExecutionsController {
|
||||
}
|
||||
|
||||
@Get('/')
|
||||
async getExecutionsList(req: ExecutionRequest.GetAll) {
|
||||
async getMany(req: ExecutionRequest.GetMany) {
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
return await this.executionService.getExecutionsList(req, workflowIds);
|
||||
if (workflowIds.length === 0) return { count: 0, estimated: false, results: [] };
|
||||
|
||||
return await this.executionService.findMany(req, workflowIds);
|
||||
}
|
||||
|
||||
@Get('/active')
|
||||
async getActive(req: ExecutionRequest.GetManyActive) {
|
||||
const filter = req.query.filter?.length ? jsonParse<GetManyActiveFilter>(req.query.filter) : {};
|
||||
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
return this.isQueueMode
|
||||
? await this.activeExecutionService.findManyInQueueMode(filter, workflowIds)
|
||||
: await this.activeExecutionService.findManyInRegularMode(filter, workflowIds);
|
||||
}
|
||||
|
||||
@Post('/active/:id/stop')
|
||||
async stop(req: ExecutionRequest.Stop) {
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
if (workflowIds.length === 0) throw new NotFoundError('Execution not found');
|
||||
|
||||
const execution = await this.activeExecutionService.findOne(req.params.id, workflowIds);
|
||||
|
||||
if (!execution) throw new NotFoundError('Execution not found');
|
||||
|
||||
return await this.activeExecutionService.stop(execution);
|
||||
}
|
||||
|
||||
@Get('/:id')
|
||||
async getExecution(req: ExecutionRequest.Get) {
|
||||
async getOne(req: ExecutionRequest.GetOne) {
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
if (workflowIds.length === 0) throw new NotFoundError('Execution not found');
|
||||
|
||||
return isSharingEnabled()
|
||||
? await this.enterpriseExecutionService.getExecution(req, workflowIds)
|
||||
: await this.executionService.getExecution(req, workflowIds);
|
||||
? await this.enterpriseExecutionService.findOne(req, workflowIds)
|
||||
: await this.executionService.findOne(req, workflowIds);
|
||||
}
|
||||
|
||||
@Post('/:id/retry')
|
||||
async retryExecution(req: ExecutionRequest.Retry) {
|
||||
async retry(req: ExecutionRequest.Retry) {
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
return await this.executionService.retryExecution(req, workflowIds);
|
||||
if (workflowIds.length === 0) throw new NotFoundError('Execution not found');
|
||||
|
||||
return await this.executionService.retry(req, workflowIds);
|
||||
}
|
||||
|
||||
@Post('/delete')
|
||||
async deleteExecutions(req: ExecutionRequest.Delete) {
|
||||
async delete(req: ExecutionRequest.Delete) {
|
||||
const workflowIds = await this.getAccessibleWorkflowIds(req.user);
|
||||
|
||||
return await this.executionService.deleteExecutions(req, workflowIds);
|
||||
if (workflowIds.length === 0) throw new NotFoundError('Execution not found');
|
||||
|
||||
return await this.executionService.delete(req, workflowIds);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user