From 359ade45bcaeafd47cff26b82a3c56a28f91e625 Mon Sep 17 00:00:00 2001
From: oleg
Date: Sun, 12 May 2024 21:12:07 +0200
Subject: [PATCH] refactor: Implement LLM tracing callback to improve parsing
 of tokens usage stats (#9311)

Signed-off-by: Oleg Ivaniv
---
 .../LMChatAnthropic/LmChatAnthropic.node.ts   |  17 +-
 .../llms/LMChatOllama/LmChatOllama.node.ts    |   5 +-
 .../llms/LMChatOpenAi/LmChatOpenAi.node.ts    |   5 +-
 .../nodes/llms/LMCohere/LmCohere.node.ts      |   5 +-
 .../nodes/llms/LMOllama/LmOllama.node.ts      |   5 +-
 .../nodes/llms/LMOpenAi/LmOpenAi.node.ts      |   5 +-
 .../LmOpenHuggingFaceInference.node.ts        |   5 +-
 .../LmChatAwsBedrock/LmChatAwsBedrock.node.ts |   5 +-
 .../LmChatAzureOpenAi.node.ts                 |   5 +-
 .../LmChatGoogleGemini.node.ts                |   5 +-
 .../LmChatGooglePalm/LmChatGooglePalm.node.ts |   5 +-
 .../nodes/llms/LmChatGroq/LmChatGroq.node.ts  |   5 +-
 .../LmChatMistralCloud.node.ts                |   5 +-
 .../llms/LmGooglePalm/LmGooglePalm.node.ts    |   5 +-
 .../nodes/llms/N8nLlmTracing.ts               | 193 ++++++++++++++++++
 .../@n8n/nodes-langchain/utils/logWrapper.ts  |  63 +-----
 packages/core/src/WorkflowExecute.ts          |  25 ++-
 .../components/RunDataAi/RunDataAiContent.vue |  29 +--
 packages/workflow/src/Interfaces.ts           |   1 +
 19 files changed, 282 insertions(+), 111 deletions(-)
 create mode 100644 packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts

diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts
index 1140c8aa8f..caf77dd14e 100644
--- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts
+++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatAnthropic/LmChatAnthropic.node.ts
@@ -9,8 +9,9 @@ import {
 } from 'n8n-workflow';
 
 import { ChatAnthropic } from '@langchain/anthropic';
-import { logWrapper } from '../../../utils/logWrapper';
+import type { LLMResult } from '@langchain/core/outputs';
 import { getConnectionHintNoticeField } from '../../../utils/sharedFields';
+import { N8nLlmTracing } from '../N8nLlmTracing';
 
 const modelField: INodeProperties = {
 	displayName: 'Model',
@@ -166,6 +167,17 @@ export class LmChatAnthropic implements INodeType {
 			topP: number;
 		};
 
+		const tokensUsageParser = (llmOutput: LLMResult['llmOutput']) => {
+			const usage = (llmOutput?.usage as { input_tokens: number; output_tokens: number }) ??
{ + input_tokens: 0, + output_tokens: 0, + }; + return { + completionTokens: usage.output_tokens, + promptTokens: usage.input_tokens, + totalTokens: usage.input_tokens + usage.output_tokens, + }; + }; const model = new ChatAnthropic({ anthropicApiKey: credentials.apiKey as string, modelName, @@ -173,10 +185,11 @@ export class LmChatAnthropic implements INodeType { temperature: options.temperature, topK: options.topK, topP: options.topP, + callbacks: [new N8nLlmTracing(this, { tokensUsageParser })], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts index c5cd5eabfb..8ef8d01b01 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOllama/LmChatOllama.node.ts @@ -9,9 +9,9 @@ import { import type { ChatOllamaInput } from '@langchain/community/chat_models/ollama'; import { ChatOllama } from '@langchain/community/chat_models/ollama'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; import { ollamaModel, ollamaOptions, ollamaDescription } from '../LMOllama/description'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatOllama implements INodeType { description: INodeTypeDescription = { @@ -62,10 +62,11 @@ export class LmChatOllama implements INodeType { baseUrl: credentials.baseUrl as string, model: modelName, format: options.format === 'default' ? undefined : options.format, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts index 9d676b91c4..42bb08aaea 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMChatOpenAi/LmChatOpenAi.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { ChatOpenAI, type ClientOptions } from '@langchain/openai'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatOpenAi implements INodeType { description: INodeTypeDescription = { @@ -247,6 +247,7 @@ export class LmChatOpenAi implements INodeType { timeout: options.timeout ?? 60000, maxRetries: options.maxRetries ?? 2, configuration, + callbacks: [new N8nLlmTracing(this)], modelKwargs: options.responseFormat ? 
{ response_format: { type: options.responseFormat }, @@ -255,7 +256,7 @@ export class LmChatOpenAi implements INodeType { }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts index 8c127bb5ef..6322c2c908 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMCohere/LmCohere.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { Cohere } from '@langchain/cohere'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmCohere implements INodeType { description: INodeTypeDescription = { @@ -97,10 +97,11 @@ export class LmCohere implements INodeType { const model = new Cohere({ apiKey: credentials.apiKey as string, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts index 8904d5b3b6..ee416ccbbb 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOllama/LmOllama.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { Ollama } from '@langchain/community/llms/ollama'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; import { ollamaDescription, ollamaModel, ollamaOptions } from './description'; export class LmOllama implements INodeType { @@ -60,10 +60,11 @@ export class LmOllama implements INodeType { baseUrl: credentials.baseUrl as string, model: modelName, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts index 55398a60b1..70b8970cc2 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenAi/LmOpenAi.node.ts @@ -9,8 +9,8 @@ import type { } from 'n8n-workflow'; import { OpenAI, type ClientOptions } from '@langchain/openai'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; type LmOpenAiOptions = { baseURL?: string; @@ -240,10 +240,11 @@ export class LmOpenAi implements INodeType { configuration, timeout: options.timeout ?? 60000, maxRetries: options.maxRetries ?? 
2, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts index 43092ab917..f0a248c3fb 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LMOpenHuggingFaceInference/LmOpenHuggingFaceInference.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { HuggingFaceInference } from '@langchain/community/llms/hf'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmOpenHuggingFaceInference implements INodeType { description: INodeTypeDescription = { @@ -141,10 +141,11 @@ export class LmOpenHuggingFaceInference implements INodeType { model: modelName, apiKey: credentials.apiKey as string, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts index 48cfc603fd..99eb4196ea 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAwsBedrock/LmChatAwsBedrock.node.ts @@ -7,12 +7,12 @@ import { type SupplyData, } from 'n8n-workflow'; import { BedrockChat } from '@langchain/community/chat_models/bedrock'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; // Dependencies needed underneath the hood. We add them // here only to track where what dependency is used import '@aws-sdk/credential-provider-node'; import '@aws-sdk/client-bedrock-runtime'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatAwsBedrock implements INodeType { description: INodeTypeDescription = { @@ -152,10 +152,11 @@ export class LmChatAwsBedrock implements INodeType { accessKeyId: credentials.accessKeyId as string, sessionToken: credentials.sessionToken as string, }, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts index dad7643c6d..66aedb2963 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatAzureOpenAi/LmChatAzureOpenAi.node.ts @@ -9,8 +9,8 @@ import { import type { ClientOptions } from '@langchain/openai'; import { ChatOpenAI } from '@langchain/openai'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatAzureOpenAi implements INodeType { description: INodeTypeDescription = { @@ -160,10 +160,11 @@ export class LmChatAzureOpenAi implements INodeType { timeout: options.timeout ?? 60000, maxRetries: options.maxRetries ?? 
2, configuration, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts index 425ef2294c..a1bade87a9 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGoogleGemini/LmChatGoogleGemini.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { ChatGoogleGenerativeAI } from '@langchain/google-genai'; import type { HarmBlockThreshold, HarmCategory, SafetySetting } from '@google/generative-ai'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; import { harmCategories, harmThresholds } from './options'; export class LmChatGoogleGemini implements INodeType { @@ -224,10 +224,11 @@ export class LmChatGoogleGemini implements INodeType { temperature: options.temperature, maxOutputTokens: options.maxOutputTokens, safetySettings, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGooglePalm/LmChatGooglePalm.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGooglePalm/LmChatGooglePalm.node.ts index b7d3eb8eda..e6d94fb126 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGooglePalm/LmChatGooglePalm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGooglePalm/LmChatGooglePalm.node.ts @@ -7,8 +7,8 @@ import { type SupplyData, } from 'n8n-workflow'; import { ChatGooglePaLM } from '@langchain/community/chat_models/googlepalm'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatGooglePalm implements INodeType { description: INodeTypeDescription = { @@ -156,10 +156,11 @@ export class LmChatGooglePalm implements INodeType { apiKey: credentials.apiKey as string, modelName, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts index f996570997..b38f0ae575 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatGroq/LmChatGroq.node.ts @@ -8,8 +8,8 @@ import { } from 'n8n-workflow'; import { ChatGroq } from '@langchain/groq'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatGroq implements INodeType { description: INodeTypeDescription = { @@ -142,10 +142,11 @@ export class LmChatGroq implements INodeType { modelName, maxTokens: options.maxTokensToSample, temperature: options.temperature, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts 
b/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts index a23ab4c0c1..06cc5bbbc8 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmChatMistralCloud/LmChatMistralCloud.node.ts @@ -9,8 +9,8 @@ import { import type { ChatMistralAIInput } from '@langchain/mistralai'; import { ChatMistralAI } from '@langchain/mistralai'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmChatMistralCloud implements INodeType { description: INodeTypeDescription = { @@ -188,10 +188,11 @@ export class LmChatMistralCloud implements INodeType { apiKey: credentials.apiKey as string, modelName, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/LmGooglePalm/LmGooglePalm.node.ts b/packages/@n8n/nodes-langchain/nodes/llms/LmGooglePalm/LmGooglePalm.node.ts index a47001ba3b..29bc3ff29a 100644 --- a/packages/@n8n/nodes-langchain/nodes/llms/LmGooglePalm/LmGooglePalm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/llms/LmGooglePalm/LmGooglePalm.node.ts @@ -7,8 +7,8 @@ import { type SupplyData, } from 'n8n-workflow'; import { GooglePaLM } from '@langchain/community/llms/googlepalm'; -import { logWrapper } from '../../../utils/logWrapper'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; +import { N8nLlmTracing } from '../N8nLlmTracing'; export class LmGooglePalm implements INodeType { description: INodeTypeDescription = { @@ -163,10 +163,11 @@ export class LmGooglePalm implements INodeType { apiKey: credentials.apiKey as string, modelName, ...options, + callbacks: [new N8nLlmTracing(this)], }); return { - response: logWrapper(model, this), + response: model, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts new file mode 100644 index 0000000000..d217c53e7b --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/llms/N8nLlmTracing.ts @@ -0,0 +1,193 @@ +import { BaseCallbackHandler } from '@langchain/core/callbacks/base'; +import { getModelNameForTiktoken } from '@langchain/core/language_models/base'; +import { encodingForModel } from '@langchain/core/utils/tiktoken'; +import type { + Serialized, + SerializedNotImplemented, + SerializedSecret, +} from '@langchain/core/load/serializable'; +import type { LLMResult } from '@langchain/core/outputs'; +import type { IDataObject, IExecuteFunctions } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; +import { pick } from 'lodash'; +import type { BaseMessage } from '@langchain/core/messages'; +import type { SerializedFields } from '@langchain/core/dist/load/map_keys'; +import { logAiEvent } from '../../utils/helpers'; + +type TokensUsageParser = (llmOutput: LLMResult['llmOutput']) => { + completionTokens: number; + promptTokens: number; + totalTokens: number; +}; + +type LastInput = { + index: number; + messages: BaseMessage[] | string[] | string; + options: SerializedSecret | SerializedNotImplemented | SerializedFields; +}; + +const TIKTOKEN_ESTIMATE_MODEL = 'gpt-3.5-turbo'; +export class N8nLlmTracing extends BaseCallbackHandler { + name = 'N8nLlmTracing'; + + executionFunctions: IExecuteFunctions; + + connectionType = 
NodeConnectionType.AiLanguageModel; + + promptTokensEstimate = 0; + + completionTokensEstimate = 0; + + lastInput: LastInput = { + index: 0, + messages: [], + options: {}, + }; + + options = { + // Default(OpenAI format) parser + tokensUsageParser: (llmOutput: LLMResult['llmOutput']) => { + const completionTokens = (llmOutput?.tokenUsage?.completionTokens as number) ?? 0; + const promptTokens = (llmOutput?.tokenUsage?.promptTokens as number) ?? 0; + + return { + completionTokens, + promptTokens, + totalTokens: completionTokens + promptTokens, + }; + }, + }; + + constructor( + executionFunctions: IExecuteFunctions, + options?: { tokensUsageParser: TokensUsageParser }, + ) { + super(); + this.executionFunctions = executionFunctions; + this.options = { ...this.options, ...options }; + } + + async estimateTokensFromGeneration(generations: LLMResult['generations']) { + const messages = generations.flatMap((gen) => gen.map((g) => g.text)); + return await this.estimateTokensFromStringList(messages); + } + + async estimateTokensFromStringList(list: string[]) { + const embeddingModel = getModelNameForTiktoken(TIKTOKEN_ESTIMATE_MODEL); + const encoder = await encodingForModel(embeddingModel); + + const encodedListLength = await Promise.all( + list.map(async (text) => encoder.encode(text).length), + ); + + return encodedListLength.reduce((acc, curr) => acc + curr, 0); + } + + async handleLLMEnd(output: LLMResult) { + output.generations = output.generations.map((gen) => + gen.map((g) => pick(g, ['text', 'generationInfo'])), + ); + + const tokenUsageEstimate = { + completionTokens: 0, + promptTokens: 0, + totalTokens: 0, + }; + const tokenUsage = this.options.tokensUsageParser(output.llmOutput); + + if (output.generations.length > 0) { + tokenUsageEstimate.completionTokens = await this.estimateTokensFromGeneration( + output.generations, + ); + + tokenUsageEstimate.promptTokens = this.promptTokensEstimate; + tokenUsageEstimate.totalTokens = + tokenUsageEstimate.completionTokens + this.promptTokensEstimate; + } + const response: { + response: { generations: LLMResult['generations'] }; + tokenUsageEstimate?: typeof tokenUsageEstimate; + tokenUsage?: typeof tokenUsage; + } = { + response: { generations: output.generations }, + }; + + // If the LLM response contains actual tokens usage, otherwise fallback to the estimate + if (tokenUsage.completionTokens > 0) { + response.tokenUsage = tokenUsage; + } else { + response.tokenUsageEstimate = tokenUsageEstimate; + } + + const parsedMessages = + typeof this.lastInput.messages === 'string' + ? this.lastInput.messages + : this.lastInput.messages.map((message) => { + if (typeof message === 'string') return message; + if (typeof message?.toJSON === 'function') return message.toJSON(); + + return message; + }); + + this.executionFunctions.addOutputData(this.connectionType, this.lastInput.index, [ + [{ json: { ...response } }], + ]); + void logAiEvent(this.executionFunctions, 'n8n.ai.llm.generated', { + messages: parsedMessages, + options: this.lastInput.options, + response, + }); + } + + async handleLLMStart(llm: Serialized, prompts: string[]) { + const estimatedTokens = await this.estimateTokensFromStringList(prompts); + + const options = llm.type === 'constructor' ? 
llm.kwargs : llm;
+		const { index } = this.executionFunctions.addInputData(
+			this.connectionType,
+			[
+				[
+					{
+						json: {
+							messages: prompts,
+							estimatedTokens,
+							options,
+						},
+					},
+				],
+			],
+			this.lastInput.index + 1,
+		);
+
+		// Save the last input for later use when processing `handleLLMEnd` event
+		this.lastInput = {
+			index,
+			options,
+			messages: prompts,
+		};
+		this.promptTokensEstimate = estimatedTokens;
+	}
+
+	async handleLLMError(
+		error: IDataObject | Error,
+		runId: string,
+		parentRunId?: string | undefined,
+	) {
+		// Filter out non-x- headers to avoid leaking sensitive information in logs
+		if (typeof error === 'object' && error?.hasOwnProperty('headers')) {
+			const errorWithHeaders = error as { headers: Record<string, unknown> };
+
+			Object.keys(errorWithHeaders.headers).forEach((key) => {
+				if (!key.startsWith('x-')) {
+					delete errorWithHeaders.headers[key];
+				}
+			});
+		}
+
+		void logAiEvent(this.executionFunctions, 'n8n.ai.llm.error', {
+			error: Object.keys(error).length === 0 ? error.toString() : error,
+			runId,
+			parentRunId,
+		});
+	}
+}
diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts
index c4bb7e59e8..1ce6924813 100644
--- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts
+++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts
@@ -4,20 +4,13 @@ import type { ConnectionTypes, IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
 import { Tool } from '@langchain/core/tools';
 import type { BaseMessage } from '@langchain/core/messages';
 import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory';
-import type { ChatResult } from '@langchain/core/outputs';
 import { BaseChatMessageHistory } from '@langchain/core/chat_history';
-import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
-import type {
-	CallbackManagerForLLMRun,
-	BaseCallbackConfig,
-	Callbacks,
-} from '@langchain/core/callbacks/manager';
+import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager';
 import { Embeddings } from '@langchain/core/embeddings';
 import { VectorStore } from '@langchain/core/vectorstores';
 import type { Document } from '@langchain/core/documents';
 import { TextSplitter } from 'langchain/text_splitter';
 
-import { BaseLLM } from '@langchain/core/language_models/llms';
 import { BaseChatMemory } from '@langchain/community/memory/chat_memory';
 import { BaseRetriever } from '@langchain/core/retrievers';
 import type { FormatInstructionsOptions } from '@langchain/core/output_parsers';
@@ -26,7 +19,7 @@ import { isObject } from 'lodash';
 import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base';
 import { N8nJsonLoader } from './N8nJsonLoader';
 import { N8nBinaryLoader } from './N8nBinaryLoader';
-import { isChatInstance, logAiEvent } from './helpers';
+import { logAiEvent } from './helpers';
 
 const errorsMap: { [key: string]: { message: string; description: string } } = {
 	'You exceeded your current quota, please check your plan and billing details.': {
@@ -115,9 +108,7 @@ export function callMethodSync(
 export function logWrapper(
 	originalInstance:
 		| Tool
-		| BaseChatModel
 		| BaseChatMemory
-		| BaseLLM
 		| BaseChatMessageHistory
 		| BaseOutputParser
 		| BaseRetriever
@@ -229,56 +220,6 @@ export function logWrapper(
 			}
 		}
 
-		// ========== BaseChatModel ==========
-		if (originalInstance instanceof BaseLLM || isChatInstance(originalInstance)) {
-			if (prop === '_generate' && '_generate' in target) {
-				return async (
-					messages: BaseMessage[] & string[],
-					options: any,
-					runManager?: CallbackManagerForLLMRun,
-				): Promise<ChatResult> => {
-					connectionType = NodeConnectionType.AiLanguageModel;
-					const { index } = executeFunctions.addInputData(connectionType, [
-						[{ json: { messages, options } }],
-					]);
-					try {
-						const response = (await callMethodAsync.call(target, {
-							executeFunctions,
-							connectionType,
-							currentNodeRunIndex: index,
-							method: target[prop],
-							arguments: [
-								messages,
-								{ ...options, signal: executeFunctions.getExecutionCancelSignal() },
-								runManager,
-							],
-						})) as ChatResult;
-						const parsedMessages =
-							typeof messages === 'string'
-								? messages
-								: messages.map((message) => {
-										if (typeof message === 'string') return message;
-										if (typeof message?.toJSON === 'function') return message.toJSON();
-
-										return message;
-									});
-
-						void logAiEvent(executeFunctions, 'n8n.ai.llm.generated', {
-							messages: parsedMessages,
-							options,
-							response,
-						});
-						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
-						return response;
-					} catch (error) {
-						// Mute AbortError as they are expected
-						if (error?.name === 'AbortError') return { generations: [] };
-						throw error;
-					}
-				};
-			}
-		}
-
 		// ========== BaseOutputParser ==========
 		if (originalInstance instanceof BaseOutputParser) {
 			if (prop === 'getFormatInstructions' && 'getFormatInstructions' in target) {
diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts
index f11ab58cdf..932596723d 100644
--- a/packages/core/src/WorkflowExecute.ts
+++ b/packages/core/src/WorkflowExecute.ts
@@ -140,6 +140,10 @@ export class WorkflowExecute {
 		return this.processRunExecutionData(workflow);
 	}
 
+	static isAbortError(e?: ExecutionBaseError) {
+		return e?.message === 'AbortError';
+	}
+
 	forceInputNodeExecution(workflow: Workflow): boolean {
 		return workflow.settings.executionOrder !== 'v1';
 	}
@@ -834,7 +838,6 @@ export class WorkflowExecute {
 					this.abortController.abort();
 					const fullRunData = this.getFullRunData(startedAt);
 					void this.executeHook('workflowExecuteAfter', [fullRunData]);
-
 					setTimeout(() => resolve(fullRunData), 10);
 				});
 			// eslint-disable-next-line complexity
@@ -1323,12 +1326,14 @@
 					// Add the execution data again so that it can get restarted
 					this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
-
-					await this.executeHook('nodeExecuteAfter', [
-						executionNode.name,
-						taskData,
-						this.runExecutionData,
-					]);
+					// Only execute the nodeExecuteAfter hook if the node did not get aborted
+					if (!WorkflowExecute.isAbortError(executionError)) {
+						await this.executeHook('nodeExecuteAfter', [
+							executionNode.name,
+							taskData,
+							this.runExecutionData,
+						]);
+					}
 
 					break;
 				}
@@ -1770,8 +1775,10 @@
 		}
 
 		this.moveNodeMetadata();
-
-		await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
+		// Prevent from running the hook if the error is an abort error as it was already handled
+		if (!WorkflowExecute.isAbortError(executionError)) {
+			await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
+		}
 
 		if (closeFunction) {
 			try {
diff --git a/packages/editor-ui/src/components/RunDataAi/RunDataAiContent.vue b/packages/editor-ui/src/components/RunDataAi/RunDataAiContent.vue
index 367fd3cba7..8ed80a2c46 100644
--- a/packages/editor-ui/src/components/RunDataAi/RunDataAiContent.vue
+++ b/packages/editor-ui/src/components/RunDataAi/RunDataAiContent.vue
@@ -31,7 +31,7 @@
 					{{
 						$locale.baseText('runData.aiContentBlock.tokens', {
 							interpolate: {
-								count: 
consumedTokensSum?.totalTokens.toString()!, + count: formatTokenUsageCount(consumedTokensSum?.totalTokens ?? 0), }, }) }} @@ -42,7 +42,7 @@ {{ $locale.baseText('runData.aiContentBlock.tokens', { interpolate: { - count: consumedTokensSum?.promptTokens.toString()!, + count: formatTokenUsageCount(consumedTokensSum?.promptTokens ?? 0), }, }) }} @@ -53,7 +53,7 @@ {{ $locale.baseText('runData.aiContentBlock.tokens', { interpolate: { - count: consumedTokensSum?.completionTokens.toString()!, + count: formatTokenUsageCount(consumedTokensSum?.completionTokens ?? 0), }, }) }} @@ -75,12 +75,7 @@ import type { IAiData, IAiDataContent } from '@/Interface'; import { useNodeTypesStore } from '@/stores/nodeTypes.store'; import { useWorkflowsStore } from '@/stores/workflows.store'; -import type { - IDataObject, - INodeExecutionData, - INodeTypeDescription, - NodeConnectionType, -} from 'n8n-workflow'; +import type { INodeExecutionData, INodeTypeDescription, NodeConnectionType } from 'n8n-workflow'; import { computed } from 'vue'; import NodeIcon from '@/components/NodeIcon.vue'; import AiRunContentBlock from './AiRunContentBlock.vue'; @@ -105,12 +100,13 @@ type TokenUsageData = { promptTokens: number; totalTokens: number; }; + const consumedTokensSum = computed(() => { // eslint-disable-next-line @typescript-eslint/no-use-before-define - const consumedTokensSum1 = outputRun.value?.data?.reduce( + const tokenUsage = outputRun.value?.data?.reduce( (acc: TokenUsageData, curr: INodeExecutionData) => { - const response = curr.json?.response as IDataObject; - const tokenUsageData = (response?.llmOutput as IDataObject)?.tokenUsage as TokenUsageData; + const tokenUsageData = (curr.json?.tokenUsage ?? + curr.json?.tokenUsageEstimate) as TokenUsageData; if (!tokenUsageData) return acc; @@ -127,9 +123,16 @@ const consumedTokensSum = computed(() => { }, ); - return consumedTokensSum1; + return tokenUsage; }); +const usingTokensEstimates = computed(() => { + return outputRun.value?.data?.some((d) => d.json?.tokenUsageEstimate); +}); + +function formatTokenUsageCount(count: number) { + return usingTokensEstimates.value ? `~${count}` : count.toString(); +} function extractRunMeta(run: IAiDataContent) { const uiNode = workflowsStore.getNodeByName(props.inputData.node); const nodeType = nodeTypesStore.getNodeType(uiNode?.type ?? ''); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 84b8b24b2d..2dfff7434a 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -2020,6 +2020,7 @@ export const eventNamesAiNodes = [ 'n8n.ai.tool.called', 'n8n.ai.vector.store.searched', 'n8n.ai.llm.generated', + 'n8n.ai.llm.error', 'n8n.ai.vector.store.populated', ] as const;
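
Usage sketch (not part of the patch): how a model node wires in the new N8nLlmTracing callback, condensed from the LmChatAnthropic changes above. `supplyAnthropicModel`, `ctx`, and `apiKey` are illustrative names; the constructor options and the parser shape come straight from the diff. Providers whose `llmOutput` already follows the OpenAI `tokenUsage` layout can omit `tokensUsageParser` and rely on the default parser defined in N8nLlmTracing.ts.

import { ChatAnthropic } from '@langchain/anthropic';
import type { LLMResult } from '@langchain/core/outputs';
import type { IExecuteFunctions } from 'n8n-workflow';
import { N8nLlmTracing } from './N8nLlmTracing';

// Maps Anthropic's input_tokens/output_tokens usage report onto the
// prompt/completion/total fields that N8nLlmTracing expects.
const tokensUsageParser = (llmOutput: LLMResult['llmOutput']) => {
	const usage = (llmOutput?.usage as { input_tokens: number; output_tokens: number }) ?? {
		input_tokens: 0,
		output_tokens: 0,
	};
	return {
		completionTokens: usage.output_tokens,
		promptTokens: usage.input_tokens,
		totalTokens: usage.input_tokens + usage.output_tokens,
	};
};

// The callback now performs the input/output logging, token accounting, and
// n8n.ai.llm.* event emission that logWrapper() used to handle for LLM nodes,
// so supplyData returns the bare model instance.
function supplyAnthropicModel(ctx: IExecuteFunctions, apiKey: string) {
	return new ChatAnthropic({
		anthropicApiKey: apiKey,
		callbacks: [new N8nLlmTracing(ctx, { tokensUsageParser })],
	});
}

When a provider reports no usage at all, handleLLMEnd falls back to a tiktoken-based estimate and attaches it as `tokenUsageEstimate` instead of `tokenUsage`; RunDataAiContent.vue then renders the count with a `~` prefix via `formatTokenUsageCount`.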