n8n/packages/@n8n/nodes-langchain/utils/logWrapper.ts

import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import type { ConnectionTypes, IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { Tool } from 'langchain/tools';
import type { ChatResult, InputValues, BaseMessage } from 'langchain/schema';
import { BaseChatMessageHistory } from 'langchain/schema';
import type { BaseChatModel } from 'langchain/chat_models/base';
import type { CallbackManagerForLLMRun } from 'langchain/callbacks';
import { Embeddings } from 'langchain/embeddings/base';
import { VectorStore } from 'langchain/vectorstores/base';
import type { Document } from 'langchain/document';
import { TextSplitter } from 'langchain/text_splitter';
import type { BaseDocumentLoader } from 'langchain/document_loaders/base';
import type { BaseCallbackConfig, Callbacks } from 'langchain/dist/callbacks/manager';
import { BaseLLM } from 'langchain/llms/base';
import { BaseChatMemory } from 'langchain/memory';
import type { MemoryVariables } from 'langchain/dist/memory/base';
import { BaseRetriever } from 'langchain/schema/retriever';
import type { FormatInstructionsOptions } from 'langchain/schema/output_parser';
import { BaseOutputParser } from 'langchain/schema/output_parser';
import { isObject } from 'lodash';
import { N8nJsonLoader } from './N8nJsonLoader';
import { N8nBinaryLoader } from './N8nBinaryLoader';
import { isChatInstance, logAiEvent } from './helpers';
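
// Maps known raw provider error messages to the friendlier message and
// description shown to the user in the n8n UI.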
const errorsMap: { [key: string]: { message: string; description: string } } = {
	'You exceeded your current quota, please check your plan and billing details.': {
		message: 'OpenAI quota exceeded',
		description: 'You exceeded your current quota, please check your plan and billing details.',
	},
};
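
/**
 * Calls an async method on the wrapped LangChain instance and, on failure,
 * converts the thrown error into a NodeOperationError tagged as
 * 'configuration-node', so the failure is attributed to the connected
 * sub-node and surfaced on the parent node.
 */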
export async function callMethodAsync<T>(
	this: T,
	parameters: {
		executeFunctions: IExecuteFunctions;
		connectionType: ConnectionTypes;
		currentNodeRunIndex: number;
		method: (...args: any[]) => Promise<unknown>;
		arguments: unknown[];
	},
): Promise<unknown> {
	try {
		return await parameters.method.call(this, ...parameters.arguments);
	} catch (e) {
		// Propagate errors from sub-nodes
		if (e.functionality === 'configuration-node') throw e;

		const connectedNode = parameters.executeFunctions.getNode();

		const error = new NodeOperationError(connectedNode, e, {
			functionality: 'configuration-node',
		});

		if (errorsMap[error.message]) {
			error.description = errorsMap[error.message].description;
			error.message = errorsMap[error.message].message;
		}

		parameters.executeFunctions.addOutputData(
			parameters.connectionType,
			parameters.currentNodeRunIndex,
			error,
		);

		if (error.message) {
			error.description = error.message;
			throw error;
		}
		throw new NodeOperationError(
			connectedNode,
			`Error on node "${connectedNode.name}" which is connected via input "${parameters.connectionType}"`,
			{ functionality: 'configuration-node' },
		);
	}
}
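
/**
 * Synchronous counterpart of callMethodAsync: invokes the wrapped method and
 * rethrows any failure as a 'configuration-node' NodeOperationError attributed
 * to the connected sub-node.
 */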
export function callMethodSync<T>(
	this: T,
	parameters: {
		executeFunctions: IExecuteFunctions;
		connectionType: ConnectionTypes;
		currentNodeRunIndex: number;
		method: (...args: any[]) => T;
		arguments: unknown[];
	},
): unknown {
	try {
		return parameters.method.call(this, ...parameters.arguments);
	} catch (e) {
		// Propagate errors from sub-nodes
		if (e.functionality === 'configuration-node') throw e;

		const connectedNode = parameters.executeFunctions.getNode();

		const error = new NodeOperationError(connectedNode, e);

		parameters.executeFunctions.addOutputData(
			parameters.connectionType,
			parameters.currentNodeRunIndex,
			error,
		);

		throw new NodeOperationError(
			connectedNode,
			`Error on node "${connectedNode.name}" which is connected via input "${parameters.connectionType}"`,
			{ functionality: 'configuration-node' },
		);
	}
}
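
/**
 * Wraps a LangChain instance in a Proxy whose `get` trap intercepts the
 * methods n8n needs to observe (e.g. `loadMemoryVariables`, `_generate`,
 * `similaritySearch`). Each intercepted call registers its inputs and outputs
 * with the execute functions, so the sub-node's activity shows up in the
 * workflow execution log.
 */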
export function logWrapper(
	originalInstance:
		| Tool
		| BaseChatModel
		| BaseChatMemory
		| BaseLLM
		| BaseChatMessageHistory
		| BaseOutputParser
		| BaseRetriever
		| Embeddings
		| Document[]
		| Document
		| BaseDocumentLoader
		| TextSplitter
		| VectorStore
		| N8nBinaryLoader
		| N8nJsonLoader,
	executeFunctions: IExecuteFunctions,
) {
	return new Proxy(originalInstance, {
		get: (target, prop) => {
			let connectionType: ConnectionTypes | undefined;
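			// One branch per supported instance type: each intercepts a specific
			// method, logs the call via addInputData/addOutputData, and emits a
			// logAiEvent where applicable. Any property that is not intercepted
			// falls through to the original instance at the bottom of this trap.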
			// ========== BaseChatMemory ==========
			if (originalInstance instanceof BaseChatMemory) {
				if (prop === 'loadMemoryVariables' && 'loadMemoryVariables' in target) {
					return async (values: InputValues): Promise<MemoryVariables> => {
						connectionType = NodeConnectionType.AiMemory;

						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { action: 'loadMemoryVariables', values } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [values],
						})) as MemoryVariables;

						executeFunctions.addOutputData(connectionType, index, [
							[{ json: { action: 'loadMemoryVariables', response } }],
						]);
						return response;
					};
				} else if (
					prop === 'outputKey' &&
					'outputKey' in target &&
					target.constructor.name === 'BufferWindowMemory'
				) {
					// Reading `outputKey` on BufferWindowMemory doubles as a chat-history
					// access, so the stored messages are logged asynchronously alongside it.
					connectionType = NodeConnectionType.AiMemory;
					const { index } = executeFunctions.addInputData(connectionType, [
						[{ json: { action: 'chatHistory' } }],
					]);
					const response = target[prop];

					target.chatHistory
						.getMessages()
						.then((messages) => {
							executeFunctions.addOutputData(NodeConnectionType.AiMemory, index, [
								[{ json: { action: 'chatHistory', chatHistory: messages } }],
							]);
						})
						.catch((error: Error) => {
							executeFunctions.addOutputData(NodeConnectionType.AiMemory, index, [
								[{ json: { action: 'chatHistory', error } }],
							]);
						});

					return response;
				}
			}
			// ========== BaseChatMessageHistory ==========
			if (originalInstance instanceof BaseChatMessageHistory) {
				if (prop === 'getMessages' && 'getMessages' in target) {
					return async (): Promise<BaseMessage[]> => {
						connectionType = NodeConnectionType.AiMemory;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { action: 'getMessages' } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [],
						})) as BaseMessage[];

						const payload = { action: 'getMessages', response };
						executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);

						void logAiEvent(executeFunctions, 'n8n.ai.memory.get.messages', { response });
						return response;
					};
				} else if (prop === 'addMessage' && 'addMessage' in target) {
					return async (message: BaseMessage): Promise<void> => {
						connectionType = NodeConnectionType.AiMemory;
						const payload = { action: 'addMessage', message };
						const { index } = executeFunctions.addInputData(connectionType, [[{ json: payload }]]);

						await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [message],
						});

						void logAiEvent(executeFunctions, 'n8n.ai.memory.added.message', { message });
						executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
					};
				}
			}
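			// Note: the language-model branch below also injects n8n's execution
			// cancel signal into the call options, so stopping a workflow aborts
			// any in-flight model request.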
			// ========== BaseChatModel ==========
			if (originalInstance instanceof BaseLLM || isChatInstance(originalInstance)) {
				if (prop === '_generate' && '_generate' in target) {
					return async (
						messages: BaseMessage[] & string[],
						options: any,
						runManager?: CallbackManagerForLLMRun,
					): Promise<ChatResult> => {
						connectionType = NodeConnectionType.AiLanguageModel;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { messages, options } }],
						]);
						try {
							const response = (await callMethodAsync.call(target, {
								executeFunctions,
								connectionType,
								currentNodeRunIndex: index,
								method: target[prop],
								arguments: [
									messages,
									{ ...options, signal: executeFunctions.getExecutionCancelSignal() },
									runManager,
								],
							})) as ChatResult;

							const parsedMessages =
								typeof messages === 'string'
									? messages
									: messages.map((message) => {
											if (typeof message === 'string') return message;
											if (typeof message?.toJSON === 'function') return message.toJSON();
											return message;
										});

							void logAiEvent(executeFunctions, 'n8n.ai.llm.generated', {
								messages: parsedMessages,
								options,
								response,
							});
							executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
							return response;
						} catch (error) {
							// Mute AbortError as they are expected
							if (error?.name === 'AbortError') return { generations: [] };
							throw error;
						}
					};
				}
			}
			// ========== BaseOutputParser ==========
			if (originalInstance instanceof BaseOutputParser) {
				if (prop === 'getFormatInstructions' && 'getFormatInstructions' in target) {
					return (options?: FormatInstructionsOptions): string => {
						connectionType = NodeConnectionType.AiOutputParser;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { action: 'getFormatInstructions' } }],
						]);

						// @ts-ignore
						const response = callMethodSync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [options],
						}) as string;

						executeFunctions.addOutputData(connectionType, index, [
							[{ json: { action: 'getFormatInstructions', response } }],
						]);
						void logAiEvent(executeFunctions, 'n8n.ai.output.parser.get.instructions', {
							response,
						});
						return response;
					};
				} else if (prop === 'parse' && 'parse' in target) {
					return async (text: string | Record<string, unknown>): Promise<unknown> => {
						connectionType = NodeConnectionType.AiOutputParser;
						const stringifiedText = isObject(text) ? JSON.stringify(text) : text;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { action: 'parse', text: stringifiedText } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [stringifiedText],
						})) as object;

						void logAiEvent(executeFunctions, 'n8n.ai.output.parser.parsed', { text, response });
						executeFunctions.addOutputData(connectionType, index, [
							[{ json: { action: 'parse', response } }],
						]);
						return response;
					};
				}
			}
			// ========== BaseRetriever ==========
			if (originalInstance instanceof BaseRetriever) {
				if (prop === 'getRelevantDocuments' && 'getRelevantDocuments' in target) {
					return async (
						query: string,
						config?: Callbacks | BaseCallbackConfig,
					): Promise<Document[]> => {
						connectionType = NodeConnectionType.AiRetriever;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { query, config } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [query, config],
						})) as Array<Document<Record<string, any>>>;

						void logAiEvent(executeFunctions, 'n8n.ai.retriever.get.relevant.documents', { query });
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
			}
			// ========== Embeddings ==========
			if (originalInstance instanceof Embeddings) {
				// Docs -> Embeddings
				if (prop === 'embedDocuments' && 'embedDocuments' in target) {
					return async (documents: string[]): Promise<number[][]> => {
						connectionType = NodeConnectionType.AiEmbedding;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { documents } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [documents],
						})) as number[][];

						void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.document');
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
				// Query -> Embeddings
				if (prop === 'embedQuery' && 'embedQuery' in target) {
					return async (query: string): Promise<number[]> => {
						connectionType = NodeConnectionType.AiEmbedding;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { query } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [query],
						})) as number[];

						void logAiEvent(executeFunctions, 'n8n.ai.embeddings.embedded.query');
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
			}
			// ========== N8n Loaders Process All ==========
			if (
				originalInstance instanceof N8nJsonLoader ||
				originalInstance instanceof N8nBinaryLoader
			) {
				// Process All
				if (prop === 'processAll' && 'processAll' in target) {
					return async (items: INodeExecutionData[]): Promise<number[]> => {
						connectionType = NodeConnectionType.AiDocument;
						const { index } = executeFunctions.addInputData(connectionType, [items]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [items],
						})) as number[];

						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
				// Process Each
				if (prop === 'processItem' && 'processItem' in target) {
					return async (item: INodeExecutionData, itemIndex: number): Promise<number[]> => {
						connectionType = NodeConnectionType.AiDocument;
						const { index } = executeFunctions.addInputData(connectionType, [[item]]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [item, itemIndex],
						})) as number[];

						void logAiEvent(executeFunctions, 'n8n.ai.document.processed');
						executeFunctions.addOutputData(connectionType, index, [
							[{ json: { response }, pairedItem: { item: itemIndex } }],
						]);
						return response;
					};
				}
			}
			// ========== TextSplitter ==========
			if (originalInstance instanceof TextSplitter) {
				if (prop === 'splitText' && 'splitText' in target) {
					return async (text: string): Promise<string[]> => {
						connectionType = NodeConnectionType.AiTextSplitter;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { textSplitter: text } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [text],
						})) as string[];

						void logAiEvent(executeFunctions, 'n8n.ai.text.splitter.split');
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
			}
			// ========== Tool ==========
			if (originalInstance instanceof Tool) {
				if (prop === '_call' && '_call' in target) {
					return async (query: string): Promise<string> => {
						connectionType = NodeConnectionType.AiTool;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { query } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [query],
						})) as string;

						void logAiEvent(executeFunctions, 'n8n.ai.tool.called', { query, response });
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
						return response;
					};
				}
			}
			// ========== VectorStore ==========
			if (originalInstance instanceof VectorStore) {
				if (prop === 'similaritySearch' && 'similaritySearch' in target) {
					return async (
						query: string,
						k?: number,
						// The store-specific metadata filter; typed loosely here since each
						// vector store defines its own filter shape. (The original declared
						// the unrelated DOM type BiquadFilterType, which was a bug.)
						filter?: Record<string, unknown>,
						_callbacks?: Callbacks | undefined,
					): Promise<Document[]> => {
						connectionType = NodeConnectionType.AiVectorStore;
						const { index } = executeFunctions.addInputData(connectionType, [
							[{ json: { query, k, filter } }],
						]);

						const response = (await callMethodAsync.call(target, {
							executeFunctions,
							connectionType,
							currentNodeRunIndex: index,
							method: target[prop],
							arguments: [query, k, filter, _callbacks],
						})) as Array<Document<Record<string, any>>>;

						void logAiEvent(executeFunctions, 'n8n.ai.vector.store.searched', { query });
						executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);

						return response;
					};
				}
			}
			return (target as any)[prop];
		},
	});
}
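
/*
 * Illustrative usage (a sketch, not part of this file): a provider sub-node
 * would typically wrap the instance it hands to the parent node when
 * supplying data, along these lines ("SomeChatModel" is a placeholder):
 *
 *   async supplyData(this: IExecuteFunctions): Promise<SupplyData> {
 *     const model = new SomeChatModel({ ... });
 *     return { response: logWrapper(model, this) };
 *   }
 *
 * Every intercepted call on the wrapped instance is then logged against the
 * supplying node's run data.
 */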