From 8a4509d7ce19ca97c83909cb162609ee147c62e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 20 Dec 2024 12:35:33 +0100 Subject: [PATCH] WIP: setup chat handling via websockets --- packages/cli/src/chat/chat-service.ts | 153 ++++++++++++++++++++++++++ packages/cli/src/server.ts | 3 + packages/workflow/src/Interfaces.ts | 1 + 3 files changed, 157 insertions(+) create mode 100644 packages/cli/src/chat/chat-service.ts diff --git a/packages/cli/src/chat/chat-service.ts b/packages/cli/src/chat/chat-service.ts new file mode 100644 index 0000000000..90bf9eafa4 --- /dev/null +++ b/packages/cli/src/chat/chat-service.ts @@ -0,0 +1,153 @@ +import type { Application, Request } from 'express'; +import type { Server } from 'http'; +import { ServerResponse } from 'http'; +import type { IWorkflowDataProxyAdditionalKeys } from 'n8n-workflow'; +import { jsonParse, jsonStringify, Workflow } from 'n8n-workflow'; +import type { Socket } from 'net'; +import { Service } from 'typedi'; +import { parse as parseUrl } from 'url'; +import { type RawData, type WebSocket, Server as WebSocketServer } from 'ws'; + +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { NodeTypes } from '@/node-types'; + +type ChatRequest = Request<{ workflowId: string }, {}, {}, { sessionId: string }> & { + ws: WebSocket; +}; +type Session = { + connection: WebSocket; + workflowId: string; + executionId?: string; +}; + +function heartbeat(this: WebSocket) { + this.isAlive = true; +} + +@Service() +export class ChatService { + private readonly sessions = new Map(); + + constructor( + private readonly workflowRepository: WorkflowRepository, + private readonly nodeTypes: NodeTypes, + ) { + // Ping all connected clients every 60 seconds + setInterval(() => this.pingAll(), 60 * 1000); + } + + setup(server: Server, app: Application) { + const wsServer = new WebSocketServer({ noServer: true }); + server.on('upgrade', (request: ChatRequest, socket: Socket, head) => { + if (parseUrl(request.url).pathname === '/chat') { + wsServer.handleUpgrade(request, socket, head, (ws) => { + request.ws = ws; + + const response = new ServerResponse(request); + response.writeHead = (statusCode) => { + if (statusCode > 200) ws.close(); + return response; + }; + + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + app.handle(request, response); + }); + } + }); + + app.use('/chat', async (req: ChatRequest) => await this.startSession(req)); + } + + async startSession(req: ChatRequest) { + const { + ws, + query: { sessionId }, + params: { workflowId }, + } = req; + if (!sessionId) { + ws.send('The query parameter "sessionId" is missing!'); + ws.close(1008); + return; + } + + const workflowData = await this.workflowRepository.findOne({ + where: { id: workflowId }, + relations: { shared: { project: { projectRelations: true } } }, + }); + + if (workflowData === null) { + ws.send(`Could not find workflow with id "${workflowId}"`); + ws.close(1008); + return; + } + + const session: Session = this.sessions.get(sessionId) ?? { connection: ws, workflowId }; + // Make sure that the session always points to the latest websocket connection + session.connection = ws; + + const workflow = new Workflow({ + id: workflowId, + name: workflowData.name, + nodes: workflowData.nodes, + connections: workflowData.connections, + active: workflowData.active, + nodeTypes: this.nodeTypes, + staticData: workflowData.staticData, + settings: workflowData.settings, + }); + // @ts-expect-error TODO: get the chat node here + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const startNode = {} as unknown as INode; + + const additionalKeys: IWorkflowDataProxyAdditionalKeys = { + $executionId: session.executionId, + }; + + // TODO: setup a trigger context to call `.trigger` on the chat node on every message + + ws.isAlive = true; + ws.on('pong', heartbeat); + this.sessions.set(sessionId, session); + + const onMessage = this.messageHandler(sessionId, session); + ws.once('close', () => { + ws.off('pong', heartbeat); + ws.off('message', onMessage); + this.sessions.delete(sessionId); + }); + ws.on('message', onMessage); + + ws.send( + jsonStringify({ + type: 'chat_started', + data: { sessionId }, + }), + ); + } + + private messageHandler(sessionId: string, { workflowId, executionId }: Session) { + return (data: RawData) => { + // TODO: handle closed sessions + const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); + // TODO: start a new execution, or resume an existing one + // TODO: Add executionId to the session + // TODO: Call `.trigger` on the chat node + console.log(sessionId, workflowId, executionId, jsonParse(buffer.toString('utf8'))); + }; + } + + private pingAll() { + for (const { connection, executionId } of this.sessions.values()) { + // If a connection did not respond with a `PONG` in the last 60 seconds, disconnect + if (!connection.isAlive) { + return connection.terminate(); + if (executionId) { + // TODO: schedule the execution for cancellation + } + } + connection.isAlive = false; + connection.ping(); + } + } +} diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 74a1311444..244c318de1 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -7,6 +7,7 @@ import { resolve } from 'path'; import { Container, Service } from 'typedi'; import { AbstractServer } from '@/abstract-server'; +import { ChatService } from '@/chat/chat-service'; import config from '@/config'; import { CLI_DIR, @@ -402,5 +403,7 @@ export class Server extends AbstractServer { protected setupPushServer(): void { const { restEndpoint, server, app } = this; setupPushServer(restEndpoint, server, app); + + Container.get(ChatService).setup(server, app); } } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index c5fa0a938a..0cd6f20036 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -2320,6 +2320,7 @@ export type WorkflowExecuteMode = | 'retry' | 'trigger' | 'webhook' + | 'chat' | 'evaluation'; export type WorkflowActivateMode =