refactor(core): Delete more redundant code across migrations (Part 1) (no-changelog) (#6691)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-07-31 17:35:53 +02:00
committed by GitHub
parent 7b27fa5898
commit b7ca27afcf
42 changed files with 830 additions and 2195 deletions

View File

@@ -0,0 +1,48 @@
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { MigrationContext, ReversibleMigration } from '@db/types';
export class UniqueWorkflowNames1620821879465 implements ReversibleMigration {
protected indexSuffix = '943d8f922be094eb507cb9a7f9';
async up({ isMysql, escape, runQuery }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const workflowNames: Array<Pick<WorkflowEntity, 'name'>> = await runQuery(
`SELECT name FROM ${tableName}`,
);
for (const { name } of workflowNames) {
const duplicates: Array<Pick<WorkflowEntity, 'id' | 'name'>> = await runQuery(
`SELECT id, name FROM ${tableName} WHERE name = :name ORDER BY createdAt ASC`,
{ name },
);
if (duplicates.length > 1) {
await Promise.all(
duplicates.map(async (workflow, index) => {
if (index === 0) return;
return runQuery(
`UPDATE ${tableName} SET name = :name WHERE id = :id`,
{ name: `${workflow.name} ${index + 1}` },
{ id: workflow.id },
);
}),
);
}
}
const indexName = escape.indexName(this.indexSuffix);
await runQuery(
isMysql
? `ALTER TABLE ${tableName} ADD UNIQUE INDEX ${indexName} (${escape.columnName('name')})`
: `CREATE UNIQUE INDEX ${indexName} ON ${tableName} ("name")`,
);
}
async down({ isMysql, escape, runQuery }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const indexName = escape.indexName(this.indexSuffix);
await runQuery(
isMysql ? `ALTER TABLE ${tableName} DROP INDEX ${indexName}` : `DROP INDEX ${indexName}`,
);
}
}

View File

@@ -0,0 +1,258 @@
import type { IWorkflowBase } from 'n8n-workflow';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { MigrationContext, ReversibleMigration } from '@db/types';
type Credential = Pick<CredentialsEntity, 'id' | 'name' | 'type'>;
type ExecutionWithData = { id: string; workflowData: string | IWorkflowBase };
type Workflow = Pick<WorkflowEntity, 'id'> & { nodes: string | WorkflowEntity['nodes'] };
// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
export class UpdateWorkflowCredentials1630330987096 implements ReversibleMigration {
async up({ dbType, escape, parseJson, runQuery, runInBatches }: MigrationContext) {
const credentialsTable = escape.tableName('credentials_entity');
const workflowsTable = escape.tableName('workflow_entity');
const executionsTable = escape.tableName('execution_entity');
const dataColumn = escape.columnName('workflowData');
const waitTillColumn = escape.columnName('waitTill');
const credentialsEntities: Credential[] = await runQuery(
`SELECT id, name, type FROM ${credentialsTable}`,
);
const workflowsQuery = `SELECT id, nodes FROM ${workflowsTable}`;
await runInBatches<Workflow>(workflowsQuery, async (workflows) => {
workflows.forEach(async (workflow) => {
let credentialsUpdated = false;
const nodes = parseJson(workflow.nodes);
nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id ?? null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${workflowsTable} SET nodes = :nodes WHERE id = :id`,
{ nodes: JSON.stringify(nodes) },
{ id: workflow.id },
);
}
});
});
const finishedValue = dbType === 'postgresdb' ? 'FALSE' : '0';
const waitingExecutionsQuery = `
SELECT id, ${dataColumn}
FROM ${executionsTable}
WHERE ${waitTillColumn} IS NOT NULL AND finished = ${finishedValue}
`;
await runInBatches<ExecutionWithData>(waitingExecutionsQuery, async (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
let credentialsUpdated = false;
const workflowData = parseJson(execution.workflowData);
workflowData.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id ?? null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${executionsTable}
SET ${escape.columnName('workflowData')} = :data WHERE id = :id`,
{ data: JSON.stringify(workflowData) },
{ id: execution.id },
);
}
});
});
const retryableExecutions: ExecutionWithData[] = await runQuery(`
SELECT id, ${dataColumn}
FROM ${executionsTable}
WHERE ${waitTillColumn} IS NULL AND finished = ${finishedValue} AND mode != 'retry'
ORDER BY ${escape.columnName('startedAt')} DESC
LIMIT 200
`);
retryableExecutions.forEach(async (execution) => {
let credentialsUpdated = false;
const workflowData = parseJson(execution.workflowData);
workflowData.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id ?? null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${executionsTable}
SET ${escape.columnName('workflowData')} = :data WHERE id = :id`,
{ data: JSON.stringify(workflowData) },
{ id: execution.id },
);
}
});
}
async down({ dbType, escape, parseJson, runQuery, runInBatches }: MigrationContext) {
const credentialsTable = escape.tableName('credentials_entity');
const workflowsTable = escape.tableName('workflow_entity');
const executionsTable = escape.tableName('execution_entity');
const dataColumn = escape.columnName('workflowData');
const waitTillColumn = escape.columnName('waitTill');
const credentialsEntities: Credential[] = await runQuery(
`SELECT id, name, type FROM ${credentialsTable}`,
);
const workflowsQuery = `SELECT id, nodes FROM ${workflowsTable}`;
await runInBatches<Workflow>(workflowsQuery, async (workflows) => {
workflows.forEach(async (workflow) => {
let credentialsUpdated = false;
const nodes = parseJson(workflow.nodes);
nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find(
// double-equals because creds.id can be string or number
// eslint-disable-next-line eqeqeq
(credentials) => credentials.id == creds.id && credentials.type === type,
);
if (matchingCredentials) {
// @ts-ignore
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${workflowsTable} SET nodes = :nodes WHERE id = :id`,
{ nodes: JSON.stringify(nodes) },
{ id: workflow.id },
);
}
});
});
const finishedValue = dbType === 'postgresdb' ? 'FALSE' : '0';
const waitingExecutionsQuery = `
SELECT id, ${dataColumn}
FROM ${executionsTable}
WHERE ${waitTillColumn} IS NOT NULL AND finished = ${finishedValue}
`;
await runInBatches<ExecutionWithData>(waitingExecutionsQuery, async (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
let credentialsUpdated = false;
const workflowData = parseJson(execution.workflowData);
workflowData.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find(
// double-equals because creds.id can be string or number
// eslint-disable-next-line eqeqeq
(credentials) => credentials.id == creds.id && credentials.type === type,
);
if (matchingCredentials) {
// @ts-ignore
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${executionsTable}
SET ${escape.columnName('workflowData')} = :data WHERE id = :id`,
{ data: JSON.stringify(workflowData) },
{ id: execution.id },
);
}
});
});
const retryableExecutions: ExecutionWithData[] = await runQuery(`
SELECT id, ${dataColumn}
FROM ${executionsTable}
WHERE ${waitTillColumn} IS NULL AND finished = ${finishedValue} AND mode != 'retry'
ORDER BY ${escape.columnName('startedAt')} DESC
LIMIT 200
`);
retryableExecutions.forEach(async (execution) => {
let credentialsUpdated = false;
const workflowData = parseJson(execution.workflowData);
workflowData.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find(
// double-equals because creds.id can be string or number
// eslint-disable-next-line eqeqeq
(credentials) => credentials.id == creds.id && credentials.type === type,
);
if (matchingCredentials) {
// @ts-ignore
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
await runQuery(
`UPDATE ${executionsTable}
SET ${escape.columnName('workflowData')} = :data WHERE id = :id`,
{ data: JSON.stringify(workflowData) },
{ id: execution.id },
);
}
});
}
}

View File

@@ -0,0 +1,44 @@
import type { INode } from 'n8n-workflow';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { MigrationContext, ReversibleMigration } from '@db/types';
import { v4 as uuid } from 'uuid';
type Workflow = Pick<WorkflowEntity, 'id'> & { nodes: string | INode[] };
export class AddNodeIds1658930531669 implements ReversibleMigration {
async up({ escape, runQuery, runInBatches, parseJson }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const workflowsQuery = `SELECT id, nodes FROM ${tableName}`;
await runInBatches<Workflow>(workflowsQuery, async (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = parseJson(workflow.nodes);
nodes.forEach((node: INode) => {
if (!node.id) {
node.id = uuid();
}
});
await runQuery(
`UPDATE ${tableName} SET nodes = :nodes WHERE id = :id`,
{ nodes: JSON.stringify(nodes) },
{ id: workflow.id },
);
});
});
}
async down({ escape, runQuery, runInBatches, parseJson }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const workflowsQuery = `SELECT id, nodes FROM ${tableName}`;
await runInBatches<Workflow>(workflowsQuery, async (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = parseJson(workflow.nodes).map(({ id, ...rest }) => rest);
await runQuery(
`UPDATE ${tableName} SET nodes = :nodes WHERE id = :id`,
{ nodes: JSON.stringify(nodes) },
{ id: workflow.id },
);
});
});
}
}

View File

@@ -0,0 +1,84 @@
import type { IDataObject, INodeExecutionData } from 'n8n-workflow';
import type { MigrationContext, IrreversibleMigration } from '@db/types';
type OldPinnedData = { [nodeName: string]: IDataObject[] };
type NewPinnedData = { [nodeName: string]: INodeExecutionData[] };
type Workflow = { id: number; pinData: string | OldPinnedData };
function isObjectLiteral(item: unknown): item is { [key: string]: string } {
return typeof item === 'object' && item !== null && !Array.isArray(item);
}
function isJsonKeyObject(item: unknown): item is {
json: unknown;
[keys: string]: unknown;
} {
if (!isObjectLiteral(item)) return false;
return Object.keys(item).includes('json');
}
/**
* Convert TEXT-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659888469333 implements IrreversibleMigration {
async up({ escape, runQuery, runInBatches }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const columnName = escape.columnName('pinData');
const selectQuery = `SELECT id, ${columnName} FROM ${tableName} WHERE ${columnName} IS NOT NULL`;
await runInBatches<Workflow>(selectQuery, async (workflows) => {
await Promise.all(
this.makeUpdateParams(workflows).map(async (workflow) =>
runQuery(`UPDATE ${tableName} SET ${columnName} = :pinData WHERE id = :id;`, {
pinData: workflow.pinData,
id: workflow.id,
}),
),
);
});
}
private makeUpdateParams(fetchedWorkflows: Workflow[]) {
return fetchedWorkflows.reduce<Workflow[]>((updateParams, { id, pinData: rawPinData }) => {
let pinDataPerWorkflow: OldPinnedData | NewPinnedData;
if (typeof rawPinData === 'string') {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
pinDataPerWorkflow = JSON.parse(rawPinData);
} catch {
pinDataPerWorkflow = {};
}
} else {
pinDataPerWorkflow = rawPinData;
}
const newPinDataPerWorkflow = Object.keys(pinDataPerWorkflow).reduce<NewPinnedData>(
// eslint-disable-next-line @typescript-eslint/no-shadow
(newPinDataPerWorkflow, nodeName) => {
let pinDataPerNode = pinDataPerWorkflow[nodeName];
if (!Array.isArray(pinDataPerNode)) {
pinDataPerNode = [pinDataPerNode];
}
if (pinDataPerNode.every((item) => item.json)) return newPinDataPerWorkflow;
newPinDataPerWorkflow[nodeName] = pinDataPerNode.map((item) =>
isJsonKeyObject(item) ? item : { json: item },
);
return newPinDataPerWorkflow;
},
{},
);
if (Object.keys(newPinDataPerWorkflow).length > 0) {
updateParams.push({ id, pinData: JSON.stringify(newPinDataPerWorkflow) });
}
return updateParams;
}, []);
}
}

View File

@@ -0,0 +1,28 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import { v4 as uuidv4 } from 'uuid';
type Workflow = { id: number };
export class AddWorkflowVersionIdColumn1669739707124 implements ReversibleMigration {
async up({ escape, runQuery }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const columnName = escape.columnName('versionId');
await runQuery(`ALTER TABLE ${tableName} ADD COLUMN ${columnName} CHAR(36)`);
const workflowIds: Workflow[] = await runQuery(`SELECT id FROM ${tableName}`);
for (const { id } of workflowIds) {
await runQuery(
`UPDATE ${tableName} SET ${columnName} = :versionId WHERE id = :id`,
{ versionId: uuidv4() },
{ id },
);
}
}
async down({ escape, runQuery }: MigrationContext) {
const tableName = escape.tableName('workflow_entity');
const columnName = escape.columnName('versionId');
await runQuery(`ALTER TABLE ${tableName} DROP COLUMN ${columnName}`);
}
}

View File

@@ -0,0 +1,61 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
export class RemoveWorkflowDataLoadedFlag1671726148419 implements ReversibleMigration {
async up({ escape, dbType, runQuery }: MigrationContext) {
const workflowTableName = escape.tableName('workflow_entity');
const statisticsTableName = escape.tableName('workflow_statistics');
const columnName = escape.columnName('dataLoaded');
// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await runQuery(
`SELECT id, ${columnName} FROM ${workflowTableName}`,
);
const now =
dbType === 'sqlite' ? "STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')" : 'CURRENT_TIMESTAMP(3)';
await Promise.all(
workflowIds.map(
async ({ id, dataLoaded }) =>
dataLoaded &&
runQuery(
`INSERT INTO ${statisticsTableName}
(${escape.columnName('workflowId')}, name, count, ${escape.columnName('latestEvent')})
VALUES (:id, :name, 1, ${now})`,
{ id, name: StatisticsNames.dataLoaded },
),
),
);
await runQuery(`ALTER TABLE ${workflowTableName} DROP COLUMN ${columnName}`);
}
async down({ escape, runQuery }: MigrationContext) {
const workflowTableName = escape.tableName('workflow_entity');
const statisticsTableName = escape.tableName('workflow_statistics');
const columnName = escape.columnName('dataLoaded');
await runQuery(
`ALTER TABLE ${workflowTableName} ADD COLUMN ${columnName} BOOLEAN DEFAULT false`,
);
// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await runQuery(
`SELECT ${escape.columnName('workflowId')} FROM ${statisticsTableName} WHERE name = :name`,
{ name: StatisticsNames.dataLoaded },
);
await Promise.all(
workflowsIds.map(async ({ workflowId }) =>
runQuery(`UPDATE ${workflowTableName} SET ${columnName} = true WHERE id = :id`, {
id: workflowId,
}),
),
);
await runQuery(`DELETE FROM ${statisticsTableName} WHERE name = :name`, {
name: StatisticsNames.dataLoaded,
});
}
}

View File

@@ -0,0 +1,68 @@
import type { MigrationContext, ReversibleMigration } from '@db/types';
import { LDAP_DEFAULT_CONFIGURATION, LDAP_FEATURE_NAME } from '@/Ldap/constants';
export class CreateLdapEntities1674509946020 implements ReversibleMigration {
async up({ escape, dbType, isMysql, runQuery }: MigrationContext) {
const userTable = escape.tableName('user');
await runQuery(`ALTER TABLE ${userTable} ADD COLUMN disabled BOOLEAN NOT NULL DEFAULT false;`);
await runQuery(`
INSERT INTO ${escape.tableName('settings')}
(${escape.columnName('key')}, value, ${escape.columnName('loadOnStartup')})
VALUES ('${LDAP_FEATURE_NAME}', '${JSON.stringify(LDAP_DEFAULT_CONFIGURATION)}', true)
`);
const uuidColumnType = dbType === 'postgresdb' ? 'UUID' : 'VARCHAR(36)';
await runQuery(
`CREATE TABLE IF NOT EXISTS ${escape.tableName('auth_identity')} (
${escape.columnName('userId')} ${uuidColumnType} REFERENCES ${userTable} (id),
${escape.columnName('providerId')} VARCHAR(64) NOT NULL,
${escape.columnName('providerType')} VARCHAR(32) NOT NULL,
${escape.columnName('createdAt')} timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
${escape.columnName('updatedAt')} timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(${escape.columnName('providerId')}, ${escape.columnName('providerType')})
)${isMysql ? "ENGINE='InnoDB'" : ''}`,
);
const idColumn =
dbType === 'sqlite'
? 'INTEGER PRIMARY KEY AUTOINCREMENT'
: dbType === 'postgresdb'
? 'SERIAL NOT NULL PRIMARY KEY'
: 'INTEGER NOT NULL AUTO_INCREMENT';
const timestampColumn =
dbType === 'sqlite'
? 'DATETIME NOT NULL'
: dbType === 'postgresdb'
? 'TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP'
: 'DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP';
await runQuery(
`CREATE TABLE IF NOT EXISTS ${escape.tableName('auth_provider_sync_history')} (
${escape.columnName('id')} ${idColumn},
${escape.columnName('providerType')} VARCHAR(32) NOT NULL,
${escape.columnName('runMode')} TEXT NOT NULL,
${escape.columnName('status')} TEXT NOT NULL,
${escape.columnName('startedAt')} ${timestampColumn},
${escape.columnName('endedAt')} ${timestampColumn},
${escape.columnName('scanned')} INTEGER NOT NULL,
${escape.columnName('created')} INTEGER NOT NULL,
${escape.columnName('updated')} INTEGER NOT NULL,
${escape.columnName('disabled')} INTEGER NOT NULL,
${escape.columnName('error')} TEXT
${isMysql ? ',PRIMARY KEY (`id`)' : ''}
)${isMysql ? "ENGINE='InnoDB'" : ''}`,
);
}
async down({ escape, runQuery }: MigrationContext) {
await runQuery(`DROP TABLE "${escape.tableName('auth_provider_sync_history')}`);
await runQuery(`DROP TABLE "${escape.tableName('auth_identity')}`);
await runQuery(`DELETE FROM ${escape.tableName('settings')} WHERE key = :key`, {
key: LDAP_FEATURE_NAME,
});
await runQuery(`ALTER TABLE ${escape.tableName('user')} DROP COLUMN disabled`);
}
}

View File

@@ -0,0 +1,58 @@
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { MigrationContext, IrreversibleMigration } from '@db/types';
interface Workflow {
id: number;
nodes: WorkflowEntity['nodes'] | string;
connections: WorkflowEntity['connections'] | string;
}
export class PurgeInvalidWorkflowConnections1675940580449 implements IrreversibleMigration {
async up({ escape, parseJson, runQuery, nodeTypes }: MigrationContext) {
const workflowsTable = escape.tableName('workflow_entity');
const workflows: Workflow[] = await runQuery(
`SELECT id, nodes, connections FROM ${workflowsTable}`,
);
await Promise.all(
workflows.map(async (workflow) => {
const connections = parseJson(workflow.connections);
const nodes = parseJson(workflow.nodes);
const nodesThatCannotReceiveInput = nodes.reduce<string[]>((acc, node) => {
try {
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if ((nodeType.description.inputs?.length ?? []) === 0) {
acc.push(node.name);
}
} catch (error) {}
return acc;
}, []);
Object.keys(connections).forEach((sourceNodeName) => {
const connection = connections[sourceNodeName];
const outputs = Object.keys(connection);
outputs.forEach((outputConnectionName /* Like `main` */) => {
const outputConnection = connection[outputConnectionName];
// It filters out all connections that are connected to a node that cannot receive input
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => {
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter(
(outgoingConnections) =>
!nodesThatCannotReceiveInput.includes(outgoingConnections.node),
);
});
});
});
// Update database with new connections
return runQuery(
`UPDATE ${workflowsTable} SET connections = :connections WHERE id = :id`,
{ connections: JSON.stringify(connections) },
{ id: workflow.id },
);
}),
);
}
}