feat(core): Initial support for two-way communication over websockets (#7570)

- Enable two-way communication with web sockets
- Enable sending push messages to specific users
- Add collaboration service for managing active users for workflow

Missing things:
- State is currently kept only in memory, making this not work in
multi-master setups
- Removing a user from active users in situations where they go inactive
or we miss the "workflow closed" message
- I think a timer based solution for this would cover most edge cases.
I.e. have FE ping every X minutes, BE removes the user unless they have
received a ping in Y minutes, where Y > X
- FE changes to be added later by @MiloradFilipovic 

Github issue / Community forum post (link here to close automatically):

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Tomi Turtiainen 2023-11-07 17:26:45 +02:00 committed by GitHub
parent a3a26109c6
commit ac877014ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 641 additions and 45 deletions

View file

@ -503,58 +503,74 @@ export type IPushData =
| PushDataRemoveNodeType | PushDataRemoveNodeType
| PushDataTestWebhook | PushDataTestWebhook
| PushDataNodeDescriptionUpdated | PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered; | PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged;
type PushDataExecutionRecovered = { type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
type: 'activeWorkflowUsersChanged';
};
export type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered; data: IPushDataExecutionRecovered;
type: 'executionRecovered'; type: 'executionRecovered';
}; };
type PushDataExecutionFinished = { export type PushDataExecutionFinished = {
data: IPushDataExecutionFinished; data: IPushDataExecutionFinished;
type: 'executionFinished'; type: 'executionFinished';
}; };
type PushDataExecutionStarted = { export type PushDataExecutionStarted = {
data: IPushDataExecutionStarted; data: IPushDataExecutionStarted;
type: 'executionStarted'; type: 'executionStarted';
}; };
type PushDataExecuteAfter = { export type PushDataExecuteAfter = {
data: IPushDataNodeExecuteAfter; data: IPushDataNodeExecuteAfter;
type: 'nodeExecuteAfter'; type: 'nodeExecuteAfter';
}; };
type PushDataExecuteBefore = { export type PushDataExecuteBefore = {
data: IPushDataNodeExecuteBefore; data: IPushDataNodeExecuteBefore;
type: 'nodeExecuteBefore'; type: 'nodeExecuteBefore';
}; };
type PushDataConsoleMessage = { export type PushDataConsoleMessage = {
data: IPushDataConsoleMessage; data: IPushDataConsoleMessage;
type: 'sendConsoleMessage'; type: 'sendConsoleMessage';
}; };
type PushDataReloadNodeType = { export type PushDataReloadNodeType = {
data: IPushDataReloadNodeType; data: IPushDataReloadNodeType;
type: 'reloadNodeType'; type: 'reloadNodeType';
}; };
type PushDataRemoveNodeType = { export type PushDataRemoveNodeType = {
data: IPushDataRemoveNodeType; data: IPushDataRemoveNodeType;
type: 'removeNodeType'; type: 'removeNodeType';
}; };
type PushDataTestWebhook = { export type PushDataTestWebhook = {
data: IPushDataTestWebhook; data: IPushDataTestWebhook;
type: 'testWebhookDeleted' | 'testWebhookReceived'; type: 'testWebhookDeleted' | 'testWebhookReceived';
}; };
type PushDataNodeDescriptionUpdated = { export type PushDataNodeDescriptionUpdated = {
data: undefined; data: undefined;
type: 'nodeDescriptionUpdated'; type: 'nodeDescriptionUpdated';
}; };
export interface IActiveWorkflowUser {
user: User;
lastSeen: Date;
}
export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}
export interface IPushDataExecutionRecovered { export interface IPushDataExecutionRecovered {
executionId: string; executionId: string;
} }

View file

@ -340,7 +340,7 @@ export class LoadNodesAndCredentials {
loader.reset(); loader.reset();
await loader.loadAll(); await loader.loadAll();
await this.postProcessLoaders(); await this.postProcessLoaders();
push.send('nodeDescriptionUpdated', undefined); push.broadcast('nodeDescriptionUpdated');
}, 100); }, 100);
const toWatch = loader.isLazyLoaded const toWatch = loader.isLazyLoaded

View file

@ -0,0 +1,23 @@
export type CollaborationMessage = WorkflowOpenedMessage | WorkflowClosedMessage;
export type WorkflowOpenedMessage = {
type: 'workflowOpened';
workflowId: string;
};
export type WorkflowClosedMessage = {
type: 'workflowClosed';
workflowId: string;
};
const isWorkflowMessage = (msg: unknown): msg is CollaborationMessage => {
return typeof msg === 'object' && msg !== null && 'type' in msg;
};
export const isWorkflowOpenedMessage = (msg: unknown): msg is WorkflowOpenedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowOpened';
};
export const isWorkflowClosedMessage = (msg: unknown): msg is WorkflowClosedMessage => {
return isWorkflowMessage(msg) && msg.type === 'workflowClosed';
};

View file

@ -0,0 +1,87 @@
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import { Push } from '../push';
import { Logger } from '@/Logger';
import type { WorkflowClosedMessage, WorkflowOpenedMessage } from './collaboration.message';
import { isWorkflowClosedMessage, isWorkflowOpenedMessage } from './collaboration.message';
import { UserService } from '../services/user.service';
import type { IActiveWorkflowUsersChanged } from '../Interfaces';
import type { OnPushMessageEvent } from '@/push/types';
import { CollaborationState } from '@/collaboration/collaboration.state';
/**
* Service for managing collaboration feature between users. E.g. keeping
* track of active users for a workflow.
*/
@Service()
export class CollaborationService {
constructor(
private readonly logger: Logger,
private readonly push: Push,
private readonly state: CollaborationState,
private readonly userService: UserService,
) {
if (!push.isBidirectional) {
logger.warn(
'Collaboration features are disabled because push is configured unidirectional. Use N8N_PUSH_BACKEND=websocket environment variable to enable them.',
);
return;
}
this.push.on('message', async (event: OnPushMessageEvent) => {
try {
await this.handleUserMessage(event.userId, event.msg);
} catch (error) {
this.logger.error('Error handling user message', {
error: error as unknown,
msg: event.msg,
userId: event.userId,
});
}
});
}
async handleUserMessage(userId: string, msg: unknown) {
if (isWorkflowOpenedMessage(msg)) {
await this.handleWorkflowOpened(userId, msg);
} else if (isWorkflowClosedMessage(msg)) {
await this.handleWorkflowClosed(userId, msg);
}
}
private async handleWorkflowOpened(userId: string, msg: WorkflowOpenedMessage) {
const { workflowId } = msg;
this.state.addActiveWorkflowUser(workflowId, userId);
await this.sendWorkflowUsersChangedMessage(workflowId);
}
private async handleWorkflowClosed(userId: string, msg: WorkflowClosedMessage) {
const { workflowId } = msg;
this.state.removeActiveWorkflowUser(workflowId, userId);
await this.sendWorkflowUsersChangedMessage(workflowId);
}
private async sendWorkflowUsersChangedMessage(workflowId: Workflow['id']) {
const activeWorkflowUsers = this.state.getActiveWorkflowUsers(workflowId);
const workflowUserIds = activeWorkflowUsers.map((user) => user.userId);
if (workflowUserIds.length === 0) {
return;
}
const users = await this.userService.getByIds(this.userService.getManager(), workflowUserIds);
const msgData: IActiveWorkflowUsersChanged = {
workflowId,
activeUsers: users.map((user) => ({
user,
lastSeen: activeWorkflowUsers.find((activeUser) => activeUser.userId === user.id)!.lastSeen,
})),
};
this.push.sendToUsers('activeWorkflowUsersChanged', msgData, workflowUserIds);
}
}

View file

@ -0,0 +1,62 @@
import type { User } from '@/databases/entities/User';
import type { Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
type ActiveWorkflowUser = {
userId: User['id'];
lastSeen: Date;
};
type UserStateByUserId = Map<User['id'], ActiveWorkflowUser>;
type State = {
activeUsersByWorkflowId: Map<Workflow['id'], UserStateByUserId>;
};
/**
* State management for the collaboration service
*/
@Service()
export class CollaborationState {
private state: State = {
activeUsersByWorkflowId: new Map(),
};
addActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const { activeUsersByWorkflowId } = this.state;
let activeUsers = activeUsersByWorkflowId.get(workflowId);
if (!activeUsers) {
activeUsers = new Map();
activeUsersByWorkflowId.set(workflowId, activeUsers);
}
activeUsers.set(userId, {
userId,
lastSeen: new Date(),
});
}
removeActiveWorkflowUser(workflowId: Workflow['id'], userId: User['id']) {
const { activeUsersByWorkflowId } = this.state;
const activeUsers = activeUsersByWorkflowId.get(workflowId);
if (!activeUsers) {
return;
}
activeUsers.delete(userId);
if (activeUsers.size === 0) {
activeUsersByWorkflowId.delete(workflowId);
}
}
getActiveWorkflowUsers(workflowId: Workflow['id']): ActiveWorkflowUser[] {
const workflowState = this.state.activeUsersByWorkflowId.get(workflowId);
if (!workflowState) {
return [];
}
return [...workflowState.values()];
}
}

View file

@ -129,7 +129,7 @@ export class CommunityPackagesController {
// broadcast to connected frontends that node list has been updated // broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => { installedPackage.installedNodes.forEach((node) => {
this.push.send('reloadNodeType', { this.push.broadcast('reloadNodeType', {
name: node.type, name: node.type,
version: node.latestVersion, version: node.latestVersion,
}); });
@ -218,7 +218,7 @@ export class CommunityPackagesController {
// broadcast to connected frontends that node list has been updated // broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => { installedPackage.installedNodes.forEach((node) => {
this.push.send('removeNodeType', { this.push.broadcast('removeNodeType', {
name: node.type, name: node.type,
version: node.latestVersion, version: node.latestVersion,
}); });
@ -257,14 +257,14 @@ export class CommunityPackagesController {
// broadcast to connected frontends that node list has been updated // broadcast to connected frontends that node list has been updated
previouslyInstalledPackage.installedNodes.forEach((node) => { previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.send('removeNodeType', { this.push.broadcast('removeNodeType', {
name: node.type, name: node.type,
version: node.latestVersion, version: node.latestVersion,
}); });
}); });
newInstalledPackage.installedNodes.forEach((node) => { newInstalledPackage.installedNodes.forEach((node) => {
this.push.send('reloadNodeType', { this.push.broadcast('reloadNodeType', {
name: node.name, name: node.name,
version: node.latestVersion, version: node.latestVersion,
}); });
@ -283,7 +283,7 @@ export class CommunityPackagesController {
return newInstalledPackage; return newInstalledPackage;
} catch (error) { } catch (error) {
previouslyInstalledPackage.installedNodes.forEach((node) => { previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.send('removeNodeType', { this.push.broadcast('removeNodeType', {
name: node.type, name: node.type,
version: node.latestVersion, version: node.latestVersion,
}); });

View file

@ -195,7 +195,7 @@ export async function recoverExecutionDataFromEventLogMessages(
push.once('editorUiConnected', function handleUiBackUp() { push.once('editorUiConnected', function handleUiBackUp() {
// add a small timeout to make sure the UI is back up // add a small timeout to make sure the UI is back up
setTimeout(() => { setTimeout(() => {
push.send('executionRecovered', { executionId }); push.broadcast('executionRecovered', { executionId });
}, 1000); }, 1000);
}); });
} }

View file

@ -1,17 +1,29 @@
import { jsonStringify } from 'n8n-workflow'; import { EventEmitter } from 'events';
import { assert, jsonStringify } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces'; import type { IPushDataType } from '@/Interfaces';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import type { User } from '@/databases/entities/User';
export abstract class AbstractPush<T> { /**
* Abstract class for two-way push communication.
* Keeps track of user sessions and enables sending messages.
*
* @emits message when a message is received from a client
*/
export abstract class AbstractPush<T> extends EventEmitter {
protected connections: Record<string, T> = {}; protected connections: Record<string, T> = {};
protected userIdBySessionId: Record<string, string> = {};
protected abstract close(connection: T): void; protected abstract close(connection: T): void;
protected abstract sendToOne(connection: T, data: string): void; protected abstract sendToOne(connection: T, data: string): void;
constructor(private readonly logger: Logger) {} constructor(protected readonly logger: Logger) {
super();
}
protected add(sessionId: string, connection: T): void { protected add(sessionId: string, userId: User['id'], connection: T): void {
const { connections } = this; const { connections, userIdBySessionId: userIdsBySessionId } = this;
this.logger.debug('Add editor-UI session', { sessionId }); this.logger.debug('Add editor-UI session', { sessionId });
const existingConnection = connections[sessionId]; const existingConnection = connections[sessionId];
@ -21,32 +33,65 @@ export abstract class AbstractPush<T> {
} }
connections[sessionId] = connection; connections[sessionId] = connection;
userIdsBySessionId[sessionId] = userId;
}
protected onMessageReceived(sessionId: string, msg: unknown): void {
this.logger.debug('Received message from editor-UI', { sessionId, msg });
const userId = this.userIdBySessionId[sessionId];
this.emit('message', {
sessionId,
userId,
msg,
});
} }
protected remove(sessionId?: string): void { protected remove(sessionId?: string): void {
if (sessionId !== undefined) { if (sessionId !== undefined) {
this.logger.debug('Remove editor-UI session', { sessionId }); this.logger.debug('Remove editor-UI session', { sessionId });
delete this.connections[sessionId]; delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
} }
} }
send<D>(type: IPushDataType, data: D, sessionId: string | undefined) { private sendToSessions<D>(type: IPushDataType, data: D, sessionIds: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
sessionIds: sessionIds.join(', '),
});
const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true });
for (const sessionId of sessionIds) {
const connection = this.connections[sessionId];
assert(connection);
this.sendToOne(connection, sendData);
}
}
broadcast<D>(type: IPushDataType, data?: D) {
this.sendToSessions(type, data, Object.keys(this.connections));
}
send<D>(type: IPushDataType, data: D, sessionId: string) {
const { connections } = this; const { connections } = this;
if (sessionId !== undefined && connections[sessionId] === undefined) { if (connections[sessionId] === undefined) {
this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
return; return;
} }
this.logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId }); this.sendToSessions(type, data, [sessionId]);
}
const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true }); /**
* Sends the given data to given users' connections
*/
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
const { connections } = this;
const userSessionIds = Object.keys(connections).filter((sessionId) =>
userIds.includes(this.userIdBySessionId[sessionId]),
);
if (sessionId === undefined) { this.sendToSessions(type, data, userSessionIds);
// Send to all connected clients
Object.values(connections).forEach((connection) => this.sendToOne(connection, sendData));
} else {
// Send only to a specific client
this.sendToOne(connections[sessionId], sendData);
}
} }
} }

View file

@ -13,27 +13,52 @@ import { SSEPush } from './sse.push';
import { WebSocketPush } from './websocket.push'; import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import type { IPushDataType } from '@/Interfaces'; import type { IPushDataType } from '@/Interfaces';
import type { User } from '@/databases/entities/User';
const useWebSockets = config.getEnv('push.backend') === 'websocket'; const useWebSockets = config.getEnv('push.backend') === 'websocket';
/**
* Push service for uni- or bi-directional communication with frontend clients.
* Uses either server-sent events (SSE, unidirectional from backend --> frontend)
* or WebSocket (bidirectional backend <--> frontend) depending on the configuration.
*
* @emits message when a message is received from a client
*/
@Service() @Service()
export class Push extends EventEmitter { export class Push extends EventEmitter {
public isBidirectional = useWebSockets;
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
userId,
query: { sessionId },
} = req;
if (req.ws) { if (req.ws) {
(this.backend as WebSocketPush).add(req.query.sessionId, req.ws); (this.backend as WebSocketPush).add(sessionId, userId, req.ws);
this.backend.on('message', (msg) => this.emit('message', msg));
} else if (!useWebSockets) { } else if (!useWebSockets) {
(this.backend as SSEPush).add(req.query.sessionId, { req, res }); (this.backend as SSEPush).add(sessionId, userId, { req, res });
} else { } else {
res.status(401).send('Unauthorized'); res.status(401).send('Unauthorized');
} return;
this.emit('editorUiConnected', req.query.sessionId);
} }
send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) { this.emit('editorUiConnected', sessionId);
}
broadcast<D>(type: IPushDataType, data?: D) {
this.backend.broadcast(type, data);
}
send<D>(type: IPushDataType, data: D, sessionId: string) {
this.backend.send(type, data, sessionId); this.backend.send(type, data, sessionId);
} }
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
this.backend.sendToUsers(type, data, userIds);
}
} }
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
@ -82,7 +107,8 @@ export const setupPushHandler = (restEndpoint: string, app: Application) => {
try { try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access
const authCookie: string = req.cookies?.[AUTH_COOKIE_NAME] ?? ''; const authCookie: string = req.cookies?.[AUTH_COOKIE_NAME] ?? '';
await resolveJwt(authCookie); const user = await resolveJwt(authCookie);
req.userId = user.id;
} catch (error) { } catch (error) {
if (ws) { if (ws) {
ws.send(`Unauthorized: ${(error as Error).message}`); ws.send(`Unauthorized: ${(error as Error).message}`);

View file

@ -3,6 +3,7 @@ import { Service } from 'typedi';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push'; import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types'; import type { PushRequest, PushResponse } from './types';
import type { User } from '@/databases/entities/User';
type Connection = { req: PushRequest; res: PushResponse }; type Connection = { req: PushRequest; res: PushResponse };
@ -19,8 +20,8 @@ export class SSEPush extends AbstractPush<Connection> {
}); });
} }
add(sessionId: string, connection: Connection) { add(sessionId: string, userId: User['id'], connection: Connection) {
super.add(sessionId, connection); super.add(sessionId, userId, connection);
this.channel.addClient(connection.req, connection.res); this.channel.addClient(connection.req, connection.res);
} }

View file

@ -1,3 +1,4 @@
import type { User } from '@/databases/entities/User';
import type { Request, Response } from 'express'; import type { Request, Response } from 'express';
import type { WebSocket } from 'ws'; import type { WebSocket } from 'ws';
@ -5,7 +6,13 @@ import type { WebSocket } from 'ws';
export type PushRequest = Request<{}, {}, {}, { sessionId: string }>; export type PushRequest = Request<{}, {}, {}, { sessionId: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined }; export type SSEPushRequest = PushRequest & { ws: undefined; userId: User['id'] };
export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; export type WebSocketPushRequest = PushRequest & { ws: WebSocket; userId: User['id'] };
export type PushResponse = Response & { req: PushRequest }; export type PushResponse = Response & { req: PushRequest };
export type OnPushMessageEvent = {
sessionId: string;
userId: User['id'];
msg: unknown;
};

View file

@ -2,6 +2,7 @@ import type WebSocket from 'ws';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push'; import { AbstractPush } from './abstract.push';
import type { User } from '@/databases/entities/User';
function heartbeat(this: WebSocket) { function heartbeat(this: WebSocket) {
this.isAlive = true; this.isAlive = true;
@ -16,17 +17,34 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
setInterval(() => this.pingAll(), 60 * 1000); setInterval(() => this.pingAll(), 60 * 1000);
} }
add(sessionId: string, connection: WebSocket) { add(sessionId: string, userId: User['id'], connection: WebSocket) {
connection.isAlive = true; connection.isAlive = true;
connection.on('pong', heartbeat); connection.on('pong', heartbeat);
super.add(sessionId, connection); super.add(sessionId, userId, connection);
const onMessage = (data: WebSocket.RawData) => {
try {
const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
this.onMessageReceived(sessionId, JSON.parse(buffer.toString('utf8')));
} catch (error) {
this.logger.error("Couldn't parse message from editor-UI", {
error: error as unknown,
sessionId,
data,
});
}
};
// Makes sure to remove the session if the connection is closed // Makes sure to remove the session if the connection is closed
connection.once('close', () => { connection.once('close', () => {
connection.off('pong', heartbeat); connection.off('pong', heartbeat);
connection.off('message', onMessage);
this.remove(sessionId); this.remove(sessionId);
}); });
connection.on('message', onMessage);
} }
protected close(connection: WebSocket): void { protected close(connection: WebSocket): void {

View file

@ -0,0 +1,150 @@
import { CollaborationService } from '@/collaboration/collaboration.service';
import type { Logger } from '@/Logger';
import type { User } from '@/databases/entities/User';
import type { UserService } from '@/services/user.service';
import { CollaborationState } from '@/collaboration/collaboration.state';
import type { Push } from '@/push';
import type {
WorkflowClosedMessage,
WorkflowOpenedMessage,
} from '@/collaboration/collaboration.message';
describe('CollaborationService', () => {
let collaborationService: CollaborationService;
let mockLogger: Logger;
let mockUserService: jest.Mocked<UserService>;
let state: CollaborationState;
let push: Push;
beforeEach(() => {
mockLogger = {
warn: jest.fn(),
error: jest.fn(),
} as unknown as jest.Mocked<Logger>;
mockUserService = {
getByIds: jest.fn(),
getManager: jest.fn(),
} as unknown as jest.Mocked<UserService>;
push = {
on: jest.fn(),
sendToUsers: jest.fn(),
} as unknown as Push;
state = new CollaborationState();
collaborationService = new CollaborationService(mockLogger, push, state, mockUserService);
});
describe('workflow opened message', () => {
const userId = 'test-user';
const workflowId = 'test-workflow';
const message: WorkflowOpenedMessage = {
type: 'workflowOpened',
workflowId,
};
const expectActiveUsersChangedMessage = (userIds: string[]) => {
expect(push.sendToUsers).toHaveBeenCalledWith(
'activeWorkflowUsersChanged',
{
workflowId,
activeUsers: [
{
user: { id: userId },
lastSeen: expect.any(Date),
},
],
},
[userId],
);
};
describe('user is not yet active', () => {
it('updates state correctly', async () => {
mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
await collaborationService.handleUserMessage(userId, message);
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([
{
lastSeen: expect.any(Date),
userId,
},
]);
});
it('sends active workflow users changed message', async () => {
mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
await collaborationService.handleUserMessage(userId, message);
expectActiveUsersChangedMessage([userId]);
});
});
describe('user is already active', () => {
beforeEach(() => {
state.addActiveWorkflowUser(workflowId, userId);
});
it('updates state correctly', async () => {
mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
await collaborationService.handleUserMessage(userId, message);
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([
{
lastSeen: expect.any(Date),
userId,
},
]);
});
it('sends active workflow users changed message', async () => {
mockUserService.getByIds.mockResolvedValueOnce([{ id: userId } as User]);
await collaborationService.handleUserMessage(userId, message);
expectActiveUsersChangedMessage([userId]);
});
});
});
describe('workflow closed message', () => {
const userId = 'test-user';
const workflowId = 'test-workflow';
const message: WorkflowClosedMessage = {
type: 'workflowClosed',
workflowId,
};
describe('user is active', () => {
beforeEach(() => {
state.addActiveWorkflowUser(workflowId, userId);
});
it('updates state correctly', async () => {
await collaborationService.handleUserMessage(userId, message);
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]);
});
it('does not send active workflow users changed message', async () => {
await collaborationService.handleUserMessage(userId, message);
expect(push.sendToUsers).not.toHaveBeenCalled();
});
});
describe('user is not active', () => {
it('updates state correctly', async () => {
await collaborationService.handleUserMessage(userId, message);
expect(state.getActiveWorkflowUsers(workflowId)).toEqual([]);
});
it('does not send active workflow users changed message', async () => {
await collaborationService.handleUserMessage(userId, message);
expect(push.sendToUsers).not.toHaveBeenCalled();
});
});
});
});

View file

@ -0,0 +1,161 @@
import Container from 'typedi';
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { WebSocketPush } from '@/push/websocket.push';
import { Logger } from '@/Logger';
import type { User } from '@/databases/entities/User';
import type { PushDataExecutionRecovered } from '@/Interfaces';
import { mockInstance } from '../../integration/shared/utils';
jest.useFakeTimers();
class MockWebSocket extends EventEmitter {
public isAlive = true;
public ping = jest.fn();
public send = jest.fn();
public terminate = jest.fn();
public close = jest.fn();
}
const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked<WebSocket>;
describe('WebSocketPush', () => {
const sessionId1 = 'test-session1';
const sessionId2 = 'test-session2';
const userId: User['id'] = 'test-user';
mockInstance(Logger);
const webSocketPush = Container.get(WebSocketPush);
const mockWebSocket1 = createMockWebSocket();
const mockWebSocket2 = createMockWebSocket();
beforeEach(() => {
jest.resetAllMocks();
});
it('can add a connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
expect(mockWebSocket1.listenerCount('close')).toBe(1);
expect(mockWebSocket1.listenerCount('pong')).toBe(1);
expect(mockWebSocket1.listenerCount('message')).toBe(1);
});
it('closes a connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
mockWebSocket1.emit('close');
expect(mockWebSocket1.listenerCount('close')).toBe(0);
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
expect(mockWebSocket1.listenerCount('message')).toBe(0);
});
it('sends data to one connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.send('executionRecovered', data, sessionId1);
expect(mockWebSocket1.send).toHaveBeenCalledWith(
JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
}),
);
expect(mockWebSocket2.send).not.toHaveBeenCalled();
});
it('sends data to all connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.broadcast('executionRecovered', data);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});
it('sends data to all users connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.sendToUsers('executionRecovered', data, [userId]);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});
it('pings all connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
jest.runOnlyPendingTimers();
expect(mockWebSocket1.ping).toHaveBeenCalled();
expect(mockWebSocket2.ping).toHaveBeenCalled();
});
it('emits message event when connection receives data', () => {
const mockOnMessageReceived = jest.fn();
webSocketPush.on('message', mockOnMessageReceived);
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data = { test: 'data' };
const buffer = Buffer.from(JSON.stringify(data));
mockWebSocket1.emit('message', buffer);
expect(mockOnMessageReceived).toHaveBeenCalledWith({
msg: data,
sessionId: sessionId1,
userId,
});
});
});