feat(core): Improve Langsmith traces for AI executions (#9081)

Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
oleg 2024-04-08 22:51:49 +02:00 committed by GitHub
parent 3bcfef95f6
commit 936682eeaa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 99 additions and 26 deletions

View file

@ -1,9 +1,5 @@
import {
type IExecuteFunctions,
type INodeExecutionData,
NodeConnectionType,
NodeOperationError,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { initializeAgentExecutorWithOptions } from 'langchain/agents';
import type { BaseChatMemory } from '@langchain/community/memory/chat_memory';
@ -16,13 +12,13 @@ import {
getOptionalOutputParsers,
getConnectedTools,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
export async function conversationalAgentExecute(
this: IExecuteFunctions,
nodeVersion: number,
): Promise<INodeExecutionData[][]> {
this.logger.verbose('Executing Conversational Agent');
const model = await this.getInputConnectionData(NodeConnectionType.AiLanguageModel, 0);
if (!isChatInstance(model)) {
@ -104,7 +100,9 @@ export async function conversationalAgentExecute(
input = (await prompt.invoke({ input })).value;
}
let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });
if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };

View file

@ -17,6 +17,7 @@ import {
getOptionalOutputParsers,
getPromptInputByType,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
export async function openAiFunctionsAgentExecute(
this: IExecuteFunctions,
@ -104,7 +105,9 @@ export async function openAiFunctionsAgentExecute(
input = (await prompt.invoke({ input })).value;
}
let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });
if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };

View file

@ -15,6 +15,7 @@ import {
getOptionalOutputParsers,
getPromptInputByType,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
export async function planAndExecuteAgentExecute(
this: IExecuteFunctions,
@ -79,7 +80,9 @@ export async function planAndExecuteAgentExecute(
input = (await prompt.invoke({ input })).value;
}
let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });
if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };

View file

@ -17,6 +17,7 @@ import {
getPromptInputByType,
isChatInstance,
} from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
export async function reActAgentAgentExecute(
this: IExecuteFunctions,
@ -100,7 +101,10 @@ export async function reActAgentAgentExecute(
input = (await prompt.invoke({ input })).value;
}
let response = await agentExecutor.call({ input, outputParsers });
let response = await agentExecutor
.withConfig(getTracingConfig(this))
.invoke({ input, outputParsers });
if (outputParser) {
response = { output: await outputParser.parse(response.output as string) };
}

View file

@ -14,6 +14,7 @@ import type { BaseChatMemory } from '@langchain/community/memory/chat_memory';
import type { DataSource } from '@n8n/typeorm';
import { getPromptInputByType, serializeChatHistory } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
import { getSqliteDataSource } from './other/handlers/sqlite';
import { getPostgresDataSource } from './other/handlers/postgres';
import { SQL_PREFIX, SQL_SUFFIX } from './other/prompts';
@ -126,7 +127,7 @@ export async function sqlAgentAgentExecute(
let response: IDataObject;
try {
response = await agentExecutor.call({
response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
input,
signal: this.getExecutionCancelSignal(),
chatHistory,

View file

@ -10,6 +10,7 @@ import type {
} from 'n8n-workflow';
import type { OpenAIToolType } from 'langchain/dist/experimental/openai_assistant/schema';
import { getConnectedTools } from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';
import { formatToOpenAIAssistantTool } from './utils';
export class OpenAiAssistant implements INodeType {
@ -373,7 +374,7 @@ export class OpenAiAssistant implements INodeType {
tools,
});
const response = await agentExecutor.call({
const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
content: input,
signal: this.getExecutionCancelSignal(),
timeout: options.timeout ?? 10000,

View file

@ -27,6 +27,7 @@ import {
getPromptInputByType,
isChatInstance,
} from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';
interface MessagesTemplate {
type: string;
@ -154,9 +155,9 @@ async function createSimpleLLMChain(
const chain = new LLMChain({
llm,
prompt,
});
}).withConfig(getTracingConfig(context));
const response = (await chain.call({
const response = (await chain.invoke({
query,
signal: context.getExecutionCancelSignal(),
})) as string[];
@ -203,8 +204,9 @@ async function getChain(
);
const chain = prompt.pipe(llm).pipe(combinedOutputParser);
const response = (await chain.invoke({ query })) as string | string[];
const response = (await chain.withConfig(getTracingConfig(context)).invoke({ query })) as
| string
| string[];
return Array.isArray(response) ? response : [response];
}

View file

@ -12,6 +12,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { BaseRetriever } from '@langchain/core/retrievers';
import { getTemplateNoticeField } from '../../../utils/sharedFields';
import { getPromptInputByType } from '../../../utils/helpers';
import { getTracingConfig } from '../../../utils/tracing';
export class ChainRetrievalQa implements INodeType {
description: INodeTypeDescription = {
@ -176,7 +177,7 @@ export class ChainRetrievalQa implements INodeType {
throw new NodeOperationError(this.getNode(), 'The query parameter is empty.');
}
const response = await chain.call({ query });
const response = await chain.withConfig(getTracingConfig(this)).invoke({ query });
returnData.push({ json: { response } });
}
return await this.prepareOutputData(returnData);

View file

@ -18,6 +18,7 @@ import { N8nBinaryLoader } from '../../../../utils/N8nBinaryLoader';
import { getTemplateNoticeField } from '../../../../utils/sharedFields';
import { REFINE_PROMPT_TEMPLATE, DEFAULT_PROMPT_TEMPLATE } from '../prompt';
import { getChainPromptsArgs } from '../helpers';
import { getTracingConfig } from '../../../../utils/tracing';
function getInputs(parameters: IDataObject) {
const chunkingMode = parameters?.chunkingMode;
@ -364,7 +365,7 @@ export class ChainSummarizationV2 implements INodeType {
? await documentInput.processItem(item, itemIndex)
: documentInput;
const response = await chain.call({
const response = await chain.withConfig(getTracingConfig(this)).invoke({
input_documents: processedDocuments,
});

View file

@ -16,6 +16,7 @@ import { Document } from '@langchain/core/documents';
import type { SetField, SetNodeOptions } from 'n8n-nodes-base/dist/nodes/Set/v2/helpers/interfaces';
import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import type { CallbackManagerForRetrieverRun } from '@langchain/core/callbacks/manager';
import { logWrapper } from '../../../utils/logWrapper';
function objectToString(obj: Record<string, string> | IDataObject, level = 0) {
@ -287,7 +288,10 @@ export class RetrieverWorkflow implements INodeType {
this.executeFunctions = executeFunctions;
}
async getRelevantDocuments(query: string): Promise<Document[]> {
async _getRelevantDocuments(
query: string,
config?: CallbackManagerForRetrieverRun,
): Promise<Document[]> {
const source = this.executeFunctions.getNodeParameter('source', itemIndex) as string;
const baseMetadata: IDataObject = {
@ -360,6 +364,7 @@ export class RetrieverWorkflow implements INodeType {
receivedItems = (await this.executeFunctions.executeWorkflow(
workflowInfo,
items,
config?.getChild(),
)) as INodeExecutionData[][];
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will

View file

@ -16,6 +16,7 @@ import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import { DynamicTool } from '@langchain/core/tools';
import get from 'lodash/get';
import isObject from 'lodash/isObject';
import type { CallbackManagerForToolRun } from '@langchain/core/callbacks/manager';
import { getConnectionHintNoticeField } from '../../../utils/sharedFields';
export class ToolWorkflow implements INodeType {
@ -320,7 +321,10 @@ export class ToolWorkflow implements INodeType {
const name = this.getNodeParameter('name', itemIndex) as string;
const description = this.getNodeParameter('description', itemIndex) as string;
const runFunction = async (query: string): Promise<string> => {
const runFunction = async (
query: string,
runManager?: CallbackManagerForToolRun,
): Promise<string> => {
const source = this.getNodeParameter('source', itemIndex) as string;
const responsePropertyName = this.getNodeParameter(
'responsePropertyName',
@ -385,7 +389,11 @@ export class ToolWorkflow implements INodeType {
let receivedData: INodeExecutionData;
try {
receivedData = (await this.executeWorkflow(workflowInfo, items)) as INodeExecutionData;
receivedData = (await this.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
)) as INodeExecutionData;
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
@ -413,13 +421,13 @@ export class ToolWorkflow implements INodeType {
name,
description,
func: async (query: string): Promise<string> => {
func: async (query: string, runManager?: CallbackManagerForToolRun): Promise<string> => {
const { index } = this.addInputData(NodeConnectionType.AiTool, [[{ json: { query } }]]);
let response: string = '';
let executionError: ExecutionError | undefined;
try {
response = await runFunction(query);
response = await runFunction(query, runManager);
} catch (error) {
// TODO: Do some more testing. Issues here should actually fail the workflow
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment

View file

@ -11,6 +11,7 @@ import { formatToOpenAIAssistantTool } from '../../helpers/utils';
import { assistantRLC } from '../descriptions';
import { getConnectedTools } from '../../../../../utils/helpers';
import { getTracingConfig } from '../../../../../utils/tracing';
const properties: INodeProperties[] = [
assistantRLC,
@ -181,7 +182,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
tools: tools ?? [],
});
const response = await agentExecutor.invoke({
const response = await agentExecutor.withConfig(getTracingConfig(this)).invoke({
content: input,
signal: this.getExecutionCancelSignal(),
timeout: options.timeout ?? 10000,

View file

@ -0,0 +1,26 @@
import type { BaseCallbackConfig } from '@langchain/core/callbacks/manager';
import type { IExecuteFunctions } from 'n8n-workflow';
interface TracingConfig {
additionalMetadata?: Record<string, unknown>;
}
export function getTracingConfig(
context: IExecuteFunctions,
config: TracingConfig = {},
): BaseCallbackConfig {
const parentRunManager = context.getParentCallbackManager
? context.getParentCallbackManager()
: undefined;
return {
runName: `[${context.getWorkflow().name}] ${context.getNode().name}`,
metadata: {
execution_id: context.getExecutionId(),
workflow: context.getWorkflow(),
node: context.getNode().name,
...(config.additionalMetadata ?? {}),
},
callbacks: parentRunManager,
};
}

View file

@ -24,6 +24,7 @@ import type {
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
CallbackManager,
} from 'n8n-workflow';
import {
ApplicationError,
@ -754,6 +755,7 @@ async function executeWorkflow(
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const internalHooks = Container.get(InternalHooks);
@ -815,6 +817,7 @@ async function executeWorkflow(
workflowData,
);
additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;
// Make sure we pass on the original executeWorkflow function we received
// This one already contains changes to talk to parent process

View file

@ -98,6 +98,7 @@ import type {
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
CallbackManager,
} from 'n8n-workflow';
import {
ExpressionError,
@ -3487,6 +3488,7 @@ export function getExecuteFunctions(
async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
): Promise<any> {
return await additionalData
.executeWorkflow(workflowInfo, additionalData, {
@ -3494,6 +3496,7 @@ export function getExecuteFunctions(
inputData,
parentWorkflowSettings: workflow.settings,
node,
parentCallbackManager,
})
.then(
async (result) =>
@ -3719,6 +3722,7 @@ export function getExecuteFunctions(
msg,
});
},
getParentCallbackManager: () => additionalData.parentCallbackManager,
};
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
}

View file

@ -65,6 +65,7 @@
"recast": "0.21.5",
"title-case": "3.0.3",
"transliteration": "2.3.5",
"xml2js": "0.6.2"
"xml2js": "0.6.2",
"@langchain/core": "0.1.41"
}
}

View file

@ -19,6 +19,7 @@ import type { WorkflowHooks } from './WorkflowHooks';
import type { NodeOperationError } from './errors/node-operation.error';
import type { NodeApiError } from './errors/node-api.error';
import type { AxiosProxyConfig } from 'axios';
import type { CallbackManager as CallbackManagerLC } from '@langchain/core/callbacks/manager';
export interface IAdditionalCredentialOptions {
oauth2?: IOAuth2Options;
@ -842,6 +843,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
): Promise<any>;
getInputConnectionData(
inputName: ConnectionTypes,
@ -881,6 +883,8 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise<Buffer>;
copyInputItems(items: INodeExecutionData[], properties: string[]): IDataObject[];
};
getParentCallbackManager(): CallbackManager | undefined;
};
export interface IExecuteSingleFunctions extends BaseExecutionFunctions {
@ -2028,6 +2032,7 @@ export interface IWorkflowExecuteAdditionalData {
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: any;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
},
) => Promise<any>;
executionId?: string;
@ -2059,6 +2064,7 @@ export interface IWorkflowExecuteAdditionalData {
nodeType?: string;
},
) => Promise<void>;
parentCallbackManager?: CallbackManager;
}
export type WorkflowExecuteMode =
@ -2583,3 +2589,5 @@ export type BannerName =
export type Functionality = 'regular' | 'configuration-node' | 'pairedItem';
export type Result<T, E> = { ok: true; result: T } | { ok: false; error: E };
export type CallbackManager = CallbackManagerLC;

View file

@ -1515,6 +1515,9 @@ importers:
packages/workflow:
dependencies:
'@langchain/core':
specifier: 0.1.41
version: 0.1.41
'@n8n/tournament':
specifier: 1.0.2
version: 1.0.2