From 6bb6a5c6cd1da3503a1a2b35bcf4c685cd3f964f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 2 Sep 2024 16:47:20 +0200 Subject: [PATCH] fix(core): Flush responses for ai streaming endpoints (#10633) --- .../controllers/ai-assistant.controller.ts | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/packages/cli/src/controllers/ai-assistant.controller.ts b/packages/cli/src/controllers/ai-assistant.controller.ts index 6248fa3bb..fb11e15d1 100644 --- a/packages/cli/src/controllers/ai-assistant.controller.ts +++ b/packages/cli/src/controllers/ai-assistant.controller.ts @@ -1,31 +1,40 @@ -import { Post, RestController } from '@/decorators'; -import { AiAssistantService } from '@/services/ai-assistant.service'; -import { AiAssistantRequest } from '@/requests'; -import { Response } from 'express'; +import type { Response } from 'express'; import type { AiAssistantSDK } from '@n8n_io/ai-assistant-sdk'; -import { Readable, promises } from 'node:stream'; -import { InternalServerError } from 'express-openapi-validator/dist/openapi.validator'; +import { WritableStream } from 'node:stream/web'; import { strict as assert } from 'node:assert'; import { ErrorReporterProxy } from 'n8n-workflow'; +import { Post, RestController } from '@/decorators'; +import { InternalServerError } from '@/errors/response-errors/internal-server.error'; +import { AiAssistantRequest } from '@/requests'; +import { AiAssistantService } from '@/services/ai-assistant.service'; + +type FlushableResponse = Response & { flush: () => void }; + @RestController('/ai-assistant') export class AiAssistantController { constructor(private readonly aiAssistantService: AiAssistantService) {} @Post('/chat', { rateLimit: { limit: 100 } }) - async chat(req: AiAssistantRequest.Chat, res: Response) { + async chat(req: AiAssistantRequest.Chat, res: FlushableResponse) { try { - const stream = await this.aiAssistantService.chat(req.body, req.user); - - if (stream.body) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - await promises.pipeline(Readable.fromWeb(stream.body), res); + const aiResponse = await this.aiAssistantService.chat(req.body, req.user); + if (aiResponse.body) { + res.header('Content-type', 'application/json-lines').flush(); + await aiResponse.body.pipeTo( + new WritableStream({ + write(chunk) { + res.write(chunk); + res.flush(); + }, + }), + ); + res.end(); } } catch (e) { - // todo add sentry reporting assert(e instanceof Error); ErrorReporterProxy.error(e); - throw new InternalServerError({ message: `Something went wrong: ${e.message}` }); + throw new InternalServerError(`Something went wrong: ${e.message}`); } } @@ -38,7 +47,7 @@ export class AiAssistantController { } catch (e) { assert(e instanceof Error); ErrorReporterProxy.error(e); - throw new InternalServerError({ message: `Something went wrong: ${e.message}` }); + throw new InternalServerError(`Something went wrong: ${e.message}`); } } }