From b496bf3147d2cd873d24371be02cb7ea5dbd8621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 8 Nov 2024 10:09:24 +0100 Subject: [PATCH 1/2] fix(core): Ensure task runner server closes websocket connection correctly (#11633) --- .../__tests__/task-runner-server.test.ts | 67 +++++++++++++++++++ .../cli/src/runners/task-runner-server.ts | 2 +- 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 packages/cli/src/runners/__tests__/task-runner-server.test.ts diff --git a/packages/cli/src/runners/__tests__/task-runner-server.test.ts b/packages/cli/src/runners/__tests__/task-runner-server.test.ts new file mode 100644 index 0000000000..ae25cd1231 --- /dev/null +++ b/packages/cli/src/runners/__tests__/task-runner-server.test.ts @@ -0,0 +1,67 @@ +import type { GlobalConfig } from '@n8n/config'; +import { mock } from 'jest-mock-extended'; +import { ServerResponse } from 'node:http'; +import type WebSocket from 'ws'; + +import type { TaskRunnerAuthController } from '@/runners/auth/task-runner-auth.controller'; +import { TaskRunnerServer } from '@/runners/task-runner-server'; + +import type { TaskRunnerServerInitRequest } from '../runner-types'; + +describe('TaskRunnerServer', () => { + describe('handleUpgradeRequest', () => { + it('should close WebSocket when response status code is > 200', () => { + const ws = mock(); + const request = mock({ + url: '/runners/_ws', + ws, + }); + + const server = new TaskRunnerServer( + mock(), + mock({ taskRunners: { path: '/runners' } }), + mock(), + mock(), + ); + + // @ts-expect-error Private property + server.handleUpgradeRequest(request, mock(), Buffer.from('')); + + const response = new ServerResponse(request); + response.writeHead = (statusCode) => { + if (statusCode > 200) ws.close(); + return response; + }; + + response.writeHead(401); + expect(ws.close).toHaveBeenCalledWith(); // no args + }); + + it('should not close WebSocket when response status code is 200', () => { + const ws = mock(); + const request = mock({ + url: '/runners/_ws', + ws, + }); + + const server = new TaskRunnerServer( + mock(), + mock({ taskRunners: { path: '/runners' } }), + mock(), + mock(), + ); + + // @ts-expect-error Private property + server.handleUpgradeRequest(request, mock(), Buffer.from('')); + + const response = new ServerResponse(request); + response.writeHead = (statusCode) => { + if (statusCode > 200) ws.close(); + return response; + }; + + response.writeHead(200); + expect(ws.close).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/cli/src/runners/task-runner-server.ts b/packages/cli/src/runners/task-runner-server.ts index 6dd0fd5919..56c56e02ae 100644 --- a/packages/cli/src/runners/task-runner-server.ts +++ b/packages/cli/src/runners/task-runner-server.ts @@ -181,7 +181,7 @@ export class TaskRunnerServer { const response = new ServerResponse(request); response.writeHead = (statusCode) => { - if (statusCode > 200) ws.close(100); + if (statusCode > 200) ws.close(); return response; }; From 57467d0285d67509322630c4c01130022f274a41 Mon Sep 17 00:00:00 2001 From: Eugene Date: Fri, 8 Nov 2024 10:17:11 +0100 Subject: [PATCH 2/2] fix(core): Improve model sub-nodes error handling (#11418) --- .../LMChatAnthropic/LmChatAnthropic.node.ts | 2 + .../llms/LMChatOllama/LmChatOllama.node.ts | 2 + .../llms/LMChatOpenAi/LmChatOpenAi.node.ts | 29 ++------ .../nodes/llms/LMCohere/LmCohere.node.ts | 2 + .../nodes/llms/LMOllama/LmOllama.node.ts | 2 + .../nodes/llms/LMOpenAi/LmOpenAi.node.ts | 2 + .../LmOpenHuggingFaceInference.node.ts | 2 + .../LmChatAwsBedrock/LmChatAwsBedrock.node.ts | 2 + .../LmChatAzureOpenAi.node.ts | 2 + .../LmChatGoogleGemini.node.ts | 2 + .../LmChatGoogleVertex.node.ts | 6 +- .../nodes/llms/LmChatGroq/LmChatGroq.node.ts | 2 + .../LmChatMistralCloud.node.ts | 2 + .../nodes/llms/N8nLlmTracing.ts | 33 ++++++++-- .../n8nDefaultFailedAttemptHandler.test.ts | 66 +++++++++++++++++++ .../llms/n8nDefaultFailedAttemptHandler.ts | 41 ++++++++++++ .../llms/n8nLlmFailedAttemptHandler.test.ts | 65 ++++++++++++++++++ .../nodes/llms/n8nLlmFailedAttemptHandler.ts | 46 +++++++++++++ .../vendors/OpenAi/helpers/error-handling.ts | 18 +++++ .../@n8n/nodes-langchain/utils/logWrapper.ts | 19 +----- .../src/composables/usePushConnection.ts | 11 ++-- packages/editor-ui/src/utils/expressions.ts | 2 +- .../workflow/src/errors/expression.error.ts | 6 +- .../workflow/src/errors/node-api.error.ts | 2 +- 24 files changed, 310 insertions(+), 56 deletions(-) create mode 100644 packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.test.ts create mode 100644 packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.ts create mode 100644 packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.test.ts create mode 100644 packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.ts diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts index fa51ed3a45..c575b59aa8 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts @@ -14,6 +14,7 @@ import { import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; const modelField: INodeProperties = { displayName: 'Model', @@ -214,6 +215,7 @@ export class LmChatAnthropic implements INodeType { topK: options.topK, topP: options.topP, callbacks: [new N8nLlmTracing(this, { tokensUsageParser })], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts index dc2f716b2b..354e54e27a 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts @@ -12,6 +12,7 @@ import { ChatOllama } from '@langchain/ollama'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { ollamaModel, ollamaOptions, ollamaDescription } from '../LMOllama/description'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatOllama implements INodeType { description: INodeTypeDescription = { @@ -64,6 +65,7 @@ export class LmChatOllama implements INodeType { model: modelName, format: options.format === 'default' ? undefined : options.format, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts index 2e724bf3a7..2e55e56722 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts @@ -1,19 +1,18 @@ /* eslint-disable n8n-nodes-base/node-dirname-against-convention */ + +import { ChatOpenAI, type ClientOptions } from '@langchain/openai'; import { NodeConnectionType, type INodeType, type INodeTypeDescription, type ISupplyDataFunctions, type SupplyData, - type JsonObject, - NodeApiError, } from 'n8n-workflow'; -import { ChatOpenAI, type ClientOptions } from '@langchain/openai'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { openAiFailedAttemptHandler } from '../../vendors/OpenAi/helpers/error-handling'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; import { N8nLlmTracing } from '../N8nLlmTracing'; -import { RateLimitError } from 'openai'; -import { getCustomErrorMessage } from '../../vendors/OpenAi/helpers/error-handling'; export class LmChatOpenAi implements INodeType { description: INodeTypeDescription = { @@ -276,25 +275,7 @@ export class LmChatOpenAi implements INodeType { response_format: { type: options.responseFormat }, } : undefined, - onFailedAttempt: (error: any) => { - // If the error is a rate limit error, we want to handle it differently - // because OpenAI has multiple different rate limit errors - if (error instanceof RateLimitError) { - const errorCode = error?.code; - if (errorCode) { - const customErrorMessage = getCustomErrorMessage(errorCode); - - const apiError = new NodeApiError(this.getNode(), error as unknown as JsonObject); - if (customErrorMessage) { - apiError.message = customErrorMessage; - } - - throw apiError; - } - } - - throw error; - }, + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this, openAiFailedAttemptHandler), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts index 6957cd9d9a..4b5f85f915 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts @@ -10,6 +10,7 @@ import { import { Cohere } from '@langchain/cohere'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmCohere implements INodeType { description: INodeTypeDescription = { @@ -99,6 +100,7 @@ export class LmCohere implements INodeType { apiKey: credentials.apiKey as string, ...options, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts index f71708cbca..ddd565e3a9 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts @@ -11,6 +11,7 @@ import { Ollama } from '@langchain/community/llms/ollama'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; import { ollamaDescription, ollamaModel, ollamaOptions } from './description'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmOllama implements INodeType { description: INodeTypeDescription = { @@ -62,6 +63,7 @@ export class LmOllama implements INodeType { model: modelName, ...options, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts index 5fb9be937e..41cb622294 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts @@ -10,6 +10,7 @@ import type { import { OpenAI, type ClientOptions } from '@langchain/openai'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; type LmOpenAiOptions = { baseURL?: string; @@ -260,6 +261,7 @@ export class LmOpenAi implements INodeType { timeout: options.timeout ?? 60000, maxRetries: options.maxRetries ?? 2, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts index ddf8065bf6..7823c91b52 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts @@ -10,6 +10,7 @@ import { import { HuggingFaceInference } from '@langchain/community/llms/hf'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmOpenHuggingFaceInference implements INodeType { description: INodeTypeDescription = { @@ -143,6 +144,7 @@ export class LmOpenHuggingFaceInference implements INodeType { apiKey: credentials.apiKey as string, ...options, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts index b4eafde76e..3d928ce801 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts @@ -10,6 +10,7 @@ import { import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatAwsBedrock implements INodeType { description: INodeTypeDescription = { @@ -151,6 +152,7 @@ export class LmChatAwsBedrock implements INodeType { sessionToken: credentials.sessionToken as string, }, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts index 55a5afb7ce..ffa7f4d58f 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts @@ -10,6 +10,7 @@ import { import { ChatOpenAI } from '@langchain/openai'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatAzureOpenAi implements INodeType { description: INodeTypeDescription = { @@ -195,6 +196,7 @@ export class LmChatAzureOpenAi implements INodeType { response_format: { type: options.responseFormat }, } : undefined, + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts index 44691a47ef..9bade1e26a 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts @@ -11,6 +11,7 @@ import type { SafetySetting } from '@google/generative-ai'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; import { additionalOptions } from '../gemini-common/additional-options'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatGoogleGemini implements INodeType { description: INodeTypeDescription = { @@ -144,6 +145,7 @@ export class LmChatGoogleGemini implements INodeType { maxOutputTokens: options.maxOutputTokens, safetySettings, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleVertex/LmChatGoogleVertex.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleVertex/LmChatGoogleVertex.node.ts index a9a01ebf1b..92b51e534f 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleVertex/LmChatGoogleVertex.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleVertex/LmChatGoogleVertex.node.ts @@ -17,6 +17,7 @@ import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; import { additionalOptions } from '../gemini-common/additional-options'; import { makeErrorFromStatus } from './error-handling'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatGoogleVertex implements INodeType { description: INodeTypeDescription = { @@ -170,7 +171,8 @@ export class LmChatGoogleVertex implements INodeType { safetySettings, callbacks: [new N8nLlmTracing(this)], // Handle ChatVertexAI invocation errors to provide better error messages - onFailedAttempt: (error: any) => { + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this, (error: any) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access const customError = makeErrorFromStatus(Number(error?.response?.status), { modelName, }); @@ -180,7 +182,7 @@ export class LmChatGoogleVertex implements INodeType { } throw error; - }, + }), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts index 3588cf0cc3..1494dbcf55 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts @@ -10,6 +10,7 @@ import { import { ChatGroq } from '@langchain/groq'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatGroq implements INodeType { description: INodeTypeDescription = { @@ -144,6 +145,7 @@ export class LmChatGroq implements INodeType { maxTokens: options.maxTokensToSample, temperature: options.temperature, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts index 5ff28bd30d..edd533b6ba 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts @@ -11,6 +11,7 @@ import type { ChatMistralAIInput } from '@langchain/mistralai'; import { ChatMistralAI } from '@langchain/mistralai'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { N8nLlmTracing } from '../N8nLlmTracing'; +import { makeN8nLlmFailedAttemptHandler } from '../n8nLlmFailedAttemptHandler'; export class LmChatMistralCloud implements INodeType { description: INodeTypeDescription = { @@ -190,6 +191,7 @@ export class LmChatMistralCloud implements INodeType { modelName, ...options, callbacks: [new N8nLlmTracing(this)], + onFailedAttempt: makeN8nLlmFailedAttemptHandler(this), }); return { diff --git a/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts index 660bf3b0a9..af60b72982 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts @@ -1,17 +1,18 @@ import { BaseCallbackHandler } from '@langchain/core/callbacks/base'; +import type { SerializedFields } from '@langchain/core/dist/load/map_keys'; import { getModelNameForTiktoken } from '@langchain/core/language_models/base'; -import { encodingForModel } from '@langchain/core/utils/tiktoken'; import type { Serialized, SerializedNotImplemented, SerializedSecret, } from '@langchain/core/load/serializable'; -import type { LLMResult } from '@langchain/core/outputs'; -import type { IDataObject, ISupplyDataFunctions } from 'n8n-workflow'; -import { NodeConnectionType } from 'n8n-workflow'; -import { pick } from 'lodash'; import type { BaseMessage } from '@langchain/core/messages'; -import type { SerializedFields } from '@langchain/core/dist/load/map_keys'; +import type { LLMResult } from '@langchain/core/outputs'; +import { encodingForModel } from '@langchain/core/utils/tiktoken'; +import type { IDataObject, ISupplyDataFunctions, JsonObject } from 'n8n-workflow'; +import { pick } from 'lodash'; +import { NodeConnectionType, NodeError, NodeOperationError } from 'n8n-workflow'; + import { logAiEvent } from '../../utils/helpers'; type TokensUsageParser = (llmOutput: LLMResult['llmOutput']) => { @@ -30,6 +31,10 @@ const TIKTOKEN_ESTIMATE_MODEL = 'gpt-4o'; export class N8nLlmTracing extends BaseCallbackHandler { name = 'N8nLlmTracing'; + // This flag makes sure that LangChain will wait for the handlers to finish before continuing + // This is crucial for the handleLLMError handler to work correctly (it should be called before the error is propagated to the root node) + awaitHandlers = true; + connectionType = NodeConnectionType.AiLanguageModel; promptTokensEstimate = 0; @@ -135,6 +140,7 @@ export class N8nLlmTracing extends BaseCallbackHandler { this.executionFunctions.addOutputData(this.connectionType, runDetails.index, [ [{ json: { ...response } }], ]); + logAiEvent(this.executionFunctions, 'ai-llm-generated-output', { messages: parsedMessages, options: runDetails.options, @@ -172,6 +178,8 @@ export class N8nLlmTracing extends BaseCallbackHandler { runId: string, parentRunId?: string | undefined, ) { + const runDetails = this.runsMap[runId] ?? { index: Object.keys(this.runsMap).length }; + // Filter out non-x- headers to avoid leaking sensitive information in logs if (typeof error === 'object' && error?.hasOwnProperty('headers')) { const errorWithHeaders = error as { headers: Record }; @@ -183,6 +191,19 @@ export class N8nLlmTracing extends BaseCallbackHandler { }); } + if (error instanceof NodeError) { + this.executionFunctions.addOutputData(this.connectionType, runDetails.index, error); + } else { + // If the error is not a NodeError, we wrap it in a NodeOperationError + this.executionFunctions.addOutputData( + this.connectionType, + runDetails.index, + new NodeOperationError(this.executionFunctions.getNode(), error as JsonObject, { + functionality: 'configuration-node', + }), + ); + } + logAiEvent(this.executionFunctions, 'ai-llm-errored', { error: Object.keys(error).length === 0 ? error.toString() : error, runId, diff --git a/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.test.ts b/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.test.ts new file mode 100644 index 0000000000..6f81cc1368 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.test.ts @@ -0,0 +1,66 @@ +import { n8nDefaultFailedAttemptHandler } from './n8nDefaultFailedAttemptHandler'; + +class MockHttpError extends Error { + response: { status: number }; + + constructor(message: string, code: number) { + super(message); + this.response = { status: code }; + } +} + +describe('n8nDefaultFailedAttemptHandler', () => { + it('should throw error if message starts with "Cancel"', () => { + const error = new Error('Cancel operation'); + expect(() => n8nDefaultFailedAttemptHandler(error)).toThrow(error); + }); + + it('should throw error if message starts with "AbortError"', () => { + const error = new Error('AbortError occurred'); + expect(() => n8nDefaultFailedAttemptHandler(error)).toThrow(error); + }); + + it('should throw error if name is "AbortError"', () => { + class MockAbortError extends Error { + constructor() { + super('Some error'); + this.name = 'AbortError'; + } + } + + const error = new MockAbortError(); + + expect(() => n8nDefaultFailedAttemptHandler(error)).toThrow(error); + }); + + it('should throw error if code is "ECONNABORTED"', () => { + class MockAbortError extends Error { + code: string; + + constructor() { + super('Some error'); + this.code = 'ECONNABORTED'; + } + } + + const error = new MockAbortError(); + expect(() => n8nDefaultFailedAttemptHandler(error)).toThrow(error); + }); + + it('should throw error if status is in STATUS_NO_RETRY', () => { + const error = new MockHttpError('Some error', 400); + expect(() => n8nDefaultFailedAttemptHandler(error)).toThrow(error); + }); + + it('should not throw error if status is not in STATUS_NO_RETRY', () => { + const error = new MockHttpError('Some error', 500); + error.response = { status: 500 }; + + expect(() => n8nDefaultFailedAttemptHandler(error)).not.toThrow(); + }); + + it('should not throw error if no conditions are met', () => { + const error = new Error('Some random error'); + expect(() => n8nDefaultFailedAttemptHandler(error)).not.toThrow(); + }); +}); diff --git a/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.ts b/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.ts new file mode 100644 index 0000000000..11bbc5c87e --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/llms/n8nDefaultFailedAttemptHandler.ts @@ -0,0 +1,41 @@ +const STATUS_NO_RETRY = [ + 400, // Bad Request + 401, // Unauthorized + 402, // Payment Required + 403, // Forbidden + 404, // Not Found + 405, // Method Not Allowed + 406, // Not Acceptable + 407, // Proxy Authentication Required + 409, // Conflict +]; + +/** + * This function is used as a default handler for failed attempts in all LLMs. + * It is based on a default handler from the langchain core package. + * It throws an error when it encounters a known error that should not be retried. + * @param error + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const n8nDefaultFailedAttemptHandler = (error: any) => { + if ( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access,@typescript-eslint/no-unsafe-call + error?.message?.startsWith?.('Cancel') || + error?.message?.startsWith?.('AbortError') || + error?.name === 'AbortError' + ) { + throw error; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any,@typescript-eslint/no-unsafe-member-access + if (error?.code === 'ECONNABORTED') { + throw error; + } + + const status = + // eslint-disable-next-line @typescript-eslint/no-explicit-any,@typescript-eslint/no-unsafe-member-access + error?.response?.status ?? error?.status; + if (status && STATUS_NO_RETRY.includes(+status)) { + throw error; + } +}; diff --git a/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.test.ts b/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.test.ts new file mode 100644 index 0000000000..20c664c3c8 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.test.ts @@ -0,0 +1,65 @@ +import { mock } from 'jest-mock-extended'; +import type { ISupplyDataFunctions } from 'n8n-workflow'; +import { ApplicationError, NodeApiError } from 'n8n-workflow'; + +import { makeN8nLlmFailedAttemptHandler } from './n8nLlmFailedAttemptHandler'; + +describe('makeN8nLlmFailedAttemptHandler', () => { + const ctx = mock({ + getNode: jest.fn(), + }); + + it('should throw a wrapped error, when NO custom handler is provided', () => { + const handler = makeN8nLlmFailedAttemptHandler(ctx); + + expect(() => handler(new Error('Test error'))).toThrow(NodeApiError); + }); + + it('should wrapped error when custom handler is provided', () => { + const customHandler = jest.fn(); + const handler = makeN8nLlmFailedAttemptHandler(ctx, customHandler); + + expect(() => handler(new Error('Test error'))).toThrow(NodeApiError); + expect(customHandler).toHaveBeenCalled(); + }); + + it('should throw wrapped exception from custom handler', () => { + const customHandler = jest.fn(() => { + throw new ApplicationError('Custom handler error'); + }); + const handler = makeN8nLlmFailedAttemptHandler(ctx, customHandler); + + expect(() => handler(new Error('Test error'))).toThrow('Custom handler error'); + expect(customHandler).toHaveBeenCalled(); + }); + + it('should not throw if retries are left', () => { + const customHandler = jest.fn(); + const handler = makeN8nLlmFailedAttemptHandler(ctx, customHandler); + + const error = new Error('Test error'); + (error as any).retriesLeft = 1; + + expect(() => handler(error)).not.toThrow(); + }); + + it('should throw NodeApiError if no retries are left', () => { + const handler = makeN8nLlmFailedAttemptHandler(ctx); + + const error = new Error('Test error'); + (error as any).retriesLeft = 0; + + expect(() => handler(error)).toThrow(NodeApiError); + }); + + it('should throw NodeApiError if no retries are left with custom handler', () => { + const customHandler = jest.fn(); + const handler = makeN8nLlmFailedAttemptHandler(ctx, customHandler); + + const error = new Error('Test error'); + (error as any).retriesLeft = 0; + + expect(() => handler(error)).toThrow(NodeApiError); + expect(customHandler).toHaveBeenCalled(); + }); +}); diff --git a/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.ts b/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.ts new file mode 100644 index 0000000000..61e8d16581 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/llms/n8nLlmFailedAttemptHandler.ts @@ -0,0 +1,46 @@ +import type { FailedAttemptHandler } from '@langchain/core/dist/utils/async_caller'; +import type { ISupplyDataFunctions, JsonObject } from 'n8n-workflow'; +import { NodeApiError } from 'n8n-workflow'; + +import { n8nDefaultFailedAttemptHandler } from './n8nDefaultFailedAttemptHandler'; + +/** + * This function returns a custom failed attempt handler for using with LangChain models. + * It first tries to use a custom handler passed as an argument, and if that doesn't throw an error, it uses the default handler. + * It always wraps the error in a NodeApiError. + * It throws an error ONLY if there are no retries left. + */ +export const makeN8nLlmFailedAttemptHandler = ( + ctx: ISupplyDataFunctions, + handler?: FailedAttemptHandler, +): FailedAttemptHandler => { + return (error: any) => { + try { + // Try custom error handler first + handler?.(error); + + // If it didn't throw an error, use the default handler + n8nDefaultFailedAttemptHandler(error); + } catch (e) { + // Wrap the error in a NodeApiError + const apiError = new NodeApiError(ctx.getNode(), e as unknown as JsonObject, { + functionality: 'configuration-node', + }); + + throw apiError; + } + + // If no error was thrown, check if it is the last retry + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + if (error?.retriesLeft > 0) { + return; + } + + // If there are no retries left, throw the error wrapped in a NodeApiError + const apiError = new NodeApiError(ctx.getNode(), error as unknown as JsonObject, { + functionality: 'configuration-node', + }); + + throw apiError; + }; +}; diff --git a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/helpers/error-handling.ts b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/helpers/error-handling.ts index 8db54a1b86..5cea5eaf51 100644 --- a/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/helpers/error-handling.ts +++ b/packages/@n8n/nodes-langchain/nodes/vendors/OpenAi/helpers/error-handling.ts @@ -1,4 +1,5 @@ import { OpenAIError } from 'openai/error'; +import { RateLimitError } from 'openai'; const errorMap: Record = { insufficient_quota: 'OpenAI: Insufficient quota', @@ -12,3 +13,20 @@ export function getCustomErrorMessage(errorCode: string): string | undefined { export function isOpenAiError(error: any): error is OpenAIError { return error instanceof OpenAIError; } + +export const openAiFailedAttemptHandler = (error: any) => { + if (error instanceof RateLimitError) { + // If the error is a rate limit error, we want to handle it differently + // because OpenAI has multiple different rate limit errors + const errorCode = error?.code; + if (errorCode) { + const customErrorMessage = getCustomErrorMessage(errorCode); + + if (customErrorMessage) { + error.message = customErrorMessage; + } + } + + throw error; + } +}; diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index eca1431a4b..10e55ba6ef 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -17,13 +17,6 @@ import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory import { N8nBinaryLoader } from './N8nBinaryLoader'; import { N8nJsonLoader } from './N8nJsonLoader'; -const errorsMap: { [key: string]: { message: string; description: string } } = { - 'You exceeded your current quota, please check your plan and billing details.': { - message: 'OpenAI quota exceeded', - description: 'You exceeded your current quota, please check your plan and billing details.', - }, -}; - export async function callMethodAsync( this: T, parameters: { @@ -37,30 +30,25 @@ export async function callMethodAsync( try { return await parameters.method.call(this, ...parameters.arguments); } catch (e) { - // Propagate errors from sub-nodes - if (e.functionality === 'configuration-node') throw e; const connectedNode = parameters.executeFunctions.getNode(); const error = new NodeOperationError(connectedNode, e, { functionality: 'configuration-node', }); - if (errorsMap[error.message]) { - error.description = errorsMap[error.message].description; - error.message = errorsMap[error.message].message; - } - parameters.executeFunctions.addOutputData( parameters.connectionType, parameters.currentNodeRunIndex, error, ); + if (error.message) { if (!error.description) { error.description = error.message; } throw error; } + throw new NodeOperationError( connectedNode, `Error on node "${connectedNode.name}" which is connected via input "${parameters.connectionType}"`, @@ -82,8 +70,6 @@ export function callMethodSync( try { return parameters.method.call(this, ...parameters.arguments); } catch (e) { - // Propagate errors from sub-nodes - if (e.functionality === 'configuration-node') throw e; const connectedNode = parameters.executeFunctions.getNode(); const error = new NodeOperationError(connectedNode, e); parameters.executeFunctions.addOutputData( @@ -91,6 +77,7 @@ export function callMethodSync( parameters.currentNodeRunIndex, error, ); + throw new NodeOperationError( connectedNode, `Error on node "${connectedNode.name}" which is connected via input "${parameters.connectionType}"`, diff --git a/packages/editor-ui/src/composables/usePushConnection.ts b/packages/editor-ui/src/composables/usePushConnection.ts index fa6515a613..ea9f79d27e 100644 --- a/packages/editor-ui/src/composables/usePushConnection.ts +++ b/packages/editor-ui/src/composables/usePushConnection.ts @@ -1,6 +1,7 @@ import { parse } from 'flatted'; import { h, ref } from 'vue'; import type { useRouter } from 'vue-router'; +import { TelemetryHelpers } from 'n8n-workflow'; import type { ExpressionError, IDataObject, @@ -12,8 +13,8 @@ import type { IExecuteContextData, NodeOperationError, INodeTypeDescription, + NodeError, } from 'n8n-workflow'; -import { TelemetryHelpers } from 'n8n-workflow'; import type { PushMessage, PushPayload } from '@n8n/api-types'; import type { IExecutionResponse, IExecutionsCurrentSummaryExtended } from '@/Interface'; @@ -322,8 +323,7 @@ export function usePushConnection({ router }: { router: ReturnType { - return error instanceof ExpressionError && error.context.functionality === 'pairedItem'; + return error instanceof ExpressionError && error.functionality === 'pairedItem'; }; export const getResolvableState = (error: unknown, ignoreError = false): ResolvableState => { diff --git a/packages/workflow/src/errors/expression.error.ts b/packages/workflow/src/errors/expression.error.ts index 02b3c74444..32aaae3812 100644 --- a/packages/workflow/src/errors/expression.error.ts +++ b/packages/workflow/src/errors/expression.error.ts @@ -40,7 +40,6 @@ export class ExpressionError extends ExecutionBaseError { 'causeDetailed', 'descriptionTemplate', 'descriptionKey', - 'functionality', 'itemIndex', 'messageTemplate', 'nodeCause', @@ -48,7 +47,12 @@ export class ExpressionError extends ExecutionBaseError { 'runIndex', 'type', ]; + if (options !== undefined) { + if (options.functionality !== undefined) { + this.functionality = options.functionality; + } + Object.keys(options as IDataObject).forEach((key) => { if (allowedKeys.includes(key)) { this.context[key] = (options as IDataObject)[key]; diff --git a/packages/workflow/src/errors/node-api.error.ts b/packages/workflow/src/errors/node-api.error.ts index f18f2e6fed..aeae1c754e 100644 --- a/packages/workflow/src/errors/node-api.error.ts +++ b/packages/workflow/src/errors/node-api.error.ts @@ -261,7 +261,7 @@ export class NodeApiError extends NodeError { messageMapping, ); - if (functionality !== undefined) this.context.functionality = functionality; + if (functionality !== undefined) this.functionality = functionality; if (runIndex !== undefined) this.context.runIndex = runIndex; if (itemIndex !== undefined) this.context.itemIndex = itemIndex; }