refactor(editor): Enable collaboration features only in NodeView v2 (no-changelog) (#10756)
This commit is contained in:
committed by
GitHub
parent
ee5fbc543c
commit
a1e011dd2a
@@ -27,13 +27,13 @@ describe('CollaborationState', () => {
|
||||
|
||||
const workflowId = 'workflow';
|
||||
|
||||
describe('addActiveWorkflowUser', () => {
|
||||
describe('addCollaborator', () => {
|
||||
it('should add workflow user with correct cache key and value', async () => {
|
||||
// Arrange
|
||||
global.Date = mockDateFactory('2023-01-01T00:00:00.000Z');
|
||||
|
||||
// Act
|
||||
await collaborationState.addActiveWorkflowUser(workflowId, 'userId');
|
||||
await collaborationState.addCollaborator(workflowId, 'userId');
|
||||
|
||||
// Assert
|
||||
expect(mockCacheService.setHash).toHaveBeenCalledWith('collaboration:workflow', {
|
||||
@@ -42,10 +42,10 @@ describe('CollaborationState', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('removeActiveWorkflowUser', () => {
|
||||
describe('removeCollaborator', () => {
|
||||
it('should remove workflow user with correct cache key', async () => {
|
||||
// Act
|
||||
await collaborationState.removeActiveWorkflowUser(workflowId, 'userId');
|
||||
await collaborationState.removeCollaborator(workflowId, 'userId');
|
||||
|
||||
// Assert
|
||||
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
|
||||
@@ -55,10 +55,10 @@ describe('CollaborationState', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('getActiveWorkflowUsers', () => {
|
||||
describe('getCollaborators', () => {
|
||||
it('should get workflows with correct cache key', async () => {
|
||||
// Act
|
||||
const users = await collaborationState.getActiveWorkflowUsers(workflowId);
|
||||
const users = await collaborationState.getCollaborators(workflowId);
|
||||
|
||||
// Assert
|
||||
expect(mockCacheService.getHash).toHaveBeenCalledWith('collaboration:workflow');
|
||||
@@ -77,7 +77,7 @@ describe('CollaborationState', () => {
|
||||
});
|
||||
|
||||
// Act
|
||||
const users = await collaborationState.getActiveWorkflowUsers(workflowId);
|
||||
const users = await collaborationState.getCollaborators(workflowId);
|
||||
|
||||
// Assert
|
||||
expect(users).toEqual([
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
import type { Workflow } from 'n8n-workflow';
|
||||
import { Service } from 'typedi';
|
||||
import { Push } from '../push';
|
||||
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
|
||||
import { parseWorkflowMessage } from './collaboration.message';
|
||||
import type { IActiveWorkflowUsersChanged } from '../interfaces';
|
||||
import type { Workflow } from 'n8n-workflow';
|
||||
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
|
||||
|
||||
import { Push } from '@/push';
|
||||
import type { ICollaboratorsChanged } from '@/interfaces';
|
||||
import type { OnPushMessage } from '@/push/types';
|
||||
import { UserRepository } from '@/databases/repositories/user.repository';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
import { CollaborationState } from '@/collaboration/collaboration.state';
|
||||
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
|
||||
import { UserService } from '@/services/user.service';
|
||||
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
|
||||
|
||||
import { CollaborationState } from './collaboration.state';
|
||||
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
|
||||
import { parseWorkflowMessage } from './collaboration.message';
|
||||
|
||||
/**
|
||||
* Service for managing collaboration feature between users. E.g. keeping
|
||||
@@ -22,7 +23,6 @@ export class CollaborationService {
|
||||
private readonly push: Push,
|
||||
private readonly state: CollaborationState,
|
||||
private readonly userRepository: UserRepository,
|
||||
private readonly userService: UserService,
|
||||
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
||||
) {}
|
||||
|
||||
@@ -61,7 +61,7 @@ export class CollaborationService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.state.addActiveWorkflowUser(workflowId, userId);
|
||||
await this.state.addCollaborator(workflowId, userId);
|
||||
|
||||
await this.sendWorkflowUsersChangedMessage(workflowId);
|
||||
}
|
||||
@@ -73,7 +73,7 @@ export class CollaborationService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.state.removeActiveWorkflowUser(workflowId, userId);
|
||||
await this.state.removeCollaborator(workflowId, userId);
|
||||
|
||||
await this.sendWorkflowUsersChangedMessage(workflowId);
|
||||
}
|
||||
@@ -81,26 +81,23 @@ export class CollaborationService {
|
||||
private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
|
||||
// We have already validated that all active workflow users
|
||||
// have proper access to the workflow, so we don't need to validate it again
|
||||
const activeWorkflowUsers = await this.state.getActiveWorkflowUsers(workflowId);
|
||||
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);
|
||||
const collaborators = await this.state.getCollaborators(workflowId);
|
||||
const userIds = collaborators.map((user) => user.userId);
|
||||
|
||||
if (workflowUserIds.length === 0) {
|
||||
if (userIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const users = await this.userRepository.getByIds(this.userRepository.manager, workflowUserIds);
|
||||
|
||||
const msgData: IActiveWorkflowUsersChanged = {
|
||||
const users = await this.userRepository.getByIds(this.userRepository.manager, userIds);
|
||||
const activeCollaborators = users.map((user) => ({
|
||||
user: user.toIUser(),
|
||||
lastSeen: collaborators.find(({ userId }) => userId === user.id)!.lastSeen,
|
||||
}));
|
||||
const msgData: ICollaboratorsChanged = {
|
||||
workflowId,
|
||||
activeUsers: await Promise.all(
|
||||
users.map(async (user) => ({
|
||||
user: await this.userService.toPublic(user),
|
||||
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!
|
||||
.lastSeen,
|
||||
})),
|
||||
),
|
||||
collaborators: activeCollaborators,
|
||||
};
|
||||
|
||||
this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
|
||||
this.push.sendToUsers('collaboratorsChanged', msgData, userIds);
|
||||
}
|
||||
|
||||
private async hasUserAccessToWorkflow(userId: User['id'], workflowId: Workflow['id']) {
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
import type { ActiveWorkflowUser } from '@/collaboration/collaboration.types';
|
||||
import { Service } from 'typedi';
|
||||
import type { Workflow } from 'n8n-workflow';
|
||||
|
||||
import { Time } from '@/constants';
|
||||
import type { Iso8601DateTimeString } from '@/interfaces';
|
||||
import { CacheService } from '@/services/cache/cache.service';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
import { type Workflow } from 'n8n-workflow';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
type WorkflowCacheHash = Record<User['id'], Iso8601DateTimeString>;
|
||||
interface CacheEntry {
|
||||
userId: string;
|
||||
lastSeen: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* State management for the collaboration service. Workflow active
|
||||
@@ -30,7 +34,7 @@ export class CollaborationState {
|
||||
/**
|
||||
* Mark user active for given workflow
|
||||
*/
|
||||
async addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
|
||||
async addCollaborator(workflowId: Workflow['id'], userId: User['id']) {
|
||||
const cacheKey = this.formWorkflowCacheKey(workflowId);
|
||||
const cacheEntry: WorkflowCacheHash = {
|
||||
[userId]: new Date().toISOString(),
|
||||
@@ -42,13 +46,13 @@ export class CollaborationState {
|
||||
/**
|
||||
* Remove user from workflow's active users
|
||||
*/
|
||||
async removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
|
||||
async removeCollaborator(workflowId: Workflow['id'], userId: User['id']) {
|
||||
const cacheKey = this.formWorkflowCacheKey(workflowId);
|
||||
|
||||
await this.cache.deleteFromHash(cacheKey, userId);
|
||||
}
|
||||
|
||||
async getActiveWorkflowUsers(workflowId: Workflow['id']): Promise<ActiveWorkflowUser[]> {
|
||||
async getCollaborators(workflowId: Workflow['id']): Promise<CacheEntry[]> {
|
||||
const cacheKey = this.formWorkflowCacheKey(workflowId);
|
||||
|
||||
const cacheValue = await this.cache.getHash<Iso8601DateTimeString>(cacheKey);
|
||||
@@ -56,11 +60,11 @@ export class CollaborationState {
|
||||
return [];
|
||||
}
|
||||
|
||||
const workflowActiveUsers = this.cacheHashToWorkflowActiveUsers(cacheValue);
|
||||
const [expired, stillActive] = this.splitToExpiredAndStillActive(workflowActiveUsers);
|
||||
const activeCollaborators = this.cacheHashToCollaborators(cacheValue);
|
||||
const [expired, stillActive] = this.splitToExpiredAndStillActive(activeCollaborators);
|
||||
|
||||
if (expired.length > 0) {
|
||||
void this.removeExpiredUsersForWorkflow(workflowId, expired);
|
||||
void this.removeExpiredCollaborators(workflowId, expired);
|
||||
}
|
||||
|
||||
return stillActive;
|
||||
@@ -70,39 +74,36 @@ export class CollaborationState {
|
||||
return `collaboration:${workflowId}`;
|
||||
}
|
||||
|
||||
private splitToExpiredAndStillActive(workflowUsers: ActiveWorkflowUser[]) {
|
||||
const expired: ActiveWorkflowUser[] = [];
|
||||
const stillActive: ActiveWorkflowUser[] = [];
|
||||
private splitToExpiredAndStillActive(collaborators: CacheEntry[]) {
|
||||
const expired: CacheEntry[] = [];
|
||||
const stillActive: CacheEntry[] = [];
|
||||
|
||||
for (const user of workflowUsers) {
|
||||
if (this.hasUserExpired(user.lastSeen)) {
|
||||
expired.push(user);
|
||||
for (const collaborator of collaborators) {
|
||||
if (this.hasSessionExpired(collaborator.lastSeen)) {
|
||||
expired.push(collaborator);
|
||||
} else {
|
||||
stillActive.push(user);
|
||||
stillActive.push(collaborator);
|
||||
}
|
||||
}
|
||||
|
||||
return [expired, stillActive];
|
||||
}
|
||||
|
||||
private async removeExpiredUsersForWorkflow(
|
||||
workflowId: Workflow['id'],
|
||||
expiredUsers: ActiveWorkflowUser[],
|
||||
) {
|
||||
private async removeExpiredCollaborators(workflowId: Workflow['id'], expiredUsers: CacheEntry[]) {
|
||||
const cacheKey = this.formWorkflowCacheKey(workflowId);
|
||||
await Promise.all(
|
||||
expiredUsers.map(async (user) => await this.cache.deleteFromHash(cacheKey, user.userId)),
|
||||
);
|
||||
}
|
||||
|
||||
private cacheHashToWorkflowActiveUsers(workflowCacheEntry: WorkflowCacheHash) {
|
||||
private cacheHashToCollaborators(workflowCacheEntry: WorkflowCacheHash): CacheEntry[] {
|
||||
return Object.entries(workflowCacheEntry).map(([userId, lastSeen]) => ({
|
||||
userId,
|
||||
lastSeen,
|
||||
}));
|
||||
}
|
||||
|
||||
private hasUserExpired(lastSeenString: Iso8601DateTimeString) {
|
||||
private hasSessionExpired(lastSeenString: Iso8601DateTimeString) {
|
||||
const expiryTime = new Date(lastSeenString).getTime() + this.inactivityCleanUpTime;
|
||||
|
||||
return Date.now() > expiryTime;
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
import type { Iso8601DateTimeString } from '@/interfaces';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
|
||||
export type ActiveWorkflowUser = {
|
||||
userId: User['id'];
|
||||
lastSeen: Iso8601DateTimeString;
|
||||
};
|
||||
@@ -162,4 +162,9 @@ export class User extends WithTimestamps implements IUser {
|
||||
return 'Unnamed Project';
|
||||
}
|
||||
}
|
||||
|
||||
toIUser(): IUser {
|
||||
const { id, email, firstName, lastName } = this;
|
||||
return { id, email, firstName, lastName };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import type {
|
||||
INodeProperties,
|
||||
IUserSettings,
|
||||
IWorkflowExecutionDataProcess,
|
||||
IUser,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import type { ActiveWorkflowManager } from '@/active-workflow-manager';
|
||||
@@ -289,11 +290,11 @@ export type IPushData =
|
||||
| PushDataWorkflowActivated
|
||||
| PushDataWorkflowDeactivated
|
||||
| PushDataWorkflowFailedToActivate
|
||||
| PushDataActiveWorkflowUsersChanged;
|
||||
| PushDataCollaboratorsChanged;
|
||||
|
||||
type PushDataActiveWorkflowUsersChanged = {
|
||||
data: IActiveWorkflowUsersChanged;
|
||||
type: 'activeWorkflowUsersChanged';
|
||||
type PushDataCollaboratorsChanged = {
|
||||
data: ICollaboratorsChanged;
|
||||
type: 'collaboratorsChanged';
|
||||
};
|
||||
|
||||
type PushDataWorkflowFailedToActivate = {
|
||||
@@ -369,14 +370,14 @@ export type PushDataNodeDescriptionUpdated = {
|
||||
/** DateTime in the Iso8601 format, e.g. 2024-10-31T00:00:00.123Z */
|
||||
export type Iso8601DateTimeString = string;
|
||||
|
||||
export interface IActiveWorkflowUser {
|
||||
user: PublicUser;
|
||||
export interface ICollaborator {
|
||||
user: IUser;
|
||||
lastSeen: Iso8601DateTimeString;
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowUsersChanged {
|
||||
export interface ICollaboratorsChanged {
|
||||
workflowId: Workflow['id'];
|
||||
activeUsers: IActiveWorkflowUser[];
|
||||
collaborators: ICollaborator[];
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowAdded {
|
||||
|
||||
Reference in New Issue
Block a user