feat: Reintroduce collaboration feature (#10602)
This commit is contained in:
@@ -7,6 +7,7 @@ import { Logger } from '@/logger';
|
||||
import type { PushDataExecutionRecovered } from '@/interfaces';
|
||||
|
||||
import { mockInstance } from '@test/mocking';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
|
||||
jest.useFakeTimers();
|
||||
|
||||
@@ -27,6 +28,7 @@ const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked<
|
||||
describe('WebSocketPush', () => {
|
||||
const pushRef1 = 'test-session1';
|
||||
const pushRef2 = 'test-session2';
|
||||
const userId: User['id'] = 'test-user';
|
||||
|
||||
mockInstance(Logger);
|
||||
const webSocketPush = Container.get(WebSocketPush);
|
||||
@@ -35,27 +37,31 @@ describe('WebSocketPush', () => {
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
mockWebSocket1.removeAllListeners();
|
||||
mockWebSocket2.removeAllListeners();
|
||||
});
|
||||
|
||||
it('can add a connection', () => {
|
||||
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
|
||||
expect(mockWebSocket1.listenerCount('close')).toBe(1);
|
||||
expect(mockWebSocket1.listenerCount('pong')).toBe(1);
|
||||
expect(mockWebSocket1.listenerCount('message')).toBe(1);
|
||||
});
|
||||
|
||||
it('closes a connection', () => {
|
||||
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
|
||||
mockWebSocket1.emit('close');
|
||||
|
||||
expect(mockWebSocket1.listenerCount('message')).toBe(0);
|
||||
expect(mockWebSocket1.listenerCount('close')).toBe(0);
|
||||
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
|
||||
});
|
||||
|
||||
it('sends data to one connection', () => {
|
||||
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
||||
const data: PushDataExecutionRecovered = {
|
||||
type: 'executionRecovered',
|
||||
data: {
|
||||
@@ -80,8 +86,8 @@ describe('WebSocketPush', () => {
|
||||
});
|
||||
|
||||
it('sends data to all connections', () => {
|
||||
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
||||
const data: PushDataExecutionRecovered = {
|
||||
type: 'executionRecovered',
|
||||
data: {
|
||||
@@ -105,12 +111,55 @@ describe('WebSocketPush', () => {
|
||||
});
|
||||
|
||||
it('pings all connections', () => {
|
||||
webSocketPush.add(pushRef1, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, mockWebSocket2);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
||||
|
||||
jest.runOnlyPendingTimers();
|
||||
|
||||
expect(mockWebSocket1.ping).toHaveBeenCalled();
|
||||
expect(mockWebSocket2.ping).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('sends data to all users connections', () => {
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
||||
const data: PushDataExecutionRecovered = {
|
||||
type: 'executionRecovered',
|
||||
data: {
|
||||
executionId: 'test-execution-id',
|
||||
},
|
||||
};
|
||||
|
||||
webSocketPush.sendToUsers('executionRecovered', data, [userId]);
|
||||
|
||||
const expectedMsg = JSON.stringify({
|
||||
type: 'executionRecovered',
|
||||
data: {
|
||||
type: 'executionRecovered',
|
||||
data: {
|
||||
executionId: 'test-execution-id',
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
|
||||
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
|
||||
});
|
||||
|
||||
it('emits message event when connection receives data', () => {
|
||||
const mockOnMessageReceived = jest.fn();
|
||||
webSocketPush.on('message', mockOnMessageReceived);
|
||||
webSocketPush.add(pushRef1, userId, mockWebSocket1);
|
||||
webSocketPush.add(pushRef2, userId, mockWebSocket2);
|
||||
|
||||
const data = { test: 'data' };
|
||||
const buffer = Buffer.from(JSON.stringify(data));
|
||||
|
||||
mockWebSocket1.emit('message', buffer);
|
||||
|
||||
expect(mockOnMessageReceived).toHaveBeenCalledWith({
|
||||
msg: data,
|
||||
pushRef: pushRef1,
|
||||
userId,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
import { assert, jsonStringify } from 'n8n-workflow';
|
||||
import type { IPushDataType } from '@/interfaces';
|
||||
import type { Logger } from '@/logger';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
import { TypedEmitter } from '@/typed-emitter';
|
||||
import type { OnPushMessage } from '@/push/types';
|
||||
|
||||
export interface AbstractPushEvents {
|
||||
message: OnPushMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract class for two-way push communication.
|
||||
@@ -8,16 +15,20 @@ import type { Logger } from '@/logger';
|
||||
*
|
||||
* @emits message when a message is received from a client
|
||||
*/
|
||||
export abstract class AbstractPush<T> {
|
||||
export abstract class AbstractPush<T> extends TypedEmitter<AbstractPushEvents> {
|
||||
protected connections: Record<string, T> = {};
|
||||
|
||||
protected userIdByPushRef: Record<string, string> = {};
|
||||
|
||||
protected abstract close(connection: T): void;
|
||||
protected abstract sendToOneConnection(connection: T, data: string): void;
|
||||
|
||||
constructor(protected readonly logger: Logger) {}
|
||||
constructor(protected readonly logger: Logger) {
|
||||
super();
|
||||
}
|
||||
|
||||
protected add(pushRef: string, connection: T) {
|
||||
const { connections } = this;
|
||||
protected add(pushRef: string, userId: User['id'], connection: T) {
|
||||
const { connections, userIdByPushRef } = this;
|
||||
this.logger.debug('Add editor-UI session', { pushRef });
|
||||
|
||||
const existingConnection = connections[pushRef];
|
||||
@@ -28,6 +39,15 @@ export abstract class AbstractPush<T> {
|
||||
}
|
||||
|
||||
connections[pushRef] = connection;
|
||||
userIdByPushRef[pushRef] = userId;
|
||||
}
|
||||
|
||||
protected onMessageReceived(pushRef: string, msg: unknown) {
|
||||
this.logger.debug('Received message from editor-UI', { pushRef, msg });
|
||||
|
||||
const userId = this.userIdByPushRef[pushRef];
|
||||
|
||||
this.emit('message', { pushRef, userId, msg });
|
||||
}
|
||||
|
||||
protected remove(pushRef?: string) {
|
||||
@@ -36,6 +56,7 @@ export abstract class AbstractPush<T> {
|
||||
this.logger.debug('Removed editor-UI session', { pushRef });
|
||||
|
||||
delete this.connections[pushRef];
|
||||
delete this.userIdByPushRef[pushRef];
|
||||
}
|
||||
|
||||
private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
|
||||
@@ -66,6 +87,15 @@ export abstract class AbstractPush<T> {
|
||||
this.sendTo(type, data, [pushRef]);
|
||||
}
|
||||
|
||||
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
|
||||
const { connections } = this;
|
||||
const userPushRefs = Object.keys(connections).filter((pushRef) =>
|
||||
userIds.includes(this.userIdByPushRef[pushRef]),
|
||||
);
|
||||
|
||||
this.sendTo(type, data, userPushRefs);
|
||||
}
|
||||
|
||||
closeAllConnections() {
|
||||
for (const pushRef in this.connections) {
|
||||
// Signal the connection that we want to close it.
|
||||
|
||||
@@ -15,11 +15,13 @@ import { OrchestrationService } from '@/services/orchestration.service';
|
||||
|
||||
import { SSEPush } from './sse.push';
|
||||
import { WebSocketPush } from './websocket.push';
|
||||
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
||||
import type { OnPushMessage, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
|
||||
import { TypedEmitter } from '@/typed-emitter';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
|
||||
type PushEvents = {
|
||||
editorUiConnected: string;
|
||||
message: OnPushMessage;
|
||||
};
|
||||
|
||||
const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
||||
@@ -33,16 +35,21 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket';
|
||||
*/
|
||||
@Service()
|
||||
export class Push extends TypedEmitter<PushEvents> {
|
||||
public isBidirectional = useWebSockets;
|
||||
|
||||
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
|
||||
|
||||
constructor(private readonly orchestrationService: OrchestrationService) {
|
||||
super();
|
||||
|
||||
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
|
||||
}
|
||||
|
||||
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
|
||||
const {
|
||||
ws,
|
||||
query: { pushRef },
|
||||
user,
|
||||
} = req;
|
||||
|
||||
if (!pushRef) {
|
||||
@@ -55,9 +62,9 @@ export class Push extends TypedEmitter<PushEvents> {
|
||||
}
|
||||
|
||||
if (req.ws) {
|
||||
(this.backend as WebSocketPush).add(pushRef, req.ws);
|
||||
(this.backend as WebSocketPush).add(pushRef, user.id, req.ws);
|
||||
} else if (!useWebSockets) {
|
||||
(this.backend as SSEPush).add(pushRef, { req, res });
|
||||
(this.backend as SSEPush).add(pushRef, user.id, { req, res });
|
||||
} else {
|
||||
res.status(401).send('Unauthorized');
|
||||
return;
|
||||
@@ -90,6 +97,10 @@ export class Push extends TypedEmitter<PushEvents> {
|
||||
return this.backend;
|
||||
}
|
||||
|
||||
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
|
||||
this.backend.sendToUsers(type, data, userIds);
|
||||
}
|
||||
|
||||
@OnShutdown()
|
||||
onShutdown() {
|
||||
this.backend.closeAllConnections();
|
||||
|
||||
@@ -5,6 +5,7 @@ import { Logger } from '@/logger';
|
||||
|
||||
import { AbstractPush } from './abstract.push';
|
||||
import type { PushRequest, PushResponse } from './types';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
|
||||
type Connection = { req: PushRequest; res: PushResponse };
|
||||
|
||||
@@ -22,8 +23,8 @@ export class SSEPush extends AbstractPush<Connection> {
|
||||
});
|
||||
}
|
||||
|
||||
add(pushRef: string, connection: Connection) {
|
||||
super.add(pushRef, connection);
|
||||
add(pushRef: string, userId: User['id'], connection: Connection) {
|
||||
super.add(pushRef, userId, connection);
|
||||
this.channel.addClient(connection.req, connection.res);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import type { Response } from 'express';
|
||||
import type { WebSocket } from 'ws';
|
||||
|
||||
import type { AuthenticatedRequest } from '@/requests';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
|
||||
// TODO: move all push related types here
|
||||
|
||||
@@ -11,3 +12,9 @@ export type SSEPushRequest = PushRequest & { ws: undefined };
|
||||
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
||||
|
||||
export type PushResponse = Response & { req: PushRequest };
|
||||
|
||||
export interface OnPushMessage {
|
||||
pushRef: string;
|
||||
userId: User['id'];
|
||||
msg: unknown;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ import type WebSocket from 'ws';
|
||||
import { Service } from 'typedi';
|
||||
import { Logger } from '@/logger';
|
||||
import { AbstractPush } from './abstract.push';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
import { ApplicationError, ErrorReporterProxy } from 'n8n-workflow';
|
||||
|
||||
function heartbeat(this: WebSocket) {
|
||||
this.isAlive = true;
|
||||
@@ -16,17 +18,43 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
|
||||
setInterval(() => this.pingAll(), 60 * 1000);
|
||||
}
|
||||
|
||||
add(pushRef: string, connection: WebSocket) {
|
||||
add(pushRef: string, userId: User['id'], connection: WebSocket) {
|
||||
connection.isAlive = true;
|
||||
connection.on('pong', heartbeat);
|
||||
|
||||
super.add(pushRef, connection);
|
||||
super.add(pushRef, userId, connection);
|
||||
|
||||
const onMessage = (data: WebSocket.RawData) => {
|
||||
try {
|
||||
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
|
||||
|
||||
this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8')));
|
||||
} catch (error) {
|
||||
ErrorReporterProxy.error(
|
||||
new ApplicationError('Error parsing push message', {
|
||||
extra: {
|
||||
userId,
|
||||
data,
|
||||
},
|
||||
cause: error,
|
||||
}),
|
||||
);
|
||||
this.logger.error("Couldn't parse message from editor-UI", {
|
||||
error: error as unknown,
|
||||
pushRef,
|
||||
data,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Makes sure to remove the session if the connection is closed
|
||||
connection.once('close', () => {
|
||||
connection.off('pong', heartbeat);
|
||||
connection.off('message', onMessage);
|
||||
this.remove(pushRef);
|
||||
});
|
||||
|
||||
connection.on('message', onMessage);
|
||||
}
|
||||
|
||||
protected close(connection: WebSocket): void {
|
||||
|
||||
Reference in New Issue
Block a user