feat: Migrate integer primary keys to nanoids (#6345)

* first commit for postgres migration

* (not working)

* sqlite migration

* quicksave

* fix tests

* fix pg test

* fix postgres

* fix variables import

* fix execution saving

* add user settings fix

* change migration to single lines

* patch preferences endpoint

* cleanup

* improve variable import

* cleanup unusued code

* Update packages/cli/src/PublicApi/v1/handlers/workflows/workflows.handler.ts

Co-authored-by: Omar Ajoue <krynble@gmail.com>

* address review notes

* fix var update/import

* refactor: Separate execution data to its own table (#6323)

* wip: Temporary migration process

* refactor: Create boilerplate repository methods for executions

* fix: Lint issues

* refactor: Added search endpoint to repository

* refactor: Make the execution list work again

* wip: Updating how we create and update executions everywhere

* fix: Lint issues and remove most of the direct access to execution model

* refactor: Remove includeWorkflowData flag and fix more tests

* fix: Lint issues

* fix: Fixed ordering of executions for FE, removed transaction when saving execution and removed unnecessary update

* refactor: Add comment about missing feature

* refactor: Refactor counting executions

* refactor: Add migration for other dbms and fix issues found

* refactor: Fix lint issues

* refactor: Remove unnecessary comment and auto inject repo to internal hooks

* refactor: remove type assertion

* fix: Fix broken tests

* fix: Remove unnecessary import

* Remove unnecessary toString() call

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* fix: Address comments after review

* refactor: Remove unused import

* fix: Lint issues

* fix: Add correct migration files

---------

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* remove null values from credential export

* fix: Fix an issue with queue mode where all running execution would be returned

* fix: Update n8n node to allow for workflow ids with letters

* set upstream on set branch

* remove typo

* add nodeAccess to credentials

* fix unsaved run check for undefined id

* fix(core): Rename version control feature to source control (#6480)

* rename versionControl to sourceControl

* fix source control tooltip wording

---------

Co-authored-by: Romain Minaud <romain.minaud@gmail.com>

* fix(editor): Pay 548 hide the set up version control button (#6485)

* feat(DebugHelper Node): Fix and include in main app (#6406)

* improve node a bit

* fixing continueOnFail() ton contain error in json

* improve pairedItem

* fix random data returning object results

* fix nanoId length typo

* update pnpm-lock file

---------

Co-authored-by: Marcus <marcus@n8n.io>

* fix(editor): Remove setup source control CTA button

* fix(editor): Remove setup source control CTA button

---------

Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com>
Co-authored-by: Marcus <marcus@n8n.io>

* fix(editor): Update source control docs links (#6488)

* feat(DebugHelper Node): Fix and include in main app (#6406)

* improve node a bit

* fixing continueOnFail() ton contain error in json

* improve pairedItem

* fix random data returning object results

* fix nanoId length typo

* update pnpm-lock file

---------

Co-authored-by: Marcus <marcus@n8n.io>

* feat(editor): Replace root events with event bus events (no-changelog) (#6454)

* feat: replace root events with event bus events

* fix: prevent cypress from replacing global with globalThis in import path

* feat: remove emitter mixin

* fix: replace component events with event bus

* fix: fix linting issue

* fix: fix breaking expression switch

* chore: prettify ndv e2e suite code

* fix(editor): Update source control docs links

---------

Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com>
Co-authored-by: Marcus <marcus@n8n.io>
Co-authored-by: Alex Grozav <alex@grozav.com>

* fix tag endpoint regex

---------

Co-authored-by: Omar Ajoue <krynble@gmail.com>
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
Co-authored-by: Romain Minaud <romain.minaud@gmail.com>
Co-authored-by: Csaba Tuncsik <csaba@n8n.io>
Co-authored-by: Marcus <marcus@n8n.io>
Co-authored-by: Alex Grozav <alex@grozav.com>
This commit is contained in:
Michael Auerswald
2023-06-20 19:13:18 +02:00
committed by GitHub
parent da330f0648
commit c3ba0123ad
156 changed files with 3499 additions and 2594 deletions

View File

@@ -1,15 +1,28 @@
import type { ICredentialNodeAccess } from 'n8n-workflow';
import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { BeforeInsert, Column, Entity, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { IsArray, IsObject, IsString, Length } from 'class-validator';
import type { SharedCredentials } from './SharedCredentials';
import { AbstractEntity, jsonColumnType } from './AbstractEntity';
import type { ICredentialsDb } from '@/Interfaces';
import { idStringifier } from '../utils/transformers';
import { generateNanoId } from '../utils/generators';
@Entity()
export class CredentialsEntity extends AbstractEntity implements ICredentialsDb {
@Generated()
@PrimaryColumn({ transformer: idStringifier })
constructor(data?: Partial<CredentialsEntity>) {
super();
Object.assign(this, data);
if (!this.id) {
this.id = generateNanoId();
}
}
@BeforeInsert()
nanoId(): void {
if (!this.id) {
this.id = generateNanoId();
}
}
@PrimaryColumn('varchar')
id: string;
@Column({ length: 128 })

View File

@@ -0,0 +1,27 @@
import { Column, Entity, ManyToOne, PrimaryColumn } from 'typeorm';
import { idStringifier } from '../utils/transformers';
import { ExecutionEntity } from './ExecutionEntity';
import { jsonColumnType } from './AbstractEntity';
import { IWorkflowBase } from 'n8n-workflow';
@Entity()
export class ExecutionData {
@Column('text')
data: string;
// WARNING: the workflowData column has been changed from IWorkflowDb to IWorkflowBase
// when ExecutionData was introduced as a separate entity.
// This is because manual executions of unsaved workflows have no workflow id
// and IWorkflowDb has it as a mandatory field. IWorkflowBase reflects the correct
// data structure for this entity.
@Column(jsonColumnType)
workflowData: IWorkflowBase;
@PrimaryColumn({ transformer: idStringifier })
executionId: string;
@ManyToOne('ExecutionEntity', 'data', {
onDelete: 'CASCADE',
})
execution: ExecutionEntity;
}

View File

@@ -1,10 +1,20 @@
import { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
import { IWorkflowDb } from '@/Interfaces';
import type { IExecutionFlattedDb } from '@/Interfaces';
import {
Column,
Entity,
Generated,
Index,
ManyToOne,
OneToMany,
OneToOne,
PrimaryColumn,
Relation,
} from 'typeorm';
import { datetimeColumnType } from './AbstractEntity';
import { idStringifier } from '../utils/transformers';
import type { ExecutionData } from './ExecutionData';
import type { ExecutionMetadata } from './ExecutionMetadata';
import { WorkflowEntity } from './WorkflowEntity';
@Entity()
@Index(['workflowId', 'id'])
@@ -12,14 +22,11 @@ import type { ExecutionMetadata } from './ExecutionMetadata';
@Index(['finished', 'id'])
@Index(['workflowId', 'finished', 'id'])
@Index(['workflowId', 'waitTill', 'id'])
export class ExecutionEntity implements IExecutionFlattedDb {
export class ExecutionEntity {
@Generated()
@PrimaryColumn({ transformer: idStringifier })
id: string;
@Column('text')
data: string;
@Column()
finished: boolean;
@@ -42,10 +49,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@Column({ type: datetimeColumnType, nullable: true })
stoppedAt: Date;
@Column(jsonColumnType)
workflowData: IWorkflowDb;
@Column({ nullable: true, transformer: idStringifier })
@Column({ nullable: true })
workflowId: string;
@Column({ type: datetimeColumnType, nullable: true })
@@ -53,4 +57,10 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@OneToMany('ExecutionMetadata', 'execution')
metadata: ExecutionMetadata[];
@OneToOne('ExecutionData', 'execution')
executionData: Relation<ExecutionData>;
@ManyToOne('WorkflowEntity')
workflow: WorkflowEntity;
}

View File

@@ -3,7 +3,6 @@ import { CredentialsEntity } from './CredentialsEntity';
import { User } from './User';
import { Role } from './Role';
import { AbstractEntity } from './AbstractEntity';
import { idStringifier } from '../utils/transformers';
@Entity()
export class SharedCredentials extends AbstractEntity {
@@ -22,6 +21,6 @@ export class SharedCredentials extends AbstractEntity {
@ManyToOne('CredentialsEntity', 'shared')
credentials: CredentialsEntity;
@PrimaryColumn({ transformer: idStringifier })
@PrimaryColumn()
credentialsId: string;
}

View File

@@ -3,7 +3,6 @@ import { WorkflowEntity } from './WorkflowEntity';
import { User } from './User';
import { Role } from './Role';
import { AbstractEntity } from './AbstractEntity';
import { idStringifier } from '../utils/transformers';
@Entity()
export class SharedWorkflow extends AbstractEntity {
@@ -22,6 +21,6 @@ export class SharedWorkflow extends AbstractEntity {
@ManyToOne('WorkflowEntity', 'shared')
workflow: WorkflowEntity;
@PrimaryColumn({ transformer: idStringifier })
@PrimaryColumn()
workflowId: string;
}

View File

@@ -1,15 +1,28 @@
import { Column, Entity, Generated, Index, ManyToMany, OneToMany, PrimaryColumn } from 'typeorm';
import { BeforeInsert, Column, Entity, Index, ManyToMany, OneToMany, PrimaryColumn } from 'typeorm';
import { IsString, Length } from 'class-validator';
import { idStringifier } from '../utils/transformers';
import type { WorkflowEntity } from './WorkflowEntity';
import type { WorkflowTagMapping } from './WorkflowTagMapping';
import { AbstractEntity } from './AbstractEntity';
import { generateNanoId } from '../utils/generators';
@Entity()
export class TagEntity extends AbstractEntity {
@Generated()
@PrimaryColumn({ transformer: idStringifier })
constructor(data?: Partial<TagEntity>) {
super();
Object.assign(this, data);
if (!this.id) {
this.id = generateNanoId();
}
}
@BeforeInsert()
nanoId() {
if (!this.id) {
this.id = generateNanoId();
}
}
@PrimaryColumn('varchar')
id: string;
@Column({ length: 24 })

View File

@@ -1,9 +1,24 @@
import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';
import { BeforeInsert, Column, Entity, PrimaryColumn } from 'typeorm';
import { generateNanoId } from '../utils/generators';
@Entity()
export class Variables {
@PrimaryGeneratedColumn()
id: number;
constructor(data?: Partial<Variables>) {
Object.assign(this, data);
if (!this.id) {
this.id = generateNanoId();
}
}
@BeforeInsert()
nanoId() {
if (!this.id) {
this.id = generateNanoId();
}
}
@PrimaryColumn('varchar')
id: string;
@Column('text')
key: string;

View File

@@ -1,11 +1,9 @@
import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
import { idStringifier } from '../utils/transformers';
@Entity()
@Index(['webhookId', 'method', 'pathLength'])
export class WebhookEntity {
@Column({ transformer: idStringifier })
@Column()
workflowId: string;
@PrimaryColumn()

View File

@@ -4,9 +4,9 @@ import { IConnections, IDataObject, IWorkflowSettings } from 'n8n-workflow';
import type { IBinaryKeyData, INode, IPairedItemData } from 'n8n-workflow';
import {
BeforeInsert,
Column,
Entity,
Generated,
Index,
JoinColumn,
JoinTable,
@@ -20,14 +20,29 @@ import type { TagEntity } from './TagEntity';
import type { SharedWorkflow } from './SharedWorkflow';
import type { WorkflowStatistics } from './WorkflowStatistics';
import type { WorkflowTagMapping } from './WorkflowTagMapping';
import { idStringifier, objectRetriever, sqlite } from '../utils/transformers';
import { objectRetriever, sqlite } from '../utils/transformers';
import { AbstractEntity, jsonColumnType } from './AbstractEntity';
import type { IWorkflowDb } from '@/Interfaces';
import { generateNanoId } from '../utils/generators';
@Entity()
export class WorkflowEntity extends AbstractEntity implements IWorkflowDb {
@Generated()
@PrimaryColumn({ transformer: idStringifier })
constructor(data?: Partial<WorkflowEntity>) {
super();
Object.assign(this, data);
if (!this.id) {
this.id = generateNanoId();
}
}
@BeforeInsert()
nanoId() {
if (!this.id) {
this.id = generateNanoId();
}
}
@PrimaryColumn('varchar')
id: string;
// TODO: Add XSS check

View File

@@ -1,5 +1,4 @@
import { Column, Entity, ManyToOne, PrimaryColumn } from 'typeorm';
import { idStringifier } from '../utils/transformers';
import { datetimeColumnType } from './AbstractEntity';
import { WorkflowEntity } from './WorkflowEntity';
@@ -25,6 +24,6 @@ export class WorkflowStatistics {
@ManyToOne('WorkflowEntity', 'shared')
workflow: WorkflowEntity;
@PrimaryColumn({ transformer: idStringifier })
@PrimaryColumn()
workflowId: string;
}

View File

@@ -1,11 +1,10 @@
import { Entity, JoinColumn, ManyToOne, PrimaryColumn } from 'typeorm';
import { idStringifier } from '../utils/transformers';
import type { TagEntity } from './TagEntity';
import type { WorkflowEntity } from './WorkflowEntity';
@Entity({ name: 'workflows_tags' })
export class WorkflowTagMapping {
@PrimaryColumn({ transformer: idStringifier })
@PrimaryColumn()
workflowId: string;
@ManyToOne('WorkflowEntity', 'tagMappings')

View File

@@ -18,6 +18,7 @@ import { WorkflowEntity } from './WorkflowEntity';
import { WorkflowTagMapping } from './WorkflowTagMapping';
import { WorkflowStatistics } from './WorkflowStatistics';
import { ExecutionMetadata } from './ExecutionMetadata';
import { ExecutionData } from './ExecutionData';
export const entities = {
AuthIdentity,
@@ -39,4 +40,5 @@ export const entities = {
WorkflowTagMapping,
WorkflowStatistics,
ExecutionMetadata,
ExecutionData,
};

View File

@@ -0,0 +1,252 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class MigrateIntegerKeysToString1690000000001 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity RENAME COLUMN id to tmp_id;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN id varchar(36) NOT NULL;`,
);
await queryRunner.query(`UPDATE ${tablePrefix}workflow_entity SET id = CONVERT(tmp_id, CHAR);`);
await queryRunner.query(
`CREATE INDEX \`TMP_idx_workflow_entity_id\` ON ${tablePrefix}workflow_entity (\`id\`);`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity RENAME COLUMN id to tmp_id;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}tag_entity ADD COLUMN id varchar(36) NOT NULL;`,
);
await queryRunner.query(`UPDATE ${tablePrefix}tag_entity SET id = CONVERT(tmp_id, CHAR);`);
await queryRunner.query(
`CREATE INDEX \`TMP_idx_tag_entity_id\` ON ${tablePrefix}tag_entity (\`id\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags RENAME COLUMN \`workflowId\` to \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD COLUMN \`workflowId\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}workflows_tags SET \`workflowId\` = CONVERT(\`tmp_workflowId\`, CHAR);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags RENAME COLUMN \`tagId\` to \`tmp_tagId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD COLUMN \`tagId\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}workflows_tags SET \`tagId\` = CONVERT(\`tmp_tagId\`, CHAR);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`workflowId\`, \`tagId\`);`,
);
await queryRunner.query(
`CREATE INDEX \`idx_workflows_tags_workflowid\` ON ${tablePrefix}workflows_tags (\`workflowId\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP FOREIGN KEY \`FK_54b2f0343d6a2078fa137443869\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD CONSTRAINT \`fk_workflows_tags_workflow_id\` FOREIGN KEY (\`workflowId\`) REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP FOREIGN KEY \`FK_77505b341625b0b4768082e2171\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD CONSTRAINT \`fk_workflows_tags_tag_id\` FOREIGN KEY (\`tagId\`) REFERENCES tag_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP COLUMN \`tmp_workflowId\`;`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflows_tags DROP COLUMN \`tmp_tagId\`;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow RENAME COLUMN \`workflowId\` to \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow ADD COLUMN \`workflowId\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}shared_workflow SET \`workflowId\` = CONVERT(\`tmp_workflowId\`, CHAR);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`userId\`, \`workflowId\`);`,
);
await queryRunner.query(
`CREATE INDEX \`idx_shared_workflow_workflow_id\` ON ${tablePrefix}shared_workflow (\`workflowId\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow DROP FOREIGN KEY \`FK_b83f8d2530884b66a9c848c8b88\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow ADD CONSTRAINT \`fk_shared_workflow_workflow_id\` FOREIGN KEY (\`workflowId\`) REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow DROP COLUMN \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics RENAME COLUMN \`workflowId\` to \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics ADD COLUMN \`workflowId\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}workflow_statistics SET \`workflowId\` = CONVERT(\`tmp_workflowId\`, CHAR);`,
);
await queryRunner.query(
`CREATE INDEX \`idx_workflow_statistics_workflow_id\` ON ${tablePrefix}workflow_statistics (\`workflowId\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics DROP FOREIGN KEY \`workflow_statistics_ibfk_1\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics ADD CONSTRAINT \`fk_workflow_statistics_workflow_id\` FOREIGN KEY (\`workflowId\`) REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`workflowId\`, \`name\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics DROP COLUMN \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity RENAME COLUMN \`workflowId\` to \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity ADD COLUMN \`workflowId\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}webhook_entity SET \`workflowId\` = CONVERT(\`tmp_workflowId\`, CHAR);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity DROP COLUMN \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity ADD CONSTRAINT \`fk_webhook_entity_workflow_id\` FOREIGN KEY (\`workflowId\`) REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity RENAME COLUMN \`workflowId\` to \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity ADD COLUMN \`workflowId\` varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}execution_entity SET \`workflowId\` = CONVERT(\`tmp_workflowId\`, CHAR);`,
);
await queryRunner.query(
`CREATE INDEX \`idx_execution_entity_workflow_id_id\` ON ${tablePrefix}execution_entity (\`workflowId\`,\`id\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity DROP FOREIGN KEY \`FK_execution_entity_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity ADD CONSTRAINT \`fk_execution_entity_workflow_id\` FOREIGN KEY (\`workflowId\`) REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`DROP INDEX \`IDX_81fc04c8a17de15835713505e4\` ON ${tablePrefix}execution_entity;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity DROP COLUMN \`tmp_workflowId\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity MODIFY COLUMN tmp_id INT NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`id\`);`,
);
await queryRunner.query(
`DROP INDEX \`TMP_idx_workflow_entity_id\` ON ${tablePrefix}workflow_entity;`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN tmp_id;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}tag_entity MODIFY COLUMN tmp_id INT NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}tag_entity DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`id\`);`,
);
await queryRunner.query(`DROP INDEX \`TMP_idx_tag_entity_id\` ON ${tablePrefix}tag_entity;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity DROP COLUMN tmp_id;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity RENAME COLUMN id to tmp_id;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity ADD COLUMN id varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}credentials_entity SET id = CONVERT(tmp_id, CHAR);`,
);
await queryRunner.query(
`CREATE INDEX \`TMP_idx_credentials_entity_id\` ON ${tablePrefix}credentials_entity (\`id\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials RENAME COLUMN credentialsId to tmp_credentialsId;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials ADD COLUMN credentialsId varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}shared_credentials SET credentialsId = CONVERT(tmp_credentialsId, CHAR);`,
);
await queryRunner.query(
`CREATE INDEX \`idx_shared_credentials_id\` ON ${tablePrefix}shared_credentials (\`credentialsId\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials DROP FOREIGN KEY \`FK_68661def1d4bcf2451ac8dbd949\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials ADD CONSTRAINT \`fk_shared_credentials_credentials_id\` FOREIGN KEY (\`credentialsId\`) REFERENCES credentials_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials MODIFY COLUMN tmp_credentialsId INT NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`userId\`,\`credentialsId\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials DROP COLUMN tmp_credentialsId;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity MODIFY COLUMN tmp_id INT NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity DROP CONSTRAINT \`PRIMARY\`, ADD PRIMARY KEY (\`id\`);`,
);
await queryRunner.query(
`DROP INDEX \`TMP_idx_credentials_entity_id\` ON ${tablePrefix}credentials_entity;`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}credentials_entity DROP COLUMN tmp_id;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables RENAME COLUMN \`id\` to \`tmp_id\`;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables ADD COLUMN \`id\` varchar(36) NOT NULL;`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}variables SET \`id\` = CONVERT(\`tmp_id\`, CHAR);`,
);
await queryRunner.query(
`CREATE INDEX \`TMP_idx_variables_id\` ON ${tablePrefix}variables (\`id\`);`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables CHANGE \`tmp_id\` \`tmp_id\` int NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables DROP PRIMARY KEY, ADD PRIMARY KEY (\`id\`);`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables DROP COLUMN \`tmp_id\`;`);
}
// eslint-disable-next-line @typescript-eslint/no-empty-function, @typescript-eslint/no-unused-vars
async down({ queryRunner, tablePrefix }: MigrationContext) {}
}

View File

@@ -0,0 +1,43 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class SeparateExecutionData1690000000030 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`CREATE TABLE ${tablePrefix}execution_data (
executionId int(11) NOT NULL primary key,
workflowData json NOT NULL,
data TEXT NOT NULL,
CONSTRAINT \`${tablePrefix}execution_data_FK\` FOREIGN KEY (\`executionId\`) REFERENCES \`${tablePrefix}execution_entity\` (\`id\`) ON DELETE CASCADE
)
ENGINE=InnoDB`,
);
await queryRunner.query(
`INSERT INTO ${tablePrefix}execution_data (
executionId,
workflowData,
data)
SELECT id, workflowData, data FROM ${tablePrefix}execution_entity
`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity DROP COLUMN workflowData, DROP COLUMN data`,
);
}
async down({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity
ADD workflowData json NULL,
ADD data text NULL`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}execution_entity SET workflowData = ${tablePrefix}execution_data.workflowData, data = ${tablePrefix}execution_data.data
FROM ${tablePrefix}execution_data WHERE ${tablePrefix}execution_data.executionId = ${tablePrefix}execution_entity.id`,
);
await queryRunner.query(`DROP TABLE ${tablePrefix}execution_data`);
}
}

View File

@@ -38,6 +38,8 @@ import { UpdateRunningExecutionStatus1677236788851 } from './1677236788851-Updat
import { CreateExecutionMetadataTable1679416281779 } from './1679416281779-CreateExecutionMetadataTable';
import { CreateVariables1677501636753 } from './1677501636753-CreateVariables';
import { AddUserActivatedProperty1681134145996 } from './1681134145996-AddUserActivatedProperty';
import { MigrateIntegerKeysToString1690000000001 } from './1690000000001-MigrateIntegerKeysToString';
import { SeparateExecutionData1690000000030 } from './1690000000030-SeparateExecutionData';
export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
@@ -79,4 +81,6 @@ export const mysqlMigrations: Migration[] = [
CreateExecutionMetadataTable1679416281779,
CreateVariables1677501636753,
AddUserActivatedProperty1681134145996,
MigrateIntegerKeysToString1690000000001,
SeparateExecutionData1690000000030,
];

View File

@@ -0,0 +1,262 @@
/* eslint-disable n8n-local-rules/no-unneeded-backticks */
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class MigrateIntegerKeysToString1690000000000 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity RENAME COLUMN id to tmp_id;`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN id varchar(36);`);
await queryRunner.query(`UPDATE ${tablePrefix}workflow_entity SET id = tmp_id::text;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ALTER COLUMN id SET NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ALTER COLUMN tmp_id DROP DEFAULT;`,
);
await queryRunner.query(`DROP SEQUENCE IF EXISTS ${tablePrefix}workflow_entity_id_seq;`);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_workflow_entity_id" ON ${tablePrefix}workflow_entity ("id");`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity RENAME COLUMN id to tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity ADD COLUMN id varchar(36);`);
await queryRunner.query(`UPDATE ${tablePrefix}tag_entity SET id = tmp_id::text;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity ALTER COLUMN id SET NOT NULL;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}tag_entity ALTER COLUMN tmp_id DROP DEFAULT;`,
);
await queryRunner.query(`DROP SEQUENCE IF EXISTS tag_entity_id_seq;`);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_tag_entity_id" ON ${tablePrefix}tag_entity ("id");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags RENAME COLUMN "workflowId" to "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD COLUMN "workflowId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}workflows_tags SET "workflowId" = "tmp_workflowId"::text;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ALTER COLUMN "workflowId" SET NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags RENAME COLUMN "tagId" to "tmp_tagId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD COLUMN "tagId" varchar(36);`,
);
await queryRunner.query(`UPDATE ${tablePrefix}workflows_tags SET "tagId" = "tmp_tagId"::text;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ALTER COLUMN "tagId" SET NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP CONSTRAINT IF EXISTS "FK_31140eb41f019805b40d0087449";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP CONSTRAINT IF EXISTS "FK_5e29bfe9e22c5d6567f509d4a46";`,
);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_workflows_tags" ON ${tablePrefix}workflows_tags ("workflowId","tagId");`,
);
await queryRunner.query(`DROP INDEX IF EXISTS "idx_31140eb41f019805b40d008744";`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflows_tags DROP CONSTRAINT "PK_a60448a90e51a114e95e2a125b3",
ADD CONSTRAINT "pk_workflows_tags" PRIMARY KEY USING INDEX "pk_workflows_tags";`);
await queryRunner.query(
`CREATE INDEX "idx_workflows_tags_workflow_id" ON ${tablePrefix}workflows_tags ("workflowId");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD CONSTRAINT "fk_workflows_tags_workflow_id" FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags ADD CONSTRAINT "fk_workflows_tags_tag_id" FOREIGN KEY ("tagId") REFERENCES tag_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflows_tags DROP COLUMN "tmp_workflowId";`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflows_tags DROP COLUMN "tmp_tagId";`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow RENAME COLUMN "workflowId" to "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow ADD COLUMN "workflowId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}shared_workflow SET "workflowId" = "tmp_workflowId"::text;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow ALTER COLUMN "workflowId" SET NOT NULL;`,
);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_shared_workflow_id" ON ${tablePrefix}shared_workflow ("userId","workflowId");`,
);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_65a0933c0f19d278881653bf81d35064";`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}shared_workflow DROP CONSTRAINT "PK_cc5d5a71c7b2591f5154ffb0c785e85e",
ADD CONSTRAINT "pk_shared_workflow_id" PRIMARY KEY USING INDEX "pk_shared_workflow_id";`);
await queryRunner.query(
`CREATE INDEX "idx_shared_workflow_workflow_id" ON ${tablePrefix}shared_workflow ("workflowId");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow ADD CONSTRAINT "fk_shared_workflow_workflow_id" FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_workflow DROP COLUMN "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics RENAME COLUMN "workflowId" to "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics ADD COLUMN "workflowId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}workflow_statistics SET "workflowId" = "tmp_workflowId"::text;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics ALTER COLUMN "workflowId" SET NOT NULL;`,
);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_workflow_statistics" ON ${tablePrefix}workflow_statistics ("workflowId","name");`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_statistics DROP CONSTRAINT IF EXISTS "workflow_statistics_pkey",
ADD CONSTRAINT "pk_workflow_statistics" PRIMARY KEY USING INDEX "pk_workflow_statistics";`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics DROP COLUMN "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_statistics ADD CONSTRAINT "fk_workflow_statistics_workflow_id" FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity RENAME COLUMN "workflowId" to "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity ADD COLUMN "workflowId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}webhook_entity SET "workflowId" = "tmp_workflowId"::text;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity ALTER COLUMN "workflowId" SET NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity DROP COLUMN "tmp_workflowId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}webhook_entity ADD CONSTRAINT "fk_webhook_entity_workflow_id" FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity RENAME COLUMN "workflowId" to "tmp_workflowId";`,
);
// -- Intentionally NOT setting colum to NOT NULL
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity ADD COLUMN "workflowId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}execution_entity SET "workflowId" = "tmp_workflowId"::text;`,
);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_d160d4771aba5a0d78943edbe3";`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_4f474ac92be81610439aaad61e";`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_58154df94c686818c99fb754ce";`);
// -- index idx_33228da131bb1112247cf52a42 is a duplicate of IDX_33228da131bb1112247cf52a42
await queryRunner.query(`DROP INDEX IF EXISTS "idx_33228da131bb1112247cf52a42";`);
await queryRunner.query(
`CREATE INDEX "idx_execution_entity_workflow_id_id" ON ${tablePrefix}execution_entity ("workflowId","id");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity DROP COLUMN "tmp_workflowId";`,
);
// -- FK was missing in prev schema - should it be added?
await queryRunner.query(
`ALTER TABLE ${tablePrefix}execution_entity ADD CONSTRAINT "fk_execution_entity_workflow_id" FOREIGN KEY ("workflowId") REFERENCES workflow_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity DROP CONSTRAINT IF EXISTS "pk_eded7d72664448da7745d551207";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}tag_entity DROP CONSTRAINT IF EXISTS "PK_7a50a9b74ae6855c0dcaee25052";`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity DROP COLUMN tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity ADD PRIMARY KEY (id);`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}tag_entity ADD PRIMARY KEY (id);`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity RENAME COLUMN id to tmp_id;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity ADD COLUMN id varchar(36);`,
);
await queryRunner.query(`UPDATE ${tablePrefix}credentials_entity SET id = tmp_id::text;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity ALTER COLUMN id SET NOT NULL;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity ALTER COLUMN tmp_id DROP DEFAULT;`,
);
await queryRunner.query(`DROP SEQUENCE IF EXISTS credentials_entity_id_seq;`);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_credentials_entity_id" ON ${tablePrefix}credentials_entity ("id");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials RENAME COLUMN "credentialsId" to "tmp_credentialsId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials ADD COLUMN "credentialsId" varchar(36);`,
);
await queryRunner.query(
`UPDATE ${tablePrefix}shared_credentials SET "credentialsId" = "tmp_credentialsId"::text;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials ALTER COLUMN "credentialsId" SET NOT NULL;`,
);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_shared_credentials_id" ON ${tablePrefix}shared_credentials ("userId","credentialsId");`,
);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_829d16efa0e265cb076d50eca8d21733";`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}shared_credentials DROP CONSTRAINT "PK_10dd1527ffb639609be7aadd98f628c6",
ADD CONSTRAINT "pk_shared_credentials_id" PRIMARY KEY USING INDEX "pk_shared_credentials_id";`);
await queryRunner.query(
`CREATE INDEX "idx_shared_credentials_credentials_id" ON ${tablePrefix}shared_credentials ("credentialsId");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials ADD CONSTRAINT "fk_shared_credentials_credentials_id" FOREIGN KEY ("credentialsId") REFERENCES credentials_entity(id) ON DELETE CASCADE ON UPDATE NO ACTION;`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}shared_credentials DROP COLUMN "tmp_credentialsId";`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}credentials_entity DROP CONSTRAINT IF EXISTS "pk_814c3d3c36e8a27fa8edb761b0e";`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}credentials_entity DROP COLUMN tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}credentials_entity ADD PRIMARY KEY (id);`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables RENAME COLUMN id to tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables ADD COLUMN id varchar(36);`);
await queryRunner.query(`UPDATE ${tablePrefix}variables SET id = tmp_id::text;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables ALTER COLUMN id SET NOT NULL;`);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables ALTER COLUMN tmp_id DROP DEFAULT;`,
);
await queryRunner.query(`DROP SEQUENCE IF EXISTS variables_id_seq;`);
await queryRunner.query(
`CREATE UNIQUE INDEX "pk_variables_id" ON ${tablePrefix}variables ("id");`,
);
await queryRunner.query(
`ALTER TABLE ${tablePrefix}variables DROP CONSTRAINT IF EXISTS "variables_pkey";`,
);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables DROP COLUMN tmp_id;`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}variables ADD PRIMARY KEY (id);`);
}
// eslint-disable-next-line @typescript-eslint/no-empty-function, @typescript-eslint/no-unused-vars
async down({ queryRunner, tablePrefix }: MigrationContext) {}
}

View File

@@ -0,0 +1,42 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class SeparateExecutionData1690000000020 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`CREATE TABLE "${tablePrefix}execution_data" (
"executionId" integer NOT NULL,
"workflowData" json NOT NULL,
"data" text NOT NULL,
CONSTRAINT "${tablePrefix}execution_data_fk" FOREIGN KEY ("executionId") REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
)`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}execution_data" (
"executionId",
"workflowData",
"data")
SELECT "id", "workflowData", "data" FROM "${tablePrefix}execution_entity"
`,
);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}execution_entity" DROP COLUMN "workflowData", DROP COLUMN "data"`,
);
}
async down({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query(
`ALTER TABLE "${tablePrefix}execution_entity"
ADD "workflowData" json NULL,
ADD "data" text NULL`,
);
await queryRunner.query(
`UPDATE "${tablePrefix}execution_entity" SET "workflowData" = "execution_data"."workflowData", "data" = "execution_data"."data"
FROM "${tablePrefix}execution_data" WHERE "${tablePrefix}execution_data"."executionId" = "${tablePrefix}execution_entity"."id"`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}execution_data"`);
}
}

View File

@@ -36,6 +36,8 @@ import { UpdateRunningExecutionStatus1677236854063 } from './1677236854063-Updat
import { CreateExecutionMetadataTable1679416281778 } from './1679416281778-CreateExecutionMetadataTable';
import { CreateVariables1677501636754 } from './1677501636754-CreateVariables';
import { AddUserActivatedProperty1681134145996 } from './1681134145996-AddUserActivatedProperty';
import { MigrateIntegerKeysToString1690000000000 } from './1690000000000-MigrateIntegerKeysToString';
import { SeparateExecutionData1690000000020 } from './1690000000020-SeparateExecutionData';
export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
@@ -75,4 +77,6 @@ export const postgresMigrations: Migration[] = [
CreateExecutionMetadataTable1679416281778,
CreateVariables1677501636754,
AddUserActivatedProperty1681134145996,
MigrateIntegerKeysToString1690000000000,
SeparateExecutionData1690000000020,
];

View File

@@ -0,0 +1,185 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class MigrateIntegerKeysToString1690000000002 implements ReversibleMigration {
transaction = false as const;
async up({ queryRunner, tablePrefix }: MigrationContext) {
await queryRunner.query('PRAGMA foreign_keys=OFF');
await queryRunner.startTransaction();
await queryRunner.query(`
CREATE TABLE "${tablePrefix}TMP_workflow_entity" ("id" varchar(36) PRIMARY KEY NOT NULL, "name" varchar(128) NOT NULL, "active" boolean NOT NULL, "nodes" text, "connections" text NOT NULL, "createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "updatedAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "settings" text, "staticData" text, "pinData" text, "versionId" varchar(36), "triggerCount" integer NOT NULL DEFAULT 0);`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_workflow_entity" SELECT * FROM "${tablePrefix}workflow_entity";`,
);
await queryRunner.query('DROP TABLE "workflow_entity";');
await queryRunner.query(`ALTER TABLE "${tablePrefix}TMP_workflow_entity" RENAME TO "${tablePrefix}workflow_entity";
`);
await queryRunner.query(`
CREATE TABLE "${tablePrefix}TMP_tag_entity" ("id" varchar(36) PRIMARY KEY NOT NULL, "name" varchar(24) NOT NULL, "createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "updatedAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')));`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_tag_entity" SELECT * FROM "${tablePrefix}tag_entity";`,
);
await queryRunner.query('DROP TABLE "tag_entity";');
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_tag_entity" RENAME TO "${tablePrefix}tag_entity";`,
);
await queryRunner.query(`
CREATE TABLE "${tablePrefix}TMP_workflows_tags" ("workflowId" varchar(36) NOT NULL, "tagId" integer NOT NULL, CONSTRAINT "FK_workflows_tags_workflow_entity" FOREIGN KEY ("workflowId") REFERENCES "workflow_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION, CONSTRAINT "FK_workflows_tags_tag_entity" FOREIGN KEY ("tagId") REFERENCES "${tablePrefix}tag_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION, PRIMARY KEY ("workflowId", "tagId"));`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_workflows_tags" SELECT * FROM "${tablePrefix}workflows_tags";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}workflows_tags";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_workflows_tags" RENAME TO "${tablePrefix}workflows_tags";`,
);
await queryRunner.query(
`CREATE INDEX "idx_workflows_tags_tag_id" ON "${tablePrefix}workflows_tags" ("tagId");`,
);
await queryRunner.query(
`CREATE INDEX "idx_workflows_tags_workflow_id" ON "${tablePrefix}workflows_tags" ("workflowId");`,
);
await queryRunner.query(`CREATE TABLE "${tablePrefix}TMP_workflow_statistics" (
"count" INTEGER DEFAULT 0,
"latestEvent" DATETIME,
"name" VARCHAR(128) NOT NULL,
"workflowId" VARCHAR(36),
PRIMARY KEY("workflowId", "name"),
FOREIGN KEY("workflowId") REFERENCES "${tablePrefix}workflow_entity"("id") ON DELETE CASCADE
);`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_workflow_statistics" SELECT * FROM "${tablePrefix}workflow_statistics";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}workflow_statistics";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_workflow_statistics" RENAME TO "${tablePrefix}workflow_statistics";`,
);
await queryRunner.query(
`CREATE TABLE "${tablePrefix}TMP_shared_workflow" (
"createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
"updatedAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
"roleId" integer NOT NULL, "userId" varchar NOT NULL,
"workflowId" VARCHAR(36) NOT NULL,
CONSTRAINT "FK_shared_workflow_role" FOREIGN KEY ("roleId") REFERENCES "role" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION,
CONSTRAINT "FK_shared_workflow_user" FOREIGN KEY ("userId") REFERENCES "user" ("id") ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT "FK_shared_workflow_workflow_entity" FOREIGN KEY ("workflowId") REFERENCES "workflow_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION,
PRIMARY KEY ("userId", "workflowId"));`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_shared_workflow" SELECT * FROM "${tablePrefix}shared_workflow";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}shared_workflow";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_shared_workflow" RENAME TO "${tablePrefix}shared_workflow";`,
);
await queryRunner.query(
`CREATE INDEX "idx_shared_workflow_workflow_id" ON "${tablePrefix}shared_workflow" ("workflowId");`,
);
await queryRunner.query(
`CREATE TABLE "${tablePrefix}TMP_webhook_entity" ("workflowId" varchar(36) NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, "webhookId" varchar, "pathLength" integer, PRIMARY KEY ("webhookPath", "method"));`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_webhook_entity" SELECT * FROM "${tablePrefix}webhook_entity";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}webhook_entity";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_webhook_entity" RENAME TO "${tablePrefix}webhook_entity";`,
);
await queryRunner.query(
`CREATE INDEX "idx_webhook_entity_webhook_path_method" ON "${tablePrefix}webhook_entity" ("webhookId","method","pathLength");`,
);
await queryRunner.query(`CREATE TABLE "${tablePrefix}TMP_execution_entity" (
"id" integer PRIMARY KEY AUTOINCREMENT NOT NULL,
"workflowId" varchar(36),
"finished" boolean NOT NULL,
"mode" varchar NOT NULL,
"retryOf" varchar,
"retrySuccessId" varchar,
"startedAt" datetime NOT NULL,
"stoppedAt" datetime,
"waitTill" datetime,
"workflowData" text NOT NULL,
"data" text NOT NULL, "status" varchar,
FOREIGN KEY("workflowId") REFERENCES "workflow_entity" ("id") ON DELETE CASCADE
);`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_execution_entity" SELECT * FROM "${tablePrefix}execution_entity";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}execution_entity";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_execution_entity" RENAME TO "${tablePrefix}execution_entity";`,
);
await queryRunner.query(
`CREATE INDEX "idx_execution_entity_stopped_at" ON "${tablePrefix}execution_entity" ("stoppedAt");`,
);
await queryRunner.query(
`CREATE INDEX "idx_execution_entity_wait_till" ON "${tablePrefix}execution_entity" ("waitTill");`,
);
await queryRunner.query(
`CREATE TABLE "${tablePrefix}TMP_credentials_entity" ("id" varchar(36) PRIMARY KEY NOT NULL, "name" varchar(128) NOT NULL, "data" text NOT NULL, "type" varchar(32) NOT NULL, "nodesAccess" text NOT NULL, "createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "updatedAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')));`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_credentials_entity" SELECT * FROM "${tablePrefix}credentials_entity";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}credentials_entity";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_credentials_entity" RENAME TO "${tablePrefix}credentials_entity";`,
);
await queryRunner.query(
`CREATE INDEX "idx_credentials_entity_type" ON "${tablePrefix}credentials_entity" ("type");`,
);
await queryRunner.query(
`CREATE TABLE "${tablePrefix}TMP_shared_credentials" ("createdAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
"updatedAt" datetime(3) NOT NULL DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
"roleId" integer NOT NULL,
"userId" varchar NOT NULL, "credentialsId" varchar(36) NOT NULL,
CONSTRAINT "FK_shared_credentials_role" FOREIGN KEY ("roleId") REFERENCES "role" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION,
CONSTRAINT "FK_shared_credentials_user" FOREIGN KEY ("userId") REFERENCES "user" ("id") ON DELETE CASCADE ON UPDATE NO ACTION,
CONSTRAINT "FK_shared_credentials_credentials" FOREIGN KEY ("credentialsId") REFERENCES "${tablePrefix}credentials_entity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION, PRIMARY KEY ("userId", "credentialsId"));`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_shared_credentials" SELECT * FROM "${tablePrefix}shared_credentials";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}shared_credentials";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_shared_credentials" RENAME TO "${tablePrefix}shared_credentials";`,
);
await queryRunner.query(
`CREATE INDEX "idx_shared_credentials_credentials" ON "${tablePrefix}shared_credentials" ("credentialsId");`,
);
await queryRunner.query(
`CREATE UNIQUE INDEX "idx_shared_credentials_user_credentials" ON "${tablePrefix}shared_credentials" ("userId","credentialsId");`,
);
await queryRunner.query(`CREATE TABLE "${tablePrefix}TMP_variables" (
id varchar(36) PRIMARY KEY NOT NULL,
"key" TEXT NOT NULL,
"type" TEXT NOT NULL DEFAULT ('string'),
value TEXT,
UNIQUE("key")
);`);
await queryRunner.query(
`INSERT INTO "${tablePrefix}TMP_variables" SELECT * FROM "${tablePrefix}variables";`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}variables";`);
await queryRunner.query(
`ALTER TABLE "${tablePrefix}TMP_variables" RENAME TO "${tablePrefix}variables";`,
);
await queryRunner.query(`CREATE UNIQUE INDEX "idx_variables_key" ON "${tablePrefix}variables" ("key");
`);
await queryRunner.commitTransaction();
await queryRunner.query('PRAGMA foreign_keys=ON');
}
// eslint-disable-next-line @typescript-eslint/no-empty-function, @typescript-eslint/no-unused-vars
async down({ queryRunner, tablePrefix }: MigrationContext) {}
}

View File

@@ -0,0 +1,46 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
export class SeparateExecutionData1690000000010 implements ReversibleMigration {
async up({ queryRunner, tablePrefix }: MigrationContext): Promise<void> {
await queryRunner.query(
`CREATE TABLE "${tablePrefix}execution_data" (
"executionId" int PRIMARY KEY NOT NULL,
"workflowData" text NOT NULL,
"data" text NOT NULL,
FOREIGN KEY("executionId") REFERENCES "${tablePrefix}execution_entity" ("id") ON DELETE CASCADE
)`,
);
await queryRunner.query(
`INSERT INTO "${tablePrefix}execution_data" (
"executionId",
"workflowData",
"data")
SELECT "id", "workflowData", "data" FROM "${tablePrefix}execution_entity"
`,
);
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}execution_entity\` DROP COLUMN "workflowData"`,
);
await queryRunner.query(`ALTER TABLE \`${tablePrefix}execution_entity\` DROP COLUMN "data"`);
}
async down({ queryRunner, tablePrefix }: MigrationContext): Promise<void> {
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}execution_entity\` ADD COLUMN "workflowData" text NULL`,
);
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}execution_entity\` ADD COLUMN "data" text NULL`,
);
await queryRunner.query(
`UPDATE "${tablePrefix}execution_entity" SET "workflowData" = (SELECT "workflowData" FROM "${tablePrefix}execution_data" WHERE "${tablePrefix}execution_data"."executionId" = "${tablePrefix}execution_entity"."id")`,
);
await queryRunner.query(
`UPDATE "${tablePrefix}execution_entity" SET "data" = (SELECT "data" FROM "${tablePrefix}execution_data" WHERE "${tablePrefix}execution_data"."executionId" = "${tablePrefix}execution_entity"."id")`,
);
await queryRunner.query(`DROP TABLE "${tablePrefix}execution_data"`);
}
}

View File

@@ -35,6 +35,8 @@ import { UpdateRunningExecutionStatus1677237073720 } from './1677237073720-Updat
import { CreateExecutionMetadataTable1679416281777 } from './1679416281777-CreateExecutionMetadataTable';
import { CreateVariables1677501636752 } from './1677501636752-CreateVariables';
import { AddUserActivatedProperty1681134145996 } from './1681134145996-AddUserActivatedProperty';
import { MigrateIntegerKeysToString1690000000002 } from './1690000000002-MigrateIntegerKeysToString';
import { SeparateExecutionData1690000000010 } from './1690000000010-SeparateExecutionData';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@@ -73,6 +75,8 @@ const sqliteMigrations: Migration[] = [
CreateVariables1677501636752,
CreateExecutionMetadataTable1679416281777,
AddUserActivatedProperty1681134145996,
MigrateIntegerKeysToString1690000000002,
SeparateExecutionData1690000000010,
];
export { sqliteMigrations };

View File

@@ -1,10 +1,408 @@
import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm';
import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
import type {
FindManyOptions,
FindOneOptions,
FindOptionsWhere,
SelectQueryBuilder,
} from 'typeorm';
import { ExecutionEntity } from '../entities/ExecutionEntity';
import { parse, stringify } from 'flatted';
import type {
IExecutionBase,
IExecutionDb,
IExecutionFlattedDb,
IExecutionResponse,
} from '@/Interfaces';
import { LoggerProxy } from 'n8n-workflow';
import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow';
import { ExecutionDataRepository } from './executionData.repository';
import type { ExecutionData } from '../entities/ExecutionData';
import type { IGetExecutionsQueryFilter } from '@/executions/executions.service';
import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers';
import { ExecutionMetadata } from '../entities/ExecutionMetadata';
import { DateUtils } from 'typeorm/util/DateUtils';
import { BinaryDataManager } from 'n8n-core';
import config from '@/config';
function parseFiltersToQueryBuilder(
qb: SelectQueryBuilder<ExecutionEntity>,
filters?: IGetExecutionsQueryFilter,
) {
if (filters?.status) {
qb.andWhere('execution.status IN (:...workflowStatus)', {
workflowStatus: filters.status,
});
}
if (filters?.finished) {
qb.andWhere({ finished: filters.finished });
}
if (filters?.metadata && isAdvancedExecutionFiltersEnabled()) {
qb.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
for (const md of filters.metadata) {
qb.andWhere('md.key = :key AND md.value = :value', md);
}
}
if (filters?.startedAfter) {
qb.andWhere({
startedAt: MoreThanOrEqual(
DateUtils.mixedDateToUtcDatetimeString(new Date(filters.startedAfter)),
),
});
}
if (filters?.startedBefore) {
qb.andWhere({
startedAt: LessThanOrEqual(
DateUtils.mixedDateToUtcDatetimeString(new Date(filters.startedBefore)),
),
});
}
if (filters?.workflowId) {
qb.andWhere({
workflowId: filters.workflowId,
});
}
}
@Service()
export class ExecutionRepository extends Repository<ExecutionEntity> {
constructor(dataSource: DataSource) {
private executionDataRepository: ExecutionDataRepository;
constructor(dataSource: DataSource, executionDataRepository: ExecutionDataRepository) {
super(ExecutionEntity, dataSource.manager);
this.executionDataRepository = executionDataRepository;
}
async findMultipleExecutions(
queryParams: FindManyOptions<ExecutionEntity>,
options?: {
unflattenData: true;
includeData?: true;
},
): Promise<IExecutionResponse[]>;
async findMultipleExecutions(
queryParams: FindManyOptions<ExecutionEntity>,
options?: {
unflattenData?: false | undefined;
includeData?: true;
},
): Promise<IExecutionFlattedDb[]>;
async findMultipleExecutions(
queryParams: FindManyOptions<ExecutionEntity>,
options?: {
unflattenData?: boolean;
includeData?: boolean;
},
): Promise<IExecutionBase[]>;
async findMultipleExecutions(
queryParams: FindManyOptions<ExecutionEntity>,
options?: {
unflattenData?: boolean;
includeData?: boolean;
},
): Promise<IExecutionFlattedDb[] | IExecutionResponse[] | IExecutionBase[]> {
if (options?.includeData) {
if (!queryParams.relations) {
queryParams.relations = [];
}
(queryParams.relations as string[]).push('executionData');
}
const executions = await this.find(queryParams);
if (options?.includeData && options?.unflattenData) {
return executions.map((execution) => {
const { executionData, ...rest } = execution;
return {
...rest,
data: parse(executionData.data) as IRunExecutionData,
workflowData: executionData.workflowData,
} as IExecutionResponse;
});
} else if (options?.includeData) {
return executions.map((execution) => {
const { executionData, ...rest } = execution;
return {
...rest,
data: execution.executionData.data,
workflowData: execution.executionData.workflowData,
} as IExecutionFlattedDb;
});
}
return executions.map((execution) => {
const { executionData, ...rest } = execution;
return rest;
});
}
async findSingleExecution(
id: string,
options?: {
includeData: true;
unflattenData: true;
where?: FindOptionsWhere<ExecutionEntity>;
},
): Promise<IExecutionResponse | undefined>;
async findSingleExecution(
id: string,
options?: {
includeData: true;
unflattenData?: false | undefined;
where?: FindOptionsWhere<ExecutionEntity>;
},
): Promise<IExecutionFlattedDb | undefined>;
async findSingleExecution(
id: string,
options?: {
includeData?: boolean;
unflattenData?: boolean;
where?: FindOptionsWhere<ExecutionEntity>;
},
): Promise<IExecutionBase | undefined>;
async findSingleExecution(
id: string,
options?: {
includeData?: boolean;
unflattenData?: boolean;
where?: FindOptionsWhere<ExecutionEntity>;
},
): Promise<IExecutionFlattedDb | IExecutionResponse | IExecutionBase | undefined> {
const whereClause: FindOneOptions<ExecutionEntity> = {
where: {
id,
...options?.where,
},
};
if (options?.includeData) {
whereClause.relations = ['executionData'];
}
const execution = await this.findOne(whereClause);
if (!execution) {
return undefined;
}
const { executionData, ...rest } = execution;
if (options?.includeData && options?.unflattenData) {
return {
...rest,
data: parse(execution.executionData.data) as IRunExecutionData,
workflowData: execution.executionData.workflowData,
} as IExecutionResponse;
} else if (options?.includeData) {
return {
...rest,
data: execution.executionData.data,
workflowData: execution.executionData.workflowData,
} as IExecutionFlattedDb;
}
return rest;
}
async createNewExecution(execution: IExecutionDb) {
const { data, workflowData, ...rest } = execution;
const newExecution = await this.save(rest);
await this.executionDataRepository.save({
execution: newExecution,
workflowData,
data: stringify(data),
});
return newExecution;
}
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
// are resumed after waiting for some time, as a new startedAt is set)
const { id, data, workflowData, startedAt, ...executionInformation } = execution;
if (Object.keys(executionInformation).length > 0) {
await this.update({ id: executionId }, executionInformation);
}
if (data || workflowData) {
const executionData: Partial<ExecutionData> = {};
if (workflowData) {
executionData.workflowData = workflowData;
}
if (data) {
executionData.data = stringify(data);
}
// @ts-ignore
await this.executionDataRepository.update({ executionId }, executionData);
}
}
async deleteExecution(executionId: string) {
// TODO: Should this be awaited? Should we add a catch in case it fails?
await BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(executionId);
return this.delete({ id: executionId });
}
async countExecutions(
filters: IGetExecutionsQueryFilter | undefined,
accessibleWorkflowIds: string[],
currentlyRunningExecutions: string[],
isOwner: boolean,
): Promise<{ count: number; estimated: boolean }> {
const dbType = config.getEnv('database.type');
if (dbType !== 'postgresdb' || (filters && Object.keys(filters).length > 0) || !isOwner) {
const query = this.createQueryBuilder('execution').andWhere(
'execution.workflowId IN (:...accessibleWorkflowIds)',
{ accessibleWorkflowIds },
);
if (currentlyRunningExecutions.length > 0) {
query.andWhere('execution.id NOT IN (:...currentlyRunningExecutions)', {
currentlyRunningExecutions,
});
}
parseFiltersToQueryBuilder(query, filters);
const count = await query.getCount();
return { count, estimated: false };
}
try {
// Get an estimate of rows count.
const estimateRowsNumberSql =
"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
const rows = (await this.query(estimateRowsNumberSql)) as Array<{ n_live_tup: string }>;
const estimate = parseInt(rows[0].n_live_tup, 10);
// If over 100k, return just an estimate.
if (estimate > 100_000) {
// if less than 100k, we get the real count as even a full
// table scan should not take so long.
return { count: estimate, estimated: true };
}
} catch (error) {
if (error instanceof Error) {
LoggerProxy.warn(`Failed to get executions count from Postgres: ${error.message}`, {
error,
});
}
}
const count = await this.count({
where: {
workflowId: In(accessibleWorkflowIds),
},
});
return { count, estimated: false };
}
async searchExecutions(
filters: IGetExecutionsQueryFilter | undefined,
limit: number,
excludedExecutionIds: string[],
accessibleWorkflowIds: string[],
additionalFilters?: { lastId?: string; firstId?: string },
): Promise<IExecutionsSummary[]> {
if (accessibleWorkflowIds.length === 0) {
return [];
}
const query = this.createQueryBuilder('execution')
.select([
'execution.id',
'execution.finished',
'execution.mode',
'execution.retryOf',
'execution.retrySuccessId',
'execution.status',
'execution.startedAt',
'execution.stoppedAt',
'execution.workflowId',
'execution.waitTill',
'workflow.name',
])
.innerJoin('execution.workflow', 'workflow')
.limit(limit)
// eslint-disable-next-line @typescript-eslint/naming-convention
.orderBy({ 'execution.id': 'DESC' })
.andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
if (excludedExecutionIds.length > 0) {
query.andWhere('execution.id NOT IN (:...excludedExecutionIds)', { excludedExecutionIds });
}
if (additionalFilters?.lastId) {
query.andWhere('execution.id < :lastId', { lastId: additionalFilters.lastId });
}
if (additionalFilters?.firstId) {
query.andWhere('execution.id > :firstId', { firstId: additionalFilters.firstId });
}
parseFiltersToQueryBuilder(query, filters);
const executions = await query.getMany();
return executions.map((execution) => {
const { workflow, waitTill, ...rest } = execution;
return {
...rest,
waitTill: waitTill ?? undefined,
workflowName: workflow.name,
};
});
}
async deleteExecutions(
filters: IGetExecutionsQueryFilter | undefined,
accessibleWorkflowIds: string[],
deleteConditions: {
deleteBefore?: Date;
ids?: string[];
},
) {
if (!deleteConditions?.deleteBefore && !deleteConditions?.ids) {
throw new Error('Either "deleteBefore" or "ids" must be present in the request body');
}
const query = this.createQueryBuilder('execution')
.select(['execution.id'])
.andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
if (deleteConditions.deleteBefore) {
// delete executions by date, if user may access the underlying workflows
query.andWhere('execution.startedAt <= :deleteBefore', {
deleteBefore: deleteConditions.deleteBefore,
});
// Filters are only used when filtering by date
parseFiltersToQueryBuilder(query, filters);
} else if (deleteConditions.ids) {
// delete executions by IDs, if user may access the underlying workflows
query.andWhere('execution.id IN (:...executionIds)', { executionIds: deleteConditions.ids });
}
const executions = await query.getMany();
if (!executions.length) {
if (deleteConditions.ids) {
LoggerProxy.error('Failed to delete an execution due to insufficient permissions', {
executionIds: deleteConditions.ids,
});
}
return;
}
const idsToDelete = executions.map(({ id }) => id);
const binaryDataManager = BinaryDataManager.getInstance();
await Promise.all(
idsToDelete.map(async (id) => binaryDataManager.deleteBinaryDataByExecutionId(id)),
);
do {
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
const batch = idsToDelete.splice(0, 500);
await this.delete(batch);
} while (idsToDelete.length > 0);
}
}

View File

@@ -0,0 +1,10 @@
import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm';
import { ExecutionData } from '../entities/ExecutionData';
@Service()
export class ExecutionDataRepository extends Repository<ExecutionData> {
constructor(dataSource: DataSource) {
super(ExecutionData, dataSource.manager);
}
}

View File

@@ -2,6 +2,7 @@ export { AuthIdentityRepository } from './authIdentity.repository';
export { AuthProviderSyncHistoryRepository } from './authProviderSyncHistory.repository';
export { CredentialsRepository } from './credentials.repository';
export { EventDestinationsRepository } from './eventDestinations.repository';
export { ExecutionDataRepository } from './executionData.repository';
export { ExecutionMetadataRepository } from './executionMetadata.repository';
export { ExecutionRepository } from './execution.repository';
export { InstalledNodesRepository } from './installedNodes.repository';

View File

@@ -0,0 +1,6 @@
import { customAlphabet } from 'nanoid';
const nanoid = customAlphabet('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz', 16);
export function generateNanoId() {
return nanoid();
}