feat: RBAC (#8922)
Signed-off-by: Oleg Ivaniv <me@olegivaniv.com> Co-authored-by: Val <68596159+valya@users.noreply.github.com> Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in> Co-authored-by: Valya Bullions <valya@n8n.io> Co-authored-by: Danny Martini <danny@n8n.io> Co-authored-by: Danny Martini <despair.blue@gmail.com> Co-authored-by: Iván Ovejero <ivov.src@gmail.com> Co-authored-by: Omar Ajoue <krynble@gmail.com> Co-authored-by: oleg <me@olegivaniv.com> Co-authored-by: Michael Kret <michael.k@radency.com> Co-authored-by: Michael Kret <88898367+michael-radency@users.noreply.github.com> Co-authored-by: Elias Meire <elias@meire.dev> Co-authored-by: Giulio Andreini <andreini@netseven.it> Co-authored-by: Giulio Andreini <g.andreini@gmail.com> Co-authored-by: Ayato Hayashi <go12limchangyong@gmail.com>
This commit is contained in:
@@ -26,13 +26,17 @@ import type { SourceControlledFile } from './types/sourceControlledFile';
|
||||
import { VariablesService } from '../variables/variables.service.ee';
|
||||
import { TagRepository } from '@db/repositories/tag.repository';
|
||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
import { UserRepository } from '@db/repositories/user.repository';
|
||||
import { Logger } from '@/Logger';
|
||||
import { CredentialsRepository } from '@db/repositories/credentials.repository';
|
||||
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
|
||||
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
||||
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
|
||||
import { VariablesRepository } from '@db/repositories/variables.repository';
|
||||
import { ProjectRepository } from '@/databases/repositories/project.repository';
|
||||
import type { Project } from '@/databases/entities/Project';
|
||||
import type { ResourceOwner } from './types/resourceOwner';
|
||||
import { assertNever } from '@/utils';
|
||||
import { UserRepository } from '@/databases/repositories/user.repository';
|
||||
|
||||
@Service()
|
||||
export class SourceControlImportService {
|
||||
@@ -203,116 +207,94 @@ export class SourceControlImportService {
|
||||
}
|
||||
|
||||
public async importWorkflowFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
|
||||
const workflowRunner = this.activeWorkflowManager;
|
||||
const personalProject =
|
||||
await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
|
||||
const workflowManager = this.activeWorkflowManager;
|
||||
const candidateIds = candidates.map((c) => c.id);
|
||||
const existingWorkflows = await Container.get(WorkflowRepository).findByIds(candidateIds, {
|
||||
fields: ['id', 'name', 'versionId', 'active'],
|
||||
});
|
||||
const allSharedWorkflows = await Container.get(SharedWorkflowRepository).findWithFields(
|
||||
candidateIds,
|
||||
{ select: ['workflowId', 'role', 'userId'] },
|
||||
{ select: ['workflowId', 'role', 'projectId'] },
|
||||
);
|
||||
const cachedOwnerIds = new Map<string, string>();
|
||||
const importWorkflowsResult = await Promise.all(
|
||||
candidates.map(async (candidate) => {
|
||||
this.logger.debug(`Parsing workflow file ${candidate.file}`);
|
||||
const importedWorkflow = jsonParse<IWorkflowToImport & { owner: string }>(
|
||||
await fsReadFile(candidate.file, { encoding: 'utf8' }),
|
||||
);
|
||||
if (!importedWorkflow?.id) {
|
||||
return;
|
||||
}
|
||||
const existingWorkflow = existingWorkflows.find((e) => e.id === importedWorkflow.id);
|
||||
importedWorkflow.active = existingWorkflow?.active ?? false;
|
||||
this.logger.debug(`Updating workflow id ${importedWorkflow.id ?? 'new'}`);
|
||||
const upsertResult = await Container.get(WorkflowRepository).upsert(
|
||||
{ ...importedWorkflow },
|
||||
['id'],
|
||||
);
|
||||
if (upsertResult?.identifiers?.length !== 1) {
|
||||
throw new ApplicationError('Failed to upsert workflow', {
|
||||
extra: { workflowId: importedWorkflow.id ?? 'new' },
|
||||
});
|
||||
}
|
||||
// Update workflow owner to the user who exported the workflow, if that user exists
|
||||
// in the instance, and the workflow doesn't already have an owner
|
||||
let workflowOwnerId = userId;
|
||||
if (cachedOwnerIds.has(importedWorkflow.owner)) {
|
||||
workflowOwnerId = cachedOwnerIds.get(importedWorkflow.owner) ?? userId;
|
||||
} else {
|
||||
const foundUser = await Container.get(UserRepository).findOne({
|
||||
where: {
|
||||
email: importedWorkflow.owner,
|
||||
},
|
||||
select: ['id'],
|
||||
});
|
||||
if (foundUser) {
|
||||
cachedOwnerIds.set(importedWorkflow.owner, foundUser.id);
|
||||
workflowOwnerId = foundUser.id;
|
||||
}
|
||||
}
|
||||
const importWorkflowsResult = [];
|
||||
|
||||
const existingSharedWorkflowOwnerByRoleId = allSharedWorkflows.find(
|
||||
(e) => e.workflowId === importedWorkflow.id && e.role === 'workflow:owner',
|
||||
);
|
||||
const existingSharedWorkflowOwnerByUserId = allSharedWorkflows.find(
|
||||
(e) => e.workflowId === importedWorkflow.id && e.role === 'workflow:owner',
|
||||
);
|
||||
if (!existingSharedWorkflowOwnerByUserId && !existingSharedWorkflowOwnerByRoleId) {
|
||||
// no owner exists yet, so create one
|
||||
await Container.get(SharedWorkflowRepository).insert({
|
||||
// Due to SQLite concurrency issues, we cannot save all workflows at once
|
||||
// as project creation might cause constraint issues.
|
||||
// We must iterate over the array and run the whole process workflow by workflow
|
||||
for (const candidate of candidates) {
|
||||
this.logger.debug(`Parsing workflow file ${candidate.file}`);
|
||||
const importedWorkflow = jsonParse<IWorkflowToImport & { owner: string }>(
|
||||
await fsReadFile(candidate.file, { encoding: 'utf8' }),
|
||||
);
|
||||
if (!importedWorkflow?.id) {
|
||||
continue;
|
||||
}
|
||||
const existingWorkflow = existingWorkflows.find((e) => e.id === importedWorkflow.id);
|
||||
importedWorkflow.active = existingWorkflow?.active ?? false;
|
||||
this.logger.debug(`Updating workflow id ${importedWorkflow.id ?? 'new'}`);
|
||||
const upsertResult = await Container.get(WorkflowRepository).upsert({ ...importedWorkflow }, [
|
||||
'id',
|
||||
]);
|
||||
if (upsertResult?.identifiers?.length !== 1) {
|
||||
throw new ApplicationError('Failed to upsert workflow', {
|
||||
extra: { workflowId: importedWorkflow.id ?? 'new' },
|
||||
});
|
||||
}
|
||||
|
||||
const isOwnedLocally = allSharedWorkflows.some(
|
||||
(w) => w.workflowId === importedWorkflow.id && w.role === 'workflow:owner',
|
||||
);
|
||||
|
||||
if (!isOwnedLocally) {
|
||||
const remoteOwnerProject: Project | null = importedWorkflow.owner
|
||||
? await this.findOrCreateOwnerProject(importedWorkflow.owner)
|
||||
: null;
|
||||
|
||||
await Container.get(SharedWorkflowRepository).upsert(
|
||||
{
|
||||
workflowId: importedWorkflow.id,
|
||||
userId: workflowOwnerId,
|
||||
projectId: remoteOwnerProject?.id ?? personalProject.id,
|
||||
role: 'workflow:owner',
|
||||
});
|
||||
} else if (existingSharedWorkflowOwnerByRoleId) {
|
||||
// skip, because the workflow already has a global owner
|
||||
} else if (existingSharedWorkflowOwnerByUserId && !existingSharedWorkflowOwnerByRoleId) {
|
||||
// if the workflow has a non-global owner that is referenced by the owner file,
|
||||
// and no existing global owner, update the owner to the user referenced in the owner file
|
||||
await Container.get(SharedWorkflowRepository).update(
|
||||
{
|
||||
workflowId: importedWorkflow.id,
|
||||
userId: workflowOwnerId,
|
||||
},
|
||||
{ role: 'workflow:owner' },
|
||||
},
|
||||
['workflowId', 'projectId'],
|
||||
);
|
||||
}
|
||||
|
||||
if (existingWorkflow?.active) {
|
||||
try {
|
||||
// remove active pre-import workflow
|
||||
this.logger.debug(`Deactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowManager.remove(existingWorkflow.id);
|
||||
// try activating the imported workflow
|
||||
this.logger.debug(`Reactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowManager.add(existingWorkflow.id, 'activate');
|
||||
// update the versionId of the workflow to match the imported workflow
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
|
||||
} finally {
|
||||
await Container.get(WorkflowRepository).update(
|
||||
{ id: existingWorkflow.id },
|
||||
{ versionId: importedWorkflow.versionId },
|
||||
);
|
||||
}
|
||||
if (existingWorkflow?.active) {
|
||||
try {
|
||||
// remove active pre-import workflow
|
||||
this.logger.debug(`Deactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.remove(existingWorkflow.id);
|
||||
// try activating the imported workflow
|
||||
this.logger.debug(`Reactivating workflow id ${existingWorkflow.id}`);
|
||||
await workflowRunner.add(existingWorkflow.id, 'activate');
|
||||
// update the versionId of the workflow to match the imported workflow
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to activate workflow ${existingWorkflow.id}`, error as Error);
|
||||
} finally {
|
||||
await Container.get(WorkflowRepository).update(
|
||||
{ id: existingWorkflow.id },
|
||||
{ versionId: importedWorkflow.versionId },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
id: importedWorkflow.id ?? 'unknown',
|
||||
name: candidate.file,
|
||||
};
|
||||
}),
|
||||
);
|
||||
importWorkflowsResult.push({
|
||||
id: importedWorkflow.id ?? 'unknown',
|
||||
name: candidate.file,
|
||||
});
|
||||
}
|
||||
return importWorkflowsResult.filter((e) => e !== undefined) as Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
}>;
|
||||
}
|
||||
|
||||
public async importCredentialsFromWorkFolder(
|
||||
candidates: SourceControlledFile[],
|
||||
importingUserId: string,
|
||||
) {
|
||||
public async importCredentialsFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
|
||||
const personalProject =
|
||||
await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(userId);
|
||||
const candidateIds = candidates.map((c) => c.id);
|
||||
const existingCredentials = await Container.get(CredentialsRepository).find({
|
||||
where: {
|
||||
@@ -321,7 +303,7 @@ export class SourceControlImportService {
|
||||
select: ['id', 'name', 'type', 'data'],
|
||||
});
|
||||
const existingSharedCredentials = await Container.get(SharedCredentialsRepository).find({
|
||||
select: ['userId', 'credentialsId', 'role'],
|
||||
select: ['credentialsId', 'role'],
|
||||
where: {
|
||||
credentialsId: In(candidateIds),
|
||||
role: 'credential:owner',
|
||||
@@ -350,27 +332,22 @@ export class SourceControlImportService {
|
||||
await Container.get(CredentialsRepository).upsert(newCredentialObject, ['id']);
|
||||
|
||||
const isOwnedLocally = existingSharedCredentials.some(
|
||||
(c) => c.credentialsId === credential.id,
|
||||
(c) => c.credentialsId === credential.id && c.role === 'credential:owner',
|
||||
);
|
||||
|
||||
if (!isOwnedLocally) {
|
||||
const remoteOwnerId = credential.ownedBy
|
||||
? await Container.get(UserRepository)
|
||||
.findOne({
|
||||
where: { email: credential.ownedBy },
|
||||
select: { id: true },
|
||||
})
|
||||
.then((user) => user?.id)
|
||||
const remoteOwnerProject: Project | null = credential.ownedBy
|
||||
? await this.findOrCreateOwnerProject(credential.ownedBy)
|
||||
: null;
|
||||
|
||||
const newSharedCredential = new SharedCredentials();
|
||||
newSharedCredential.credentialsId = newCredentialObject.id as string;
|
||||
newSharedCredential.userId = remoteOwnerId ?? importingUserId;
|
||||
newSharedCredential.projectId = remoteOwnerProject?.id ?? personalProject.id;
|
||||
newSharedCredential.role = 'credential:owner';
|
||||
|
||||
await Container.get(SharedCredentialsRepository).upsert({ ...newSharedCredential }, [
|
||||
'credentialsId',
|
||||
'userId',
|
||||
'projectId',
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -469,7 +446,7 @@ export class SourceControlImportService {
|
||||
if (!variable.key) {
|
||||
continue;
|
||||
}
|
||||
// by default no value is stored remotely, so an empty string is retuned
|
||||
// by default no value is stored remotely, so an empty string is returned
|
||||
// it must be changed to undefined so as to not overwrite existing values!
|
||||
if (variable.value === '') {
|
||||
variable.value = undefined;
|
||||
@@ -511,4 +488,52 @@ export class SourceControlImportService {
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async findOrCreateOwnerProject(owner: ResourceOwner): Promise<Project | null> {
|
||||
const projectRepository = Container.get(ProjectRepository);
|
||||
const userRepository = Container.get(UserRepository);
|
||||
if (typeof owner === 'string' || owner.type === 'personal') {
|
||||
const email = typeof owner === 'string' ? owner : owner.personalEmail;
|
||||
const user = await userRepository.findOne({
|
||||
where: { email },
|
||||
});
|
||||
if (!user) {
|
||||
return null;
|
||||
}
|
||||
return await projectRepository.getPersonalProjectForUserOrFail(user.id);
|
||||
} else if (owner.type === 'team') {
|
||||
let teamProject = await projectRepository.findOne({
|
||||
where: { id: owner.teamId },
|
||||
});
|
||||
if (!teamProject) {
|
||||
try {
|
||||
teamProject = await projectRepository.save(
|
||||
projectRepository.create({
|
||||
id: owner.teamId,
|
||||
name: owner.teamName,
|
||||
type: 'team',
|
||||
}),
|
||||
);
|
||||
} catch (e) {
|
||||
teamProject = await projectRepository.findOne({
|
||||
where: { id: owner.teamId },
|
||||
});
|
||||
if (!teamProject) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return teamProject;
|
||||
}
|
||||
|
||||
assertNever(owner);
|
||||
|
||||
const errorOwner = owner as ResourceOwner;
|
||||
throw new ApplicationError(
|
||||
`Unknown resource owner type "${
|
||||
typeof errorOwner !== 'string' ? errorOwner.type : 'UNKNOWN'
|
||||
}" found when importing from source controller`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user