n8n/packages/cli/test/unit/push/websocket.push.test.ts
Tomi Turtiainen ac877014ed
feat(core): Initial support for two-way communication over websockets (#7570)
- Enable two-way communication with web sockets
- Enable sending push messages to specific users
- Add collaboration service for managing active users for workflow

Missing things:
- State is currently kept only in memory, making this not work in
multi-master setups
- Removing a user from active users in situations where they go inactive
or we miss the "workflow closed" message
- I think a timer based solution for this would cover most edge cases.
I.e. have FE ping every X minutes, BE removes the user unless they have
received a ping in Y minutes, where Y > X
- FE changes to be added later by @MiloradFilipovic 

Github issue / Community forum post (link here to close automatically):

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
2023-11-07 17:26:45 +02:00

162 lines
4.3 KiB
TypeScript

import Container from 'typedi';
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { WebSocketPush } from '@/push/websocket.push';
import { Logger } from '@/Logger';
import type { User } from '@/databases/entities/User';
import type { PushDataExecutionRecovered } from '@/Interfaces';
import { mockInstance } from '../../integration/shared/utils';
jest.useFakeTimers();
class MockWebSocket extends EventEmitter {
public isAlive = true;
public ping = jest.fn();
public send = jest.fn();
public terminate = jest.fn();
public close = jest.fn();
}
const createMockWebSocket = () => new MockWebSocket() as unknown as jest.Mocked<WebSocket>;
describe('WebSocketPush', () => {
const sessionId1 = 'test-session1';
const sessionId2 = 'test-session2';
const userId: User['id'] = 'test-user';
mockInstance(Logger);
const webSocketPush = Container.get(WebSocketPush);
const mockWebSocket1 = createMockWebSocket();
const mockWebSocket2 = createMockWebSocket();
beforeEach(() => {
jest.resetAllMocks();
});
it('can add a connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
expect(mockWebSocket1.listenerCount('close')).toBe(1);
expect(mockWebSocket1.listenerCount('pong')).toBe(1);
expect(mockWebSocket1.listenerCount('message')).toBe(1);
});
it('closes a connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
mockWebSocket1.emit('close');
expect(mockWebSocket1.listenerCount('close')).toBe(0);
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
expect(mockWebSocket1.listenerCount('message')).toBe(0);
});
it('sends data to one connection', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.send('executionRecovered', data, sessionId1);
expect(mockWebSocket1.send).toHaveBeenCalledWith(
JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
}),
);
expect(mockWebSocket2.send).not.toHaveBeenCalled();
});
it('sends data to all connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.broadcast('executionRecovered', data);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});
it('sends data to all users connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data: PushDataExecutionRecovered = {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
};
webSocketPush.sendToUsers('executionRecovered', data, [userId]);
const expectedMsg = JSON.stringify({
type: 'executionRecovered',
data: {
type: 'executionRecovered',
data: {
executionId: 'test-execution-id',
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
});
it('pings all connections', () => {
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
jest.runOnlyPendingTimers();
expect(mockWebSocket1.ping).toHaveBeenCalled();
expect(mockWebSocket2.ping).toHaveBeenCalled();
});
it('emits message event when connection receives data', () => {
const mockOnMessageReceived = jest.fn();
webSocketPush.on('message', mockOnMessageReceived);
webSocketPush.add(sessionId1, userId, mockWebSocket1);
webSocketPush.add(sessionId2, userId, mockWebSocket2);
const data = { test: 'data' };
const buffer = Buffer.from(JSON.stringify(data));
mockWebSocket1.emit('message', buffer);
expect(mockOnMessageReceived).toHaveBeenCalledWith({
msg: data,
sessionId: sessionId1,
userId,
});
});
});