diff --git a/packages/cli/src/controllers/ai-assistant.controller.ts b/packages/cli/src/controllers/ai-assistant.controller.ts index 6248fa3bb0..fb11e15d1a 100644 --- a/packages/cli/src/controllers/ai-assistant.controller.ts +++ b/packages/cli/src/controllers/ai-assistant.controller.ts @@ -1,31 +1,40 @@ -import { Post, RestController } from '@/decorators'; -import { AiAssistantService } from '@/services/ai-assistant.service'; -import { AiAssistantRequest } from '@/requests'; -import { Response } from 'express'; +import type { Response } from 'express'; import type { AiAssistantSDK } from '@n8n_io/ai-assistant-sdk'; -import { Readable, promises } from 'node:stream'; -import { InternalServerError } from 'express-openapi-validator/dist/openapi.validator'; +import { WritableStream } from 'node:stream/web'; import { strict as assert } from 'node:assert'; import { ErrorReporterProxy } from 'n8n-workflow'; +import { Post, RestController } from '@/decorators'; +import { InternalServerError } from '@/errors/response-errors/internal-server.error'; +import { AiAssistantRequest } from '@/requests'; +import { AiAssistantService } from '@/services/ai-assistant.service'; + +type FlushableResponse = Response & { flush: () => void }; + @RestController('/ai-assistant') export class AiAssistantController { constructor(private readonly aiAssistantService: AiAssistantService) {} @Post('/chat', { rateLimit: { limit: 100 } }) - async chat(req: AiAssistantRequest.Chat, res: Response) { + async chat(req: AiAssistantRequest.Chat, res: FlushableResponse) { try { - const stream = await this.aiAssistantService.chat(req.body, req.user); - - if (stream.body) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - await promises.pipeline(Readable.fromWeb(stream.body), res); + const aiResponse = await this.aiAssistantService.chat(req.body, req.user); + if (aiResponse.body) { + res.header('Content-type', 'application/json-lines').flush(); + await aiResponse.body.pipeTo( + new WritableStream({ + write(chunk) { + res.write(chunk); + res.flush(); + }, + }), + ); + res.end(); } } catch (e) { - // todo add sentry reporting assert(e instanceof Error); ErrorReporterProxy.error(e); - throw new InternalServerError({ message: `Something went wrong: ${e.message}` }); + throw new InternalServerError(`Something went wrong: ${e.message}`); } } @@ -38,7 +47,7 @@ export class AiAssistantController { } catch (e) { assert(e instanceof Error); ErrorReporterProxy.error(e); - throw new InternalServerError({ message: `Something went wrong: ${e.message}` }); + throw new InternalServerError(`Something went wrong: ${e.message}`); } } }