fix(core): Flush responses for ai streaming endpoints (#10633)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-02 16:47:20 +02:00 committed by GitHub
parent 14952eb83b
commit 6bb6a5c6cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,31 +1,40 @@
import { Post, RestController } from '@/decorators';
import { AiAssistantService } from '@/services/ai-assistant.service';
import { AiAssistantRequest } from '@/requests';
import { Response } from 'express';
import type { Response } from 'express';
import type { AiAssistantSDK } from '@n8n_io/ai-assistant-sdk';
import { Readable, promises } from 'node:stream';
import { InternalServerError } from 'express-openapi-validator/dist/openapi.validator';
import { WritableStream } from 'node:stream/web';
import { strict as assert } from 'node:assert';
import { ErrorReporterProxy } from 'n8n-workflow';
import { Post, RestController } from '@/decorators';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { AiAssistantRequest } from '@/requests';
import { AiAssistantService } from '@/services/ai-assistant.service';
type FlushableResponse = Response & { flush: () => void };
@RestController('/ai-assistant')
export class AiAssistantController {
constructor(private readonly aiAssistantService: AiAssistantService) {}
@Post('/chat', { rateLimit: { limit: 100 } })
async chat(req: AiAssistantRequest.Chat, res: Response) {
async chat(req: AiAssistantRequest.Chat, res: FlushableResponse) {
try {
const stream = await this.aiAssistantService.chat(req.body, req.user);
if (stream.body) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await promises.pipeline(Readable.fromWeb(stream.body), res);
const aiResponse = await this.aiAssistantService.chat(req.body, req.user);
if (aiResponse.body) {
res.header('Content-type', 'application/json-lines').flush();
await aiResponse.body.pipeTo(
new WritableStream({
write(chunk) {
res.write(chunk);
res.flush();
},
}),
);
res.end();
}
} catch (e) {
// todo add sentry reporting
assert(e instanceof Error);
ErrorReporterProxy.error(e);
throw new InternalServerError({ message: `Something went wrong: ${e.message}` });
throw new InternalServerError(`Something went wrong: ${e.message}`);
}
}
@ -38,7 +47,7 @@ export class AiAssistantController {
} catch (e) {
assert(e instanceof Error);
ErrorReporterProxy.error(e);
throw new InternalServerError({ message: `Something went wrong: ${e.message}` });
throw new InternalServerError(`Something went wrong: ${e.message}`);
}
}
}