feat(editor): Refactor and unify executions views (no-changelog) (#8538)

This commit is contained in:
Alex Grozav
2024-04-19 07:50:18 +02:00
committed by GitHub
parent eab01876ab
commit a3eea3ac5e
65 changed files with 3601 additions and 2960 deletions

View File

@@ -41,7 +41,22 @@ import { ExecutionEntity } from '../entities/ExecutionEntity';
import { ExecutionMetadata } from '../entities/ExecutionMetadata';
import { ExecutionDataRepository } from './executionData.repository';
import { Logger } from '@/Logger';
import type { GetManyActiveFilter } from '@/executions/execution.types';
import type { ExecutionSummaries } from '@/executions/execution.types';
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
status?: ExecutionStatus[];
workflowId?: string;
waitTill?: FindOperator<any> | boolean;
metadata?: Array<{ key: string; value: string }>;
startedAfter?: string;
startedBefore?: string;
}
function parseFiltersToQueryBuilder(
qb: SelectQueryBuilder<ExecutionEntity>,
@@ -82,6 +97,14 @@ function parseFiltersToQueryBuilder(
}
}
const lessThanOrEqual = (date: string): unknown => {
return LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(new Date(date)));
};
const moreThanOrEqual = (date: string): unknown => {
return MoreThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(new Date(date)));
};
@Service()
export class ExecutionRepository extends Repository<ExecutionEntity> {
private hardDeletionBatchSize = 100;
@@ -284,114 +307,6 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}
}
async countExecutions(
filters: IGetExecutionsQueryFilter | undefined,
accessibleWorkflowIds: string[],
currentlyRunningExecutions: string[],
hasGlobalRead: boolean,
): Promise<{ count: number; estimated: boolean }> {
const dbType = config.getEnv('database.type');
if (dbType !== 'postgresdb' || (filters && Object.keys(filters).length > 0) || !hasGlobalRead) {
const query = this.createQueryBuilder('execution').andWhere(
'execution.workflowId IN (:...accessibleWorkflowIds)',
{ accessibleWorkflowIds },
);
if (currentlyRunningExecutions.length > 0) {
query.andWhere('execution.id NOT IN (:...currentlyRunningExecutions)', {
currentlyRunningExecutions,
});
}
parseFiltersToQueryBuilder(query, filters);
const count = await query.getCount();
return { count, estimated: false };
}
try {
// Get an estimate of rows count.
const estimateRowsNumberSql =
"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
const rows = (await this.query(estimateRowsNumberSql)) as Array<{ n_live_tup: string }>;
const estimate = parseInt(rows[0].n_live_tup, 10);
// If over 100k, return just an estimate.
if (estimate > 100_000) {
// if less than 100k, we get the real count as even a full
// table scan should not take so long.
return { count: estimate, estimated: true };
}
} catch (error) {
if (error instanceof Error) {
this.logger.warn(`Failed to get executions count from Postgres: ${error.message}`, {
error,
});
}
}
const count = await this.count({
where: {
workflowId: In(accessibleWorkflowIds),
},
});
return { count, estimated: false };
}
async searchExecutions(
filters: IGetExecutionsQueryFilter | undefined,
limit: number,
excludedExecutionIds: string[],
accessibleWorkflowIds: string[],
additionalFilters?: { lastId?: string; firstId?: string },
): Promise<ExecutionSummary[]> {
if (accessibleWorkflowIds.length === 0) {
return [];
}
const query = this.createQueryBuilder('execution')
.select([
'execution.id',
'execution.finished',
'execution.mode',
'execution.retryOf',
'execution.retrySuccessId',
'execution.status',
'execution.startedAt',
'execution.stoppedAt',
'execution.workflowId',
'execution.waitTill',
'workflow.name',
])
.innerJoin('execution.workflow', 'workflow')
.limit(limit)
// eslint-disable-next-line @typescript-eslint/naming-convention
.orderBy({ 'execution.id': 'DESC' })
.andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
if (excludedExecutionIds.length > 0) {
query.andWhere('execution.id NOT IN (:...excludedExecutionIds)', { excludedExecutionIds });
}
if (additionalFilters?.lastId) {
query.andWhere('execution.id < :lastId', { lastId: additionalFilters.lastId });
}
if (additionalFilters?.firstId) {
query.andWhere('execution.id > :firstId', { firstId: additionalFilters.firstId });
}
parseFiltersToQueryBuilder(query, filters);
const executions = await query.getMany();
return executions.map((execution) => {
const { workflow, waitTill, ...rest } = execution;
return {
...rest,
waitTill: waitTill ?? undefined,
workflowName: workflow.name,
};
});
}
async deleteExecutionsByFilter(
filters: IGetExecutionsQueryFilter | undefined,
accessibleWorkflowIds: string[],
@@ -682,52 +597,151 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
});
}
async getManyActive(
activeExecutionIds: string[],
accessibleWorkflowIds: string[],
filter?: GetManyActiveFilter,
) {
const where: FindOptionsWhere<ExecutionEntity> = {
id: In(activeExecutionIds),
status: Not(In(['finished', 'stopped', 'error', 'crashed'])),
};
// ----------------------------------
// new API
// ----------------------------------
if (filter) {
const { workflowId, status, finished } = filter;
if (workflowId && accessibleWorkflowIds.includes(workflowId)) {
where.workflowId = workflowId;
} else {
where.workflowId = In(accessibleWorkflowIds);
}
if (status) {
// @ts-ignore
where.status = In(status);
}
if (finished !== undefined) {
where.finished = finished;
}
} else {
where.workflowId = In(accessibleWorkflowIds);
/**
* Fields to include in the summary of an execution when querying for many.
*/
private summaryFields = {
id: true,
workflowId: true,
mode: true,
retryOf: true,
status: true,
startedAt: true,
stoppedAt: true,
};
async findManyByRangeQuery(query: ExecutionSummaries.RangeQuery): Promise<ExecutionSummary[]> {
if (query?.accessibleWorkflowIds?.length === 0) {
throw new ApplicationError('Expected accessible workflow IDs');
}
return await this.findMultipleExecutions({
select: ['id', 'workflowId', 'mode', 'retryOf', 'startedAt', 'stoppedAt', 'status'],
order: { id: 'DESC' },
where,
});
const executions: ExecutionSummary[] = await this.toQueryBuilder(query).getRawMany();
return executions.map((execution) => this.toSummary(execution));
}
// @tech_debt: These transformations should not be needed
private toSummary(execution: {
id: number | string;
startedAt?: Date | string;
stoppedAt?: Date | string;
waitTill?: Date | string | null;
}): ExecutionSummary {
execution.id = execution.id.toString();
const normalizeDateString = (date: string) => {
if (date.includes(' ')) return date.replace(' ', 'T') + 'Z';
return date;
};
if (execution.startedAt) {
execution.startedAt =
execution.startedAt instanceof Date
? execution.startedAt.toISOString()
: normalizeDateString(execution.startedAt);
}
if (execution.waitTill) {
execution.waitTill =
execution.waitTill instanceof Date
? execution.waitTill.toISOString()
: normalizeDateString(execution.waitTill);
}
if (execution.stoppedAt) {
execution.stoppedAt =
execution.stoppedAt instanceof Date
? execution.stoppedAt.toISOString()
: normalizeDateString(execution.stoppedAt);
}
return execution as ExecutionSummary;
}
async fetchCount(query: ExecutionSummaries.CountQuery) {
return await this.toQueryBuilder(query).getCount();
}
async getLiveExecutionRowsOnPostgres() {
const tableName = `${config.getEnv('database.tablePrefix')}execution_entity`;
const pgSql = `SELECT n_live_tup as result FROM pg_stat_all_tables WHERE relname = '${tableName}';`;
try {
const rows = (await this.query(pgSql)) as Array<{ result: string }>;
if (rows.length !== 1) throw new PostgresLiveRowsRetrievalError(rows);
const [row] = rows;
return parseInt(row.result, 10);
} catch (error) {
if (error instanceof Error) this.logger.error(error.message, { error });
return -1;
}
}
private toQueryBuilder(query: ExecutionSummaries.Query) {
const {
accessibleWorkflowIds,
status,
finished,
workflowId,
startedBefore,
startedAfter,
metadata,
} = query;
const fields = Object.keys(this.summaryFields)
.concat(['waitTill', 'retrySuccessId'])
.map((key) => `execution.${key} AS "${key}"`)
.concat('workflow.name AS "workflowName"');
const qb = this.createQueryBuilder('execution')
.select(fields)
.innerJoin('execution.workflow', 'workflow')
.where('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
if (query.kind === 'range') {
const { limit, firstId, lastId } = query.range;
qb.limit(limit);
if (firstId) qb.andWhere('execution.id > :firstId', { firstId });
if (lastId) qb.andWhere('execution.id < :lastId', { lastId });
if (query.order?.stoppedAt === 'DESC') {
qb.orderBy({ 'execution.stoppedAt': 'DESC' });
} else {
qb.orderBy({ 'execution.id': 'DESC' });
}
}
if (status) qb.andWhere('execution.status IN (:...status)', { status });
if (finished) qb.andWhere({ finished });
if (workflowId) qb.andWhere({ workflowId });
if (startedBefore) qb.andWhere({ startedAt: lessThanOrEqual(startedBefore) });
if (startedAfter) qb.andWhere({ startedAt: moreThanOrEqual(startedAfter) });
if (metadata) {
qb.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
for (const item of metadata) {
qb.andWhere('md.key = :key AND md.value = :value', item);
}
}
return qb;
}
async getAllIds() {
const executions = await this.find({ select: ['id'], order: { id: 'ASC' } });
return executions.map(({ id }) => id);
}
}
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
status?: ExecutionStatus[];
workflowId?: string;
waitTill?: FindOperator<any> | boolean;
metadata?: Array<{ key: string; value: string }>;
startedAfter?: string;
startedBefore?: string;
}