feat(core): Execution curation (#10342)

Co-authored-by: oleg <me@olegivaniv.com>
This commit is contained in:
Eugene
2024-09-02 15:20:08 +02:00
committed by GitHub
parent 8603946e23
commit 022ddcbef9
75 changed files with 2733 additions and 713 deletions

View File

@@ -62,6 +62,7 @@ const getSqliteConnectionOptions = (): SqliteConnectionOptions | SqlitePooledCon
database: path.resolve(Container.get(InstanceSettings).n8nFolder, sqliteConfig.database),
migrations: sqliteMigrations,
};
if (sqliteConfig.poolSize > 0) {
return {
type: 'sqlite-pooled',

View File

@@ -0,0 +1,20 @@
import { Column, Entity, Index, ManyToMany, OneToMany } from '@n8n/typeorm';
import { IsString, Length } from 'class-validator';
import { WithTimestampsAndStringId } from './abstract-entity';
import type { ExecutionAnnotation } from '@/databases/entities/execution-annotation';
import type { AnnotationTagMapping } from '@/databases/entities/annotation-tag-mapping';
@Entity()
export class AnnotationTagEntity extends WithTimestampsAndStringId {
@Column({ length: 24 })
@Index({ unique: true })
@IsString({ message: 'Tag name must be of type string.' })
@Length(1, 24, { message: 'Tag name must be $constraint1 to $constraint2 characters long.' })
name: string;
@ManyToMany('ExecutionAnnotation', 'tags')
annotations: ExecutionAnnotation[];
@OneToMany('AnnotationTagMapping', 'tags')
annotationMappings: AnnotationTagMapping[];
}

View File

@@ -0,0 +1,23 @@
import { Entity, JoinColumn, ManyToOne, PrimaryColumn } from '@n8n/typeorm';
import type { ExecutionAnnotation } from './execution-annotation';
import type { AnnotationTagEntity } from './annotation-tag-entity';
/**
* This entity represents the junction table between the execution annotations and the tags
*/
@Entity({ name: 'execution_annotation_tags' })
export class AnnotationTagMapping {
@PrimaryColumn()
annotationId: number;
@ManyToOne('ExecutionAnnotation', 'tagMappings')
@JoinColumn({ name: 'annotationId' })
annotations: ExecutionAnnotation[];
@PrimaryColumn()
tagId: string;
@ManyToOne('AnnotationTagEntity', 'annotationMappings')
@JoinColumn({ name: 'tagId' })
tags: AnnotationTagEntity[];
}

View File

@@ -0,0 +1,61 @@
import {
Column,
Entity,
Index,
JoinColumn,
JoinTable,
ManyToMany,
OneToMany,
OneToOne,
PrimaryGeneratedColumn,
RelationId,
} from '@n8n/typeorm';
import { ExecutionEntity } from './execution-entity';
import type { AnnotationTagEntity } from './annotation-tag-entity';
import type { AnnotationTagMapping } from './annotation-tag-mapping';
import type { AnnotationVote } from 'n8n-workflow';
@Entity({ name: 'execution_annotations' })
export class ExecutionAnnotation {
@PrimaryGeneratedColumn()
id: number;
/**
* This field stores the up- or down-vote of the execution by user.
*/
@Column({ type: 'varchar', nullable: true })
vote: AnnotationVote | null;
/**
* Custom text note added to the execution by user.
*/
@Column({ type: 'varchar', nullable: true })
note: string | null;
@RelationId((annotation: ExecutionAnnotation) => annotation.execution)
executionId: string;
@Index({ unique: true })
@OneToOne('ExecutionEntity', 'annotation', {
onDelete: 'CASCADE',
})
@JoinColumn({ name: 'executionId' })
execution: ExecutionEntity;
@ManyToMany('AnnotationTagEntity', 'annotations')
@JoinTable({
name: 'execution_annotation_tags', // table name for the junction table of this relation
joinColumn: {
name: 'annotationId',
referencedColumnName: 'id',
},
inverseJoinColumn: {
name: 'tagId',
referencedColumnName: 'id',
},
})
tags?: AnnotationTagEntity[];
@OneToMany('AnnotationTagMapping', 'annotations')
tagMappings: AnnotationTagMapping[];
}

View File

@@ -16,6 +16,7 @@ import { idStringifier } from '../utils/transformers';
import type { ExecutionData } from './execution-data';
import type { ExecutionMetadata } from './execution-metadata';
import { WorkflowEntity } from './workflow-entity';
import type { ExecutionAnnotation } from '@/databases/entities/execution-annotation';
@Entity()
@Index(['workflowId', 'id'])
@@ -65,6 +66,9 @@ export class ExecutionEntity {
@OneToOne('ExecutionData', 'execution')
executionData: Relation<ExecutionData>;
@OneToOne('ExecutionAnnotation', 'execution')
annotation?: Relation<ExecutionAnnotation>;
@ManyToOne('WorkflowEntity')
workflow: WorkflowEntity;
}

View File

@@ -22,13 +22,19 @@ import { WorkflowHistory } from './workflow-history';
import { Project } from './project';
import { ProjectRelation } from './project-relation';
import { InvalidAuthToken } from './invalid-auth-token';
import { AnnotationTagEntity } from './annotation-tag-entity';
import { AnnotationTagMapping } from './annotation-tag-mapping';
import { ExecutionAnnotation } from './execution-annotation';
export const entities = {
AnnotationTagEntity,
AnnotationTagMapping,
AuthIdentity,
AuthProviderSyncHistory,
AuthUser,
CredentialsEntity,
EventDestinations,
ExecutionAnnotation,
ExecutionEntity,
InstalledNodes,
InstalledPackages,

View File

@@ -0,0 +1,48 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
const annotationsTableName = 'execution_annotations';
const annotationTagsTableName = 'annotation_tag_entity';
const annotationTagMappingsTableName = 'execution_annotation_tags';
export class CreateAnnotationTables1724753530828 implements ReversibleMigration {
async up({ schemaBuilder: { createTable, column } }: MigrationContext) {
await createTable(annotationsTableName)
.withColumns(
column('id').int.notNull.primary.autoGenerate,
column('executionId').int.notNull,
column('vote').varchar(6),
column('note').text,
)
.withIndexOn('executionId', true)
.withForeignKey('executionId', {
tableName: 'execution_entity',
columnName: 'id',
onDelete: 'CASCADE',
}).withTimestamps;
await createTable(annotationTagsTableName)
.withColumns(column('id').varchar(16).primary.notNull, column('name').varchar(24).notNull)
.withIndexOn('name', true).withTimestamps;
await createTable(annotationTagMappingsTableName)
.withColumns(column('annotationId').int.notNull, column('tagId').varchar(24).notNull)
.withForeignKey('annotationId', {
tableName: annotationsTableName,
columnName: 'id',
onDelete: 'CASCADE',
})
.withIndexOn('tagId')
.withIndexOn('annotationId')
.withForeignKey('tagId', {
tableName: annotationTagsTableName,
columnName: 'id',
onDelete: 'CASCADE',
});
}
async down({ schemaBuilder: { dropTable } }: MigrationContext) {
await dropTable(annotationTagMappingsTableName);
await dropTable(annotationTagsTableName);
await dropTable(annotationsTableName);
}
}

View File

@@ -61,6 +61,7 @@ import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActiv
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
@@ -125,4 +126,5 @@ export const mysqlMigrations: Migration[] = [
AddConstraintToExecutionMetadata1720101653148,
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
];

View File

@@ -61,6 +61,7 @@ import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-R
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
import { FixExecutionMetadataSequence1721377157740 } from './1721377157740-FixExecutionMetadataSequence';
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
@@ -125,4 +126,5 @@ export const postgresMigrations: Migration[] = [
FixExecutionMetadataSequence1721377157740,
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
];

View File

@@ -58,6 +58,7 @@ import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActiv
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@@ -119,6 +120,7 @@ const sqliteMigrations: Migration[] = [
AddConstraintToExecutionMetadata1720101653148,
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
];
export { sqliteMigrations };

View File

@@ -0,0 +1,26 @@
import { Service } from 'typedi';
import { DataSource, Repository } from '@n8n/typeorm';
import { AnnotationTagMapping } from '@/databases/entities/annotation-tag-mapping';
@Service()
export class AnnotationTagMappingRepository extends Repository<AnnotationTagMapping> {
constructor(dataSource: DataSource) {
super(AnnotationTagMapping, dataSource.manager);
}
/**
* Overwrite annotation tags for the given execution. Annotation should already exist.
*/
async overwriteTags(annotationId: number, tagIds: string[]) {
return await this.manager.transaction(async (tx) => {
await tx.delete(AnnotationTagMapping, { annotationId });
const tagMappings = tagIds.map((tagId) => ({
annotationId,
tagId,
}));
return await tx.insert(AnnotationTagMapping, tagMappings);
});
}
}

View File

@@ -0,0 +1,10 @@
import { Service } from 'typedi';
import { DataSource, Repository } from '@n8n/typeorm';
import { AnnotationTagEntity } from '@/databases/entities/annotation-tag-entity';
@Service()
export class AnnotationTagRepository extends Repository<AnnotationTagEntity> {
constructor(dataSource: DataSource) {
super(AnnotationTagEntity, dataSource.manager);
}
}

View File

@@ -0,0 +1,10 @@
import { Service } from 'typedi';
import { DataSource, Repository } from '@n8n/typeorm';
import { ExecutionAnnotation } from '@/databases/entities/execution-annotation';
@Service()
export class ExecutionAnnotationRepository extends Repository<ExecutionAnnotation> {
constructor(dataSource: DataSource) {
super(ExecutionAnnotation, dataSource.manager);
}
}

View File

@@ -1,4 +1,5 @@
import { Service } from 'typedi';
import pick from 'lodash/pick';
import {
Brackets,
DataSource,
@@ -21,14 +22,18 @@ import type {
} from '@n8n/typeorm';
import { parse, stringify } from 'flatted';
import { GlobalConfig } from '@n8n/config';
import {
ApplicationError,
type ExecutionStatus,
type ExecutionSummary,
type IRunExecutionData,
} from 'n8n-workflow';
import { BinaryDataService } from 'n8n-core';
import { ExecutionCancelledError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ExecutionCancelledError,
ErrorReporterProxy as ErrorReporter,
ApplicationError,
} from 'n8n-workflow';
import type {
AnnotationVote,
ExecutionStatus,
ExecutionSummary,
IRunExecutionData,
} from 'n8n-workflow';
import type {
ExecutionPayload,
@@ -46,6 +51,9 @@ import { Logger } from '@/logger';
import type { ExecutionSummaries } from '@/executions/execution.types';
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
import { separate } from '@/utils';
import { AnnotationTagEntity } from '@/databases/entities/annotation-tag-entity';
import { AnnotationTagMapping } from '@/databases/entities/annotation-tag-mapping';
import { ExecutionAnnotation } from '@/databases/entities/execution-annotation';
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
@@ -201,10 +209,22 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
);
}
private serializeAnnotation(annotation: ExecutionEntity['annotation']) {
if (!annotation) return null;
const { id, vote, tags } = annotation;
return {
id,
vote,
tags: tags?.map((tag) => pick(tag, ['id', 'name'])) ?? [],
};
}
async findSingleExecution(
id: string,
options?: {
includeData: true;
includeAnnotation?: boolean;
unflattenData: true;
where?: FindOptionsWhere<ExecutionEntity>;
},
@@ -213,6 +233,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
id: string,
options?: {
includeData: true;
includeAnnotation?: boolean;
unflattenData?: false | undefined;
where?: FindOptionsWhere<ExecutionEntity>;
},
@@ -221,6 +242,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
id: string,
options?: {
includeData?: boolean;
includeAnnotation?: boolean;
unflattenData?: boolean;
where?: FindOptionsWhere<ExecutionEntity>;
},
@@ -229,6 +251,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
id: string,
options?: {
includeData?: boolean;
includeAnnotation?: boolean;
unflattenData?: boolean;
where?: FindOptionsWhere<ExecutionEntity>;
},
@@ -240,7 +263,16 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
};
if (options?.includeData) {
findOptions.relations = ['executionData', 'metadata'];
findOptions.relations = { executionData: true, metadata: true };
}
if (options?.includeAnnotation) {
findOptions.relations = {
...findOptions.relations,
annotation: {
tags: true,
},
};
}
const execution = await this.findOne(findOptions);
@@ -249,25 +281,21 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return undefined;
}
const { executionData, metadata, ...rest } = execution;
const { executionData, metadata, annotation, ...rest } = execution;
const serializedAnnotation = this.serializeAnnotation(annotation);
if (options?.includeData && options?.unflattenData) {
return {
...rest,
data: parse(execution.executionData.data) as IRunExecutionData,
workflowData: execution.executionData.workflowData,
return {
...rest,
...(options?.includeData && {
data: options?.unflattenData
? (parse(executionData.data) as IRunExecutionData)
: executionData.data,
workflowData: executionData?.workflowData,
customData: Object.fromEntries(metadata.map((m) => [m.key, m.value])),
} as IExecutionResponse;
} else if (options?.includeData) {
return {
...rest,
data: execution.executionData.data,
workflowData: execution.executionData.workflowData,
customData: Object.fromEntries(metadata.map((m) => [m.key, m.value])),
} as IExecutionFlattedDb;
}
return rest;
}),
...(options?.includeAnnotation &&
serializedAnnotation && { annotation: serializedAnnotation }),
};
}
/**
@@ -410,6 +438,13 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
// Sub-query to exclude executions having annotations
const annotatedExecutionsSubQuery = this.manager
.createQueryBuilder()
.subQuery()
.select('annotation.executionId')
.from(ExecutionAnnotation, 'annotation');
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
const date = new Date();
date.setHours(date.getHours() - maxAge);
@@ -420,12 +455,13 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
];
if (maxCount > 0) {
const executions = await this.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});
const executions = await this.createQueryBuilder('execution')
.select('execution.id')
.where('execution.id NOT IN ' + annotatedExecutionsSubQuery.getQuery())
.skip(maxCount)
.take(1)
.orderBy('execution.id', 'DESC')
.getMany();
if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
@@ -442,6 +478,8 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
// Only mark executions as deleted if they are not annotated
.andWhere('id NOT IN ' + annotatedExecutionsSubQuery.getQuery())
.andWhere(
new Brackets((qb) =>
countBasedWhere
@@ -612,6 +650,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
includeData: true,
unflattenData: true,
includeAnnotation: true,
});
}
@@ -622,6 +661,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
includeData: true,
unflattenData: false,
includeAnnotation: true,
});
}
@@ -683,12 +723,80 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
stoppedAt: true,
};
private annotationFields = {
id: true,
vote: true,
};
/**
* This function reduces duplicate rows in the raw result set of the query builder from *toQueryBuilderWithAnnotations*
* by merging the tags of the same execution annotation.
*/
private reduceExecutionsWithAnnotations(
rawExecutionsWithTags: Array<
ExecutionSummary & {
annotation_id: number;
annotation_vote: AnnotationVote;
annotation_tags_id: string;
annotation_tags_name: string;
}
>,
) {
return rawExecutionsWithTags.reduce(
(
acc,
{
annotation_id: _,
annotation_vote: vote,
annotation_tags_id: tagId,
annotation_tags_name: tagName,
...row
},
) => {
const existingExecution = acc.find((e) => e.id === row.id);
if (existingExecution) {
if (tagId) {
existingExecution.annotation = existingExecution.annotation ?? {
vote,
tags: [] as Array<{ id: string; name: string }>,
};
existingExecution.annotation.tags.push({ id: tagId, name: tagName });
}
} else {
acc.push({
...row,
annotation: {
vote,
tags: tagId ? [{ id: tagId, name: tagName }] : [],
},
});
}
return acc;
},
[] as ExecutionSummary[],
);
}
async findManyByRangeQuery(query: ExecutionSummaries.RangeQuery): Promise<ExecutionSummary[]> {
if (query?.accessibleWorkflowIds?.length === 0) {
throw new ApplicationError('Expected accessible workflow IDs');
}
const executions: ExecutionSummary[] = await this.toQueryBuilder(query).getRawMany();
// Due to performance reasons, we use custom query builder with raw SQL.
// IMPORTANT: it produces duplicate rows for executions with multiple tags, which we need to reduce manually
const qb = this.toQueryBuilderWithAnnotations(query);
const rawExecutionsWithTags: Array<
ExecutionSummary & {
annotation_id: number;
annotation_vote: AnnotationVote;
annotation_tags_id: string;
annotation_tags_name: string;
}
> = await qb.getRawMany();
const executions = this.reduceExecutionsWithAnnotations(rawExecutionsWithTags);
return executions.map((execution) => this.toSummary(execution));
}
@@ -764,6 +872,8 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
startedBefore,
startedAfter,
metadata,
annotationTags,
vote,
} = query;
const fields = Object.keys(this.summaryFields)
@@ -812,9 +922,62 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
qb.setParameter('value', value);
}
if (annotationTags?.length || vote) {
// If there is a filter by one or multiple tags or by vote - we need to join the annotations table
qb.innerJoin('execution.annotation', 'annotation');
// Add an inner join for each tag
if (annotationTags?.length) {
for (let index = 0; index < annotationTags.length; index++) {
qb.innerJoin(
AnnotationTagMapping,
`atm_${index}`,
`atm_${index}.annotationId = annotation.id AND atm_${index}.tagId = :tagId_${index}`,
);
qb.setParameter(`tagId_${index}`, annotationTags[index]);
}
}
// Add filter by vote
if (vote) {
qb.andWhere('annotation.vote = :vote', { vote });
}
}
return qb;
}
/**
* This method is used to add the annotation fields to the executions query
* It uses original query builder as a subquery and adds the annotation fields to it
* IMPORTANT: Query made with this query builder fetches duplicate execution rows for each tag,
* this is intended, as we are working with raw query.
* The duplicates are reduced in the *reduceExecutionsWithAnnotations* method.
*/
private toQueryBuilderWithAnnotations(query: ExecutionSummaries.Query) {
const annotationFields = Object.keys(this.annotationFields).map(
(key) => `annotation.${key} AS "annotation_${key}"`,
);
const subQuery = this.toQueryBuilder(query).addSelect(annotationFields);
// Ensure the join with annotations is made only once
// It might be already present as an inner join if the query includes filter by annotation tags
// If not, it must be added as a left join
if (!subQuery.expressionMap.joinAttributes.some((join) => join.alias.name === 'annotation')) {
subQuery.leftJoin('execution.annotation', 'annotation');
}
return this.manager
.createQueryBuilder()
.select(['e.*', 'ate.id AS "annotation_tags_id"', 'ate.name AS "annotation_tags_name"'])
.from(`(${subQuery.getQuery()})`, 'e')
.setParameters(subQuery.getParameters())
.leftJoin(AnnotationTagMapping, 'atm', 'atm.annotationId = e.annotation_id')
.leftJoin(AnnotationTagEntity, 'ate', 'ate.id = atm.tagId');
}
async getAllIds() {
const executions = await this.find({ select: ['id'], order: { id: 'ASC' } });