mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-05 09:57:27 -08:00
214 lines
6.5 KiB
TypeScript
214 lines
6.5 KiB
TypeScript
import { BaseCallbackHandler } from '@langchain/core/callbacks/base';
|
|
import type { SerializedFields } from '@langchain/core/dist/load/map_keys';
|
|
import { getModelNameForTiktoken } from '@langchain/core/language_models/base';
|
|
import type {
|
|
Serialized,
|
|
SerializedNotImplemented,
|
|
SerializedSecret,
|
|
} from '@langchain/core/load/serializable';
|
|
import type { BaseMessage } from '@langchain/core/messages';
|
|
import type { LLMResult } from '@langchain/core/outputs';
|
|
import { encodingForModel } from '@langchain/core/utils/tiktoken';
|
|
import type { IDataObject, ISupplyDataFunctions, JsonObject } from 'n8n-workflow';
|
|
import { pick } from 'lodash';
|
|
import { NodeConnectionType, NodeError, NodeOperationError } from 'n8n-workflow';
|
|
|
|
import { logAiEvent } from '../../utils/helpers';
|
|
|
|
type TokensUsageParser = (llmOutput: LLMResult['llmOutput']) => {
|
|
completionTokens: number;
|
|
promptTokens: number;
|
|
totalTokens: number;
|
|
};
|
|
|
|
type RunDetail = {
|
|
index: number;
|
|
messages: BaseMessage[] | string[] | string;
|
|
options: SerializedSecret | SerializedNotImplemented | SerializedFields;
|
|
};
|
|
|
|
const TIKTOKEN_ESTIMATE_MODEL = 'gpt-4o';
|
|
export class N8nLlmTracing extends BaseCallbackHandler {
|
|
name = 'N8nLlmTracing';
|
|
|
|
// This flag makes sure that LangChain will wait for the handlers to finish before continuing
|
|
// This is crucial for the handleLLMError handler to work correctly (it should be called before the error is propagated to the root node)
|
|
awaitHandlers = true;
|
|
|
|
connectionType = NodeConnectionType.AiLanguageModel;
|
|
|
|
promptTokensEstimate = 0;
|
|
|
|
completionTokensEstimate = 0;
|
|
|
|
/**
|
|
* A map to associate LLM run IDs to run details.
|
|
* Key: Unique identifier for each LLM run (run ID)
|
|
* Value: RunDetails object
|
|
*
|
|
*/
|
|
runsMap: Record<string, RunDetail> = {};
|
|
|
|
options = {
|
|
// Default(OpenAI format) parser
|
|
tokensUsageParser: (llmOutput: LLMResult['llmOutput']) => {
|
|
const completionTokens = (llmOutput?.tokenUsage?.completionTokens as number) ?? 0;
|
|
const promptTokens = (llmOutput?.tokenUsage?.promptTokens as number) ?? 0;
|
|
|
|
return {
|
|
completionTokens,
|
|
promptTokens,
|
|
totalTokens: completionTokens + promptTokens,
|
|
};
|
|
},
|
|
};
|
|
|
|
constructor(
|
|
private executionFunctions: ISupplyDataFunctions,
|
|
options?: { tokensUsageParser: TokensUsageParser },
|
|
) {
|
|
super();
|
|
this.options = { ...this.options, ...options };
|
|
}
|
|
|
|
async estimateTokensFromGeneration(generations: LLMResult['generations']) {
|
|
const messages = generations.flatMap((gen) => gen.map((g) => g.text));
|
|
return await this.estimateTokensFromStringList(messages);
|
|
}
|
|
|
|
async estimateTokensFromStringList(list: string[]) {
|
|
const embeddingModel = getModelNameForTiktoken(TIKTOKEN_ESTIMATE_MODEL);
|
|
const encoder = await encodingForModel(embeddingModel);
|
|
|
|
const encodedListLength = await Promise.all(
|
|
list.map(async (text) => encoder.encode(text).length),
|
|
);
|
|
|
|
return encodedListLength.reduce((acc, curr) => acc + curr, 0);
|
|
}
|
|
|
|
async handleLLMEnd(output: LLMResult, runId: string) {
|
|
// The fallback should never happen since handleLLMStart should always set the run details
|
|
// but just in case, we set the index to the length of the runsMap
|
|
const runDetails = this.runsMap[runId] ?? { index: Object.keys(this.runsMap).length };
|
|
|
|
output.generations = output.generations.map((gen) =>
|
|
gen.map((g) => pick(g, ['text', 'generationInfo'])),
|
|
);
|
|
|
|
const tokenUsageEstimate = {
|
|
completionTokens: 0,
|
|
promptTokens: 0,
|
|
totalTokens: 0,
|
|
};
|
|
const tokenUsage = this.options.tokensUsageParser(output.llmOutput);
|
|
|
|
if (output.generations.length > 0) {
|
|
tokenUsageEstimate.completionTokens = await this.estimateTokensFromGeneration(
|
|
output.generations,
|
|
);
|
|
|
|
tokenUsageEstimate.promptTokens = this.promptTokensEstimate;
|
|
tokenUsageEstimate.totalTokens =
|
|
tokenUsageEstimate.completionTokens + this.promptTokensEstimate;
|
|
}
|
|
const response: {
|
|
response: { generations: LLMResult['generations'] };
|
|
tokenUsageEstimate?: typeof tokenUsageEstimate;
|
|
tokenUsage?: typeof tokenUsage;
|
|
} = {
|
|
response: { generations: output.generations },
|
|
};
|
|
|
|
// If the LLM response contains actual tokens usage, otherwise fallback to the estimate
|
|
if (tokenUsage.completionTokens > 0) {
|
|
response.tokenUsage = tokenUsage;
|
|
} else {
|
|
response.tokenUsageEstimate = tokenUsageEstimate;
|
|
}
|
|
|
|
const parsedMessages =
|
|
typeof runDetails.messages === 'string'
|
|
? runDetails.messages
|
|
: runDetails.messages.map((message) => {
|
|
if (typeof message === 'string') return message;
|
|
if (typeof message?.toJSON === 'function') return message.toJSON();
|
|
|
|
return message;
|
|
});
|
|
|
|
this.executionFunctions.addOutputData(this.connectionType, runDetails.index, [
|
|
[{ json: { ...response } }],
|
|
]);
|
|
|
|
logAiEvent(this.executionFunctions, 'ai-llm-generated-output', {
|
|
messages: parsedMessages,
|
|
options: runDetails.options,
|
|
response,
|
|
});
|
|
}
|
|
|
|
async handleLLMStart(llm: Serialized, prompts: string[], runId: string) {
|
|
const estimatedTokens = await this.estimateTokensFromStringList(prompts);
|
|
|
|
const options = llm.type === 'constructor' ? llm.kwargs : llm;
|
|
const { index } = this.executionFunctions.addInputData(this.connectionType, [
|
|
[
|
|
{
|
|
json: {
|
|
messages: prompts,
|
|
estimatedTokens,
|
|
options,
|
|
},
|
|
},
|
|
],
|
|
]);
|
|
|
|
// Save the run details for later use when processing `handleLLMEnd` event
|
|
this.runsMap[runId] = {
|
|
index,
|
|
options,
|
|
messages: prompts,
|
|
};
|
|
this.promptTokensEstimate = estimatedTokens;
|
|
}
|
|
|
|
async handleLLMError(
|
|
error: IDataObject | Error,
|
|
runId: string,
|
|
parentRunId?: string | undefined,
|
|
) {
|
|
const runDetails = this.runsMap[runId] ?? { index: Object.keys(this.runsMap).length };
|
|
|
|
// Filter out non-x- headers to avoid leaking sensitive information in logs
|
|
if (typeof error === 'object' && error?.hasOwnProperty('headers')) {
|
|
const errorWithHeaders = error as { headers: Record<string, unknown> };
|
|
|
|
Object.keys(errorWithHeaders.headers).forEach((key) => {
|
|
if (!key.startsWith('x-')) {
|
|
delete errorWithHeaders.headers[key];
|
|
}
|
|
});
|
|
}
|
|
|
|
if (error instanceof NodeError) {
|
|
this.executionFunctions.addOutputData(this.connectionType, runDetails.index, error);
|
|
} else {
|
|
// If the error is not a NodeError, we wrap it in a NodeOperationError
|
|
this.executionFunctions.addOutputData(
|
|
this.connectionType,
|
|
runDetails.index,
|
|
new NodeOperationError(this.executionFunctions.getNode(), error as JsonObject, {
|
|
functionality: 'configuration-node',
|
|
}),
|
|
);
|
|
}
|
|
|
|
logAiEvent(this.executionFunctions, 'ai-llm-errored', {
|
|
error: Object.keys(error).length === 0 ? error.toString() : error,
|
|
runId,
|
|
parentRunId,
|
|
});
|
|
}
|
|
}
|