mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
fix(editor): Buffer json chunks in stream response (#10439)
This commit is contained in:
parent
1466ff7525
commit
37797f38d8
|
@ -9,7 +9,14 @@ export function chatWithAssistant(
|
|||
onDone: () => void,
|
||||
onError: (e: Error) => void,
|
||||
): void {
|
||||
void streamRequest(ctx, '/ai-assistant/chat', payload, onMessageUpdated, onDone, onError);
|
||||
void streamRequest<ChatRequest.ResponsePayload>(
|
||||
ctx,
|
||||
'/ai-assistant/chat',
|
||||
payload,
|
||||
onMessageUpdated,
|
||||
onDone,
|
||||
onError,
|
||||
);
|
||||
}
|
||||
|
||||
export async function replaceCode(
|
||||
|
|
|
@ -131,7 +131,7 @@
|
|||
"auth.signup.setupYourAccount": "Set up your account",
|
||||
"auth.signup.setupYourAccountError": "Problem setting up your account",
|
||||
"auth.signup.tokenValidationError": "Issue validating invite token",
|
||||
"aiAssistant.name": "Ava",
|
||||
"aiAssistant.name": "Assistant",
|
||||
"aiAssistant.assistant": "AI Assistant",
|
||||
"aiAssistant.newSessionModal.title.part1": "Start new",
|
||||
"aiAssistant.newSessionModal.title.part2": "session",
|
||||
|
@ -139,7 +139,7 @@
|
|||
"aiAssistant.newSessionModal.question": "Are you sure you want to start a new session?",
|
||||
"aiAssistant.newSessionModal.confirm": "Start new session",
|
||||
"aiAssistant.serviceError.message": "Unable to connect to n8n's AI service",
|
||||
"aiAssistant.codeUpdated.message.title": "Ava modified workflow",
|
||||
"aiAssistant.codeUpdated.message.title": "Assistant modified workflow",
|
||||
"aiAssistant.codeUpdated.message.body": "Open the <a data-action='openNodeDetail' data-action-parameter-node='{nodeName}'>{nodeName}</a> node to see the changes",
|
||||
"banners.confirmEmail.message.1": "To secure your account and prevent future access issues, please confirm your",
|
||||
"banners.confirmEmail.message.2": "email address.",
|
||||
|
|
112
packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts
Normal file
112
packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts
Normal file
|
@ -0,0 +1,112 @@
|
|||
import { STREAM_SEPERATOR, streamRequest } from '../apiUtils';
|
||||
|
||||
describe('streamRequest', () => {
|
||||
it('should stream data from the API endpoint', async () => {
|
||||
const encoder = new TextEncoder();
|
||||
const mockResponse = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}`));
|
||||
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 2 })}${STREAM_SEPERATOR}`));
|
||||
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 3 })}${STREAM_SEPERATOR}`));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const mockFetch = vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: mockResponse,
|
||||
});
|
||||
|
||||
global.fetch = mockFetch;
|
||||
|
||||
const onChunkMock = vi.fn();
|
||||
const onDoneMock = vi.fn();
|
||||
const onErrorMock = vi.fn();
|
||||
|
||||
await streamRequest(
|
||||
{
|
||||
baseUrl: 'https://api.example.com',
|
||||
pushRef: '',
|
||||
},
|
||||
'/data',
|
||||
{ key: 'value' },
|
||||
onChunkMock,
|
||||
onDoneMock,
|
||||
onErrorMock,
|
||||
);
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ key: 'value' }),
|
||||
credentials: 'include',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'browser-id': expect.stringContaining('-'),
|
||||
},
|
||||
});
|
||||
|
||||
expect(onChunkMock).toHaveBeenCalledTimes(3);
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });
|
||||
|
||||
expect(onDoneMock).toHaveBeenCalledTimes(1);
|
||||
expect(onErrorMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should handle broken stream data', async () => {
|
||||
const encoder = new TextEncoder();
|
||||
const mockResponse = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(
|
||||
encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}{"chunk": `),
|
||||
);
|
||||
controller.enqueue(encoder.encode(`2}${STREAM_SEPERATOR}{"ch`));
|
||||
controller.enqueue(encoder.encode('unk":'));
|
||||
controller.enqueue(encoder.encode(`3}${STREAM_SEPERATOR}`));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const mockFetch = vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
body: mockResponse,
|
||||
});
|
||||
|
||||
global.fetch = mockFetch;
|
||||
|
||||
const onChunkMock = vi.fn();
|
||||
const onDoneMock = vi.fn();
|
||||
const onErrorMock = vi.fn();
|
||||
|
||||
await streamRequest(
|
||||
{
|
||||
baseUrl: 'https://api.example.com',
|
||||
pushRef: '',
|
||||
},
|
||||
'/data',
|
||||
{ key: 'value' },
|
||||
onChunkMock,
|
||||
onDoneMock,
|
||||
onErrorMock,
|
||||
);
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ key: 'value' }),
|
||||
credentials: 'include',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'browser-id': expect.stringContaining('-'),
|
||||
},
|
||||
});
|
||||
|
||||
expect(onChunkMock).toHaveBeenCalledTimes(3);
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 });
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 });
|
||||
expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 });
|
||||
|
||||
expect(onDoneMock).toHaveBeenCalledTimes(1);
|
||||
expect(onErrorMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
|
@ -3,7 +3,6 @@ import axios from 'axios';
|
|||
import { ApplicationError, jsonParse, type GenericValue, type IDataObject } from 'n8n-workflow';
|
||||
import type { IExecutionFlattedResponse, IExecutionResponse, IRestApiContext } from '@/Interface';
|
||||
import { parse } from 'flatted';
|
||||
import type { ChatRequest } from '@/types/assistant.types';
|
||||
import { assert } from '@/utils/assert';
|
||||
|
||||
const BROWSER_ID_STORAGE_KEY = 'n8n-browserId';
|
||||
|
@ -14,6 +13,7 @@ if (!browserId && 'randomUUID' in crypto) {
|
|||
}
|
||||
|
||||
export const NO_NETWORK_ERROR_CODE = 999;
|
||||
export const STREAM_SEPERATOR = '⧉⇋⇋➽⌑⧉§§\n';
|
||||
|
||||
export class ResponseError extends ApplicationError {
|
||||
// The HTTP status code of response
|
||||
|
@ -194,15 +194,15 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedRespo
|
|||
return returnData;
|
||||
}
|
||||
|
||||
export const streamRequest = async (
|
||||
export async function streamRequest<T>(
|
||||
context: IRestApiContext,
|
||||
apiEndpoint: string,
|
||||
payload: ChatRequest.RequestPayload,
|
||||
onChunk?: (chunk: ChatRequest.ResponsePayload) => void,
|
||||
payload: object,
|
||||
onChunk?: (chunk: T) => void,
|
||||
onDone?: () => void,
|
||||
onError?: (e: Error) => void,
|
||||
separator = '⧉⇋⇋➽⌑⧉§§\n',
|
||||
): Promise<void> => {
|
||||
separator = STREAM_SEPERATOR,
|
||||
): Promise<void> {
|
||||
const headers: Record<string, string> = {
|
||||
'Content-Type': 'application/json',
|
||||
};
|
||||
|
@ -223,23 +223,36 @@ export const streamRequest = async (
|
|||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder('utf-8');
|
||||
|
||||
let buffer = '';
|
||||
|
||||
async function readStream() {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
onDone?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const chunk = decoder.decode(value);
|
||||
const splitChunks = chunk.split(separator);
|
||||
buffer += chunk;
|
||||
|
||||
const splitChunks = buffer.split(separator);
|
||||
|
||||
buffer = '';
|
||||
for (const splitChunk of splitChunks) {
|
||||
if (splitChunk && onChunk) {
|
||||
if (splitChunk) {
|
||||
let data: T;
|
||||
try {
|
||||
onChunk(jsonParse(splitChunk, { errorMessage: 'Invalid json chunk in stream' }));
|
||||
data = jsonParse<T>(splitChunk, { errorMessage: 'Invalid json' });
|
||||
} catch (e) {
|
||||
// incomplete json. append to buffer to complete
|
||||
buffer += splitChunk;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
onChunk?.(data);
|
||||
} catch (e: unknown) {
|
||||
if (e instanceof Error) {
|
||||
console.log(`${e.message}: ${splitChunk}`);
|
||||
onError?.(e);
|
||||
}
|
||||
}
|
||||
|
@ -257,4 +270,4 @@ export const streamRequest = async (
|
|||
assert(e instanceof Error);
|
||||
onError?.(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue