fix(core): Use owners file to export wf owners (#6547)
* remove owner from exported workflow * use owners file to export wf owners * update sharedworkflow owners * fix logic * further update logic * add updatetAt to local changes * additional filter for cred export * optimize query * remove transactions and optimize query * reduce array size and add updated at to tags status
This commit is contained in:
committed by
GitHub
parent
b19833d673
commit
4b755fb0b4
@@ -3,6 +3,7 @@ import path from 'path';
|
||||
import {
|
||||
SOURCE_CONTROL_CREDENTIAL_EXPORT_FOLDER,
|
||||
SOURCE_CONTROL_GIT_FOLDER,
|
||||
SOURCE_CONTROL_OWNERS_EXPORT_FILE,
|
||||
SOURCE_CONTROL_TAGS_EXPORT_FILE,
|
||||
SOURCE_CONTROL_VARIABLES_EXPORT_FILE,
|
||||
SOURCE_CONTROL_WORKFLOW_EXPORT_FOLDER,
|
||||
@@ -14,15 +15,12 @@ import { readFile as fsReadFile } from 'fs/promises';
|
||||
import { Credentials, UserSettings } from 'n8n-core';
|
||||
import type { IWorkflowToImport } from '@/Interfaces';
|
||||
import type { ExportableCredential } from './types/exportableCredential';
|
||||
import { SharedWorkflow } from '@/databases/entities/SharedWorkflow';
|
||||
import { CredentialsEntity } from '@/databases/entities/CredentialsEntity';
|
||||
import { Variables } from '@/databases/entities/Variables';
|
||||
import type { ImportResult } from './types/importResult';
|
||||
import { UM_FIX_INSTRUCTION } from '@/commands/BaseCommand';
|
||||
import { SharedCredentials } from '@/databases/entities/SharedCredentials';
|
||||
import { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
|
||||
import { WorkflowTagMapping } from '@/databases/entities/WorkflowTagMapping';
|
||||
import { TagEntity } from '@/databases/entities/TagEntity';
|
||||
import type { WorkflowTagMapping } from '@/databases/entities/WorkflowTagMapping';
|
||||
import type { TagEntity } from '@/databases/entities/TagEntity';
|
||||
import { ActiveWorkflowRunner } from '../../ActiveWorkflowRunner';
|
||||
import type { SourceControllPullOptions } from './types/sourceControlPullWorkFolder';
|
||||
import { In } from 'typeorm';
|
||||
@@ -94,56 +92,54 @@ export class SourceControlImportService {
|
||||
const ownerGlobalRole = await this.getOwnerGlobalRole();
|
||||
const encryptionKey = await UserSettings.getEncryptionKey();
|
||||
let importCredentialsResult: Array<{ id: string; name: string; type: string }> = [];
|
||||
await Db.transaction(async (transactionManager) => {
|
||||
importCredentialsResult = await Promise.all(
|
||||
credentialFiles.map(async (file) => {
|
||||
LoggerProxy.debug(`Importing credentials file ${file}`);
|
||||
const credential = jsonParse<ExportableCredential>(
|
||||
await fsReadFile(file, { encoding: 'utf8' }),
|
||||
);
|
||||
const existingCredential = existingCredentials.find(
|
||||
(e) => e.id === credential.id && e.type === credential.type,
|
||||
);
|
||||
const sharedOwner = await Db.collections.SharedCredentials.findOne({
|
||||
select: ['userId'],
|
||||
where: {
|
||||
credentialsId: credential.id,
|
||||
roleId: In([ownerCredentialRole.id, ownerGlobalRole.id]),
|
||||
},
|
||||
});
|
||||
importCredentialsResult = await Promise.all(
|
||||
credentialFiles.map(async (file) => {
|
||||
LoggerProxy.debug(`Importing credentials file ${file}`);
|
||||
const credential = jsonParse<ExportableCredential>(
|
||||
await fsReadFile(file, { encoding: 'utf8' }),
|
||||
);
|
||||
const existingCredential = existingCredentials.find(
|
||||
(e) => e.id === credential.id && e.type === credential.type,
|
||||
);
|
||||
const sharedOwner = await Db.collections.SharedCredentials.findOne({
|
||||
select: ['userId'],
|
||||
where: {
|
||||
credentialsId: credential.id,
|
||||
roleId: In([ownerCredentialRole.id, ownerGlobalRole.id]),
|
||||
},
|
||||
});
|
||||
|
||||
const { name, type, data, id, nodesAccess } = credential;
|
||||
const newCredentialObject = new Credentials({ id, name }, type, []);
|
||||
if (existingCredential?.data) {
|
||||
newCredentialObject.data = existingCredential.data;
|
||||
} else {
|
||||
newCredentialObject.setData(data, encryptionKey);
|
||||
}
|
||||
newCredentialObject.nodesAccess = nodesAccess || existingCredential?.nodesAccess || [];
|
||||
const { name, type, data, id, nodesAccess } = credential;
|
||||
const newCredentialObject = new Credentials({ id, name }, type, []);
|
||||
if (existingCredential?.data) {
|
||||
newCredentialObject.data = existingCredential.data;
|
||||
} else {
|
||||
newCredentialObject.setData(data, encryptionKey);
|
||||
}
|
||||
newCredentialObject.nodesAccess = nodesAccess || existingCredential?.nodesAccess || [];
|
||||
|
||||
LoggerProxy.debug(`Updating credential id ${newCredentialObject.id as string}`);
|
||||
await transactionManager.upsert(CredentialsEntity, newCredentialObject, ['id']);
|
||||
LoggerProxy.debug(`Updating credential id ${newCredentialObject.id as string}`);
|
||||
await Db.collections.Credentials.upsert(newCredentialObject, ['id']);
|
||||
|
||||
if (!sharedOwner) {
|
||||
const newSharedCredential = new SharedCredentials();
|
||||
newSharedCredential.credentialsId = newCredentialObject.id as string;
|
||||
newSharedCredential.userId = userId;
|
||||
newSharedCredential.roleId = ownerGlobalRole.id;
|
||||
if (!sharedOwner) {
|
||||
const newSharedCredential = new SharedCredentials();
|
||||
newSharedCredential.credentialsId = newCredentialObject.id as string;
|
||||
newSharedCredential.userId = userId;
|
||||
newSharedCredential.roleId = ownerGlobalRole.id;
|
||||
|
||||
await transactionManager.upsert(SharedCredentials, { ...newSharedCredential }, [
|
||||
'credentialsId',
|
||||
'userId',
|
||||
]);
|
||||
}
|
||||
await Db.collections.SharedCredentials.upsert({ ...newSharedCredential }, [
|
||||
'credentialsId',
|
||||
'userId',
|
||||
]);
|
||||
}
|
||||
|
||||
return {
|
||||
id: newCredentialObject.id as string,
|
||||
name: newCredentialObject.name,
|
||||
type: newCredentialObject.type,
|
||||
};
|
||||
}),
|
||||
);
|
||||
});
|
||||
return {
|
||||
id: newCredentialObject.id as string,
|
||||
name: newCredentialObject.name,
|
||||
type: newCredentialObject.type,
|
||||
};
|
||||
}),
|
||||
);
|
||||
return importCredentialsResult.filter((e) => e !== undefined);
|
||||
}
|
||||
|
||||
@@ -224,35 +220,31 @@ export class SourceControlImportService {
|
||||
).map((e) => e.id),
|
||||
);
|
||||
|
||||
await Db.transaction(async (transactionManager) => {
|
||||
await Promise.all(
|
||||
mappedTags.tags.map(async (tag) => {
|
||||
await transactionManager.upsert(
|
||||
TagEntity,
|
||||
{
|
||||
...tag,
|
||||
},
|
||||
{
|
||||
skipUpdateIfNoValuesChanged: true,
|
||||
conflictPaths: { id: true },
|
||||
},
|
||||
);
|
||||
}),
|
||||
);
|
||||
await Promise.all(
|
||||
mappedTags.mappings.map(async (mapping) => {
|
||||
if (!existingWorkflowIds.has(String(mapping.workflowId))) return;
|
||||
await transactionManager.upsert(
|
||||
WorkflowTagMapping,
|
||||
{ tagId: String(mapping.tagId), workflowId: String(mapping.workflowId) },
|
||||
{
|
||||
skipUpdateIfNoValuesChanged: true,
|
||||
conflictPaths: { tagId: true, workflowId: true },
|
||||
},
|
||||
);
|
||||
}),
|
||||
);
|
||||
});
|
||||
await Promise.all(
|
||||
mappedTags.tags.map(async (tag) => {
|
||||
await Db.collections.Tag.upsert(
|
||||
{
|
||||
...tag,
|
||||
},
|
||||
{
|
||||
skipUpdateIfNoValuesChanged: true,
|
||||
conflictPaths: { id: true },
|
||||
},
|
||||
);
|
||||
}),
|
||||
);
|
||||
await Promise.all(
|
||||
mappedTags.mappings.map(async (mapping) => {
|
||||
if (!existingWorkflowIds.has(String(mapping.workflowId))) return;
|
||||
await Db.collections.WorkflowTagMapping.upsert(
|
||||
{ tagId: String(mapping.tagId), workflowId: String(mapping.workflowId) },
|
||||
{
|
||||
skipUpdateIfNoValuesChanged: true,
|
||||
conflictPaths: { tagId: true, workflowId: true },
|
||||
},
|
||||
);
|
||||
}),
|
||||
);
|
||||
return mappedTags;
|
||||
}
|
||||
return { tags: [], mappings: [] };
|
||||
@@ -273,74 +265,118 @@ export class SourceControlImportService {
|
||||
const ownerWorkflowRole = await this.getOwnerWorkflowRole();
|
||||
const workflowRunner = Container.get(ActiveWorkflowRunner);
|
||||
|
||||
let importWorkflowsResult = new Array<{ id: string; name: string }>();
|
||||
await Db.transaction(async (transactionManager) => {
|
||||
importWorkflowsResult = await Promise.all(
|
||||
workflowFiles.map(async (file) => {
|
||||
LoggerProxy.debug(`Parsing workflow file ${file}`);
|
||||
const importedWorkflow = jsonParse<IWorkflowToImport>(
|
||||
await fsReadFile(file, { encoding: 'utf8' }),
|
||||
// read owner file if it exists and map workflow ids to owner emails
|
||||
// then find existing users with those emails or fallback to passed in userId
|
||||
const ownerRecords: Record<string, string> = {};
|
||||
const ownersFile = await glob(SOURCE_CONTROL_OWNERS_EXPORT_FILE, {
|
||||
cwd: this.gitFolder,
|
||||
absolute: true,
|
||||
});
|
||||
if (ownersFile.length > 0) {
|
||||
LoggerProxy.debug(`Reading workflow owners from file ${ownersFile[0]}`);
|
||||
const ownerEmails = jsonParse<Record<string, string>>(
|
||||
await fsReadFile(ownersFile[0], { encoding: 'utf8' }),
|
||||
{ fallbackValue: {} },
|
||||
);
|
||||
if (ownerEmails) {
|
||||
const uniqueOwnerEmails = new Set(Object.values(ownerEmails));
|
||||
const existingUsers = await Db.collections.User.find({
|
||||
where: { email: In([...uniqueOwnerEmails]) },
|
||||
});
|
||||
Object.keys(ownerEmails).forEach((workflowId) => {
|
||||
ownerRecords[workflowId] =
|
||||
existingUsers.find((e) => e.email === ownerEmails[workflowId])?.id ?? userId;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let importWorkflowsResult = new Array<{ id: string; name: string } | undefined>();
|
||||
|
||||
const allSharedWorkflows = await Db.collections.SharedWorkflow.find({
|
||||
select: ['workflowId', 'roleId', 'userId'],
|
||||
});
|
||||
|
||||
importWorkflowsResult = await Promise.all(
|
||||
workflowFiles.map(async (file) => {
|
||||
LoggerProxy.debug(`Parsing workflow file ${file}`);
|
||||
const importedWorkflow = jsonParse<IWorkflowToImport>(
|
||||
await fsReadFile(file, { encoding: 'utf8' }),
|
||||
);
|
||||
if (!importedWorkflow?.id) {
|
||||
return;
|
||||
}
|
||||
const existingWorkflow = existingWorkflows.find((e) => e.id === importedWorkflow.id);
|
||||
if (existingWorkflow?.versionId === importedWorkflow.versionId) {
|
||||
LoggerProxy.debug(
|
||||
`Skipping import of workflow ${importedWorkflow.id ?? 'n/a'} - versionId is up to date`,
|
||||
);
|
||||
const existingWorkflow = existingWorkflows.find((e) => e.id === importedWorkflow.id);
|
||||
if (existingWorkflow?.versionId === importedWorkflow.versionId) {
|
||||
LoggerProxy.debug(
|
||||
`Skipping import of workflow ${
|
||||
importedWorkflow.id ?? 'n/a'
|
||||
} - versionId is up to date`,
|
||||
);
|
||||
return {
|
||||
id: importedWorkflow.id ?? 'n/a',
|
||||
name: 'skipped',
|
||||
};
|
||||
}
|
||||
LoggerProxy.debug(`Importing workflow ${importedWorkflow.id ?? 'n/a'}`);
|
||||
importedWorkflow.active = existingWorkflow?.active ?? false;
|
||||
LoggerProxy.debug(`Updating workflow id ${importedWorkflow.id ?? 'new'}`);
|
||||
const upsertResult = await transactionManager.upsert(
|
||||
WorkflowEntity,
|
||||
{ ...importedWorkflow },
|
||||
['id'],
|
||||
);
|
||||
if (upsertResult?.identifiers?.length !== 1) {
|
||||
throw new Error(`Failed to upsert workflow ${importedWorkflow.id ?? 'new'}`);
|
||||
}
|
||||
// due to sequential Ids, this may have changed during the insert
|
||||
// TODO: once IDs are unique and we removed autoincrement, remove this
|
||||
const upsertedWorkflowId = upsertResult.identifiers[0].id as string;
|
||||
await transactionManager.upsert(
|
||||
SharedWorkflow,
|
||||
return {
|
||||
id: importedWorkflow.id ?? 'n/a',
|
||||
name: 'skipped',
|
||||
};
|
||||
}
|
||||
LoggerProxy.debug(`Importing workflow ${importedWorkflow.id ?? 'n/a'}`);
|
||||
importedWorkflow.active = existingWorkflow?.active ?? false;
|
||||
LoggerProxy.debug(`Updating workflow id ${importedWorkflow.id ?? 'new'}`);
|
||||
const upsertResult = await Db.collections.Workflow.upsert({ ...importedWorkflow }, ['id']);
|
||||
if (upsertResult?.identifiers?.length !== 1) {
|
||||
throw new Error(`Failed to upsert workflow ${importedWorkflow.id ?? 'new'}`);
|
||||
}
|
||||
// Update workflow owner to the user who exported the workflow, if that user exists
|
||||
// in the instance, and the workflow doesn't already have an owner
|
||||
const workflowOwnerId = ownerRecords[importedWorkflow.id] ?? userId;
|
||||
const existingSharedWorkflowOwnerByRoleId = allSharedWorkflows.find(
|
||||
(e) => e.workflowId === importedWorkflow.id && e.roleId === ownerWorkflowRole.id,
|
||||
);
|
||||
const existingSharedWorkflowOwnerByUserId = allSharedWorkflows.find(
|
||||
(e) => e.workflowId === importedWorkflow.id && e.userId === workflowOwnerId,
|
||||
);
|
||||
if (!existingSharedWorkflowOwnerByUserId && !existingSharedWorkflowOwnerByRoleId) {
|
||||
// no owner exists yet, so create one
|
||||
await Db.collections.SharedWorkflow.insert({
|
||||
workflowId: importedWorkflow.id,
|
||||
userId: workflowOwnerId,
|
||||
roleId: ownerWorkflowRole.id,
|
||||
});
|
||||
} else if (existingSharedWorkflowOwnerByRoleId) {
|
||||
// skip, because the workflow already has a global owner
|
||||
} else if (existingSharedWorkflowOwnerByUserId && !existingSharedWorkflowOwnerByRoleId) {
|
||||
// if the worklflow has a non-global owner that is referenced by the owner file,
|
||||
// and no existing global owner, update the owner to the user referenced in the owner file
|
||||
await Db.collections.SharedWorkflow.update(
|
||||
{
|
||||
workflowId: importedWorkflow.id,
|
||||
userId: workflowOwnerId,
|
||||
},
|
||||
{
|
||||
workflowId: upsertedWorkflowId,
|
||||
userId,
|
||||
roleId: ownerWorkflowRole.id,
|
||||
},
|
||||
['workflowId', 'userId'],
|
||||
);
|
||||
|
||||
if (existingWorkflow?.active) {
|
||||
try {
|
||||
// remove active pre-import workflow
|
||||
LoggerProxy.debug(`Deactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.remove(existingWorkflow.id);
|
||||
// try activating the imported workflow
|
||||
LoggerProxy.debug(`Reactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.add(existingWorkflow.id, 'activate');
|
||||
} catch (error) {
|
||||
LoggerProxy.error(
|
||||
`Failed to activate workflow ${existingWorkflow.id}`,
|
||||
error as Error,
|
||||
);
|
||||
}
|
||||
}
|
||||
if (existingWorkflow?.active) {
|
||||
try {
|
||||
// remove active pre-import workflow
|
||||
LoggerProxy.debug(`Deactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.remove(existingWorkflow.id);
|
||||
// try activating the imported workflow
|
||||
LoggerProxy.debug(`Reactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.add(existingWorkflow.id, 'activate');
|
||||
} catch (error) {
|
||||
LoggerProxy.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
id: importedWorkflow.id ?? 'unknown',
|
||||
name: file,
|
||||
};
|
||||
}),
|
||||
);
|
||||
});
|
||||
return importWorkflowsResult;
|
||||
return {
|
||||
id: importedWorkflow.id ?? 'unknown',
|
||||
name: file,
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
return importWorkflowsResult.filter((e) => e !== undefined) as Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
}>;
|
||||
}
|
||||
|
||||
async importFromWorkFolder(options: SourceControllPullOptions): Promise<ImportResult> {
|
||||
|
||||
Reference in New Issue
Block a user