From 7501ad8f3c56d9fcc5f4ec3d6fc468ab9cdb5024 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Fri, 9 Feb 2024 09:41:01 +0000 Subject: [PATCH] feat: Add support for AI log streaming (#8526) Co-authored-by: Oleg Ivaniv --- .../shared/createVectorStoreNode.ts | 5 +- .../@n8n/nodes-langchain/utils/logWrapper.ts | 63 +++++++++++++++---- .../cli/src/WorkflowExecuteAdditionalData.ts | 18 ++++++ .../EventMessageClasses/EventMessageAiNode.ts | 52 +++++++++++++++ .../src/eventbus/EventMessageClasses/index.ts | 7 ++- .../MessageEventBus/MessageEventBus.ts | 8 +++ .../unit/ExecutionMetadataService.test.ts | 34 ++++++++++ .../WorkflowExecuteAdditionalData.test.ts | 57 +++++++++-------- packages/core/src/NodeExecuteFunctions.ts | 21 +++++++ .../src/plugins/i18n/locales/en.json | 1 + packages/workflow/src/Interfaces.ts | 30 +++++++++ packages/workflow/src/MessageEventBus.ts | 1 + 12 files changed, 259 insertions(+), 38 deletions(-) create mode 100644 packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts create mode 100644 packages/cli/test/unit/ExecutionMetadataService.test.ts diff --git a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts index 277254204d..b061afa348 100644 --- a/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts +++ b/packages/@n8n/nodes-langchain/nodes/vector_store/shared/createVectorStoreNode.ts @@ -1,7 +1,7 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ /* eslint-disable n8n-nodes-base/node-dirname-against-convention */ import type { VectorStore } from 'langchain/vectorstores/base'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; import type { INodeCredentialDescription, INodeProperties, @@ -237,6 +237,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => }); resultData.push(...serializedDocs); + void this.logAiEvent('n8n.ai.vector.store.searched', jsonStringify({ query: prompt })); } return await this.prepareOutputData(resultData); @@ -262,6 +263,8 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) => try { await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex); + + void this.logAiEvent('n8n.ai.vector.store.populated'); } catch (error) { throw error; } diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index 771c3e5452..a5d861a1c6 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -4,6 +4,7 @@ import { type IExecuteFunctions, type INodeExecutionData, NodeConnectionType, + jsonStringify, } from 'n8n-workflow'; import { Tool } from 'langchain/tools'; @@ -198,17 +199,20 @@ export function logWrapper( arguments: [], })) as BaseMessage[]; - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'getMessages', response } }], - ]); + const payload = { action: 'getMessages', response }; + executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); + + void executeFunctions.logAiEvent( + 'n8n.ai.memory.get.messages', + jsonStringify({ response }), + ); return response; }; } else if (prop === 'addMessage' && 'addMessage' in target) { return async (message: BaseMessage): Promise => { connectionType = NodeConnectionType.AiMemory; - const { index } = executeFunctions.addInputData(connectionType, [ - [{ json: { action: 'addMessage', message } }], - ]); + const payload = { action: 'addMessage', message }; + const { index } = executeFunctions.addInputData(connectionType, [[{ json: payload }]]); await callMethodAsync.call(target, { executeFunctions, @@ -218,9 +222,11 @@ export function logWrapper( arguments: [message], }); - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'addMessage' } }], - ]); + void executeFunctions.logAiEvent( + 'n8n.ai.memory.added.message', + jsonStringify({ message }), + ); + executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]); }; } } @@ -237,7 +243,6 @@ export function logWrapper( const { index } = executeFunctions.addInputData(connectionType, [ [{ json: { messages, options } }], ]); - try { const response = (await callMethodAsync.call(target, { executeFunctions, @@ -250,6 +255,18 @@ export function logWrapper( runManager, ], })) as ChatResult; + + void executeFunctions.logAiEvent( + 'n8n.ai.llm.generated', + jsonStringify({ + messages: + typeof messages === 'string' + ? messages + : messages.map((message) => message.toJSON()), + options, + response, + }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; } catch (error) { @@ -282,6 +299,10 @@ export function logWrapper( executeFunctions.addOutputData(connectionType, index, [ [{ json: { action: 'getFormatInstructions', response } }], ]); + void executeFunctions.logAiEvent( + 'n8n.ai.output.parser.get.instructions', + jsonStringify({ response }), + ); return response; }; } else if (prop === 'parse' && 'parse' in target) { @@ -300,6 +321,10 @@ export function logWrapper( arguments: [stringifiedText], })) as object; + void executeFunctions.logAiEvent( + 'n8n.ai.output.parser.parsed', + jsonStringify({ text, response }), + ); executeFunctions.addOutputData(connectionType, index, [ [{ json: { action: 'parse', response } }], ]); @@ -328,6 +353,10 @@ export function logWrapper( arguments: [query, config], })) as Array>>; + void executeFunctions.logAiEvent( + 'n8n.ai.retriever.get.relevant.documents', + jsonStringify({ query }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -352,6 +381,7 @@ export function logWrapper( arguments: [documents], })) as number[][]; + void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.document'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -371,7 +401,7 @@ export function logWrapper( method: target[prop], arguments: [query], })) as number[]; - + void executeFunctions.logAiEvent('n8n.ai.embeddings.embedded.query'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -401,6 +431,7 @@ export function logWrapper( return response; }; } + // Process Each if (prop === 'processItem' && 'processItem' in target) { return async (item: INodeExecutionData, itemIndex: number): Promise => { @@ -415,6 +446,7 @@ export function logWrapper( arguments: [item, itemIndex], })) as number[]; + void executeFunctions.logAiEvent('n8n.ai.document.processed'); executeFunctions.addOutputData(connectionType, index, [ [{ json: { response }, pairedItem: { item: itemIndex } }], ]); @@ -440,6 +472,7 @@ export function logWrapper( arguments: [text], })) as string[]; + void executeFunctions.logAiEvent('n8n.ai.text.splitter.split'); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -463,6 +496,10 @@ export function logWrapper( arguments: [query], })) as string; + void executeFunctions.logAiEvent( + 'n8n.ai.tool.called', + jsonStringify({ query, response }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; }; @@ -492,6 +529,10 @@ export function logWrapper( arguments: [query, k, filter, _callbacks], })) as Array>>; + void executeFunctions.logAiEvent( + 'n8n.ai.vector.store.searched', + jsonStringify({ query }), + ); executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]); return response; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index cc146008f4..58bab9f70b 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -23,6 +23,7 @@ import type { WorkflowExecuteMode, ExecutionStatus, ExecutionError, + EventNamesAiNodesType, } from 'n8n-workflow'; import { ApplicationError, @@ -68,6 +69,7 @@ import { WorkflowStaticDataService } from './workflows/workflowStaticData.servic import { WorkflowRepository } from './databases/repositories/workflow.repository'; import { UrlService } from './services/url.service'; import { WorkflowExecutionService } from './workflows/workflowExecution.service'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -982,6 +984,22 @@ export async function getBase( setExecutionStatus, variables, secretsHelpers: Container.get(SecretsHelper), + logAiEvent: async ( + eventName: EventNamesAiNodesType, + payload: { + msg?: string | undefined; + executionId: string; + nodeName: string; + workflowId?: string | undefined; + workflowName: string; + nodeType?: string | undefined; + }, + ) => { + return await Container.get(MessageEventBus).sendAiNodeEvent({ + eventName, + payload, + }); + }, }; } diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts new file mode 100644 index 0000000000..44d9feafbd --- /dev/null +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAiNode.ts @@ -0,0 +1,52 @@ +import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage'; +import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; +import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; +import type { AbstractEventPayload } from './AbstractEventPayload'; + +// -------------------------------------- +// EventMessage class for Node events +// -------------------------------------- +export interface EventPayloadAiNode extends AbstractEventPayload { + msg?: string; + executionId: string; + nodeName: string; + workflowId?: string; + workflowName: string; + nodeType?: string; +} + +export interface EventMessageAiNodeOptions extends AbstractEventMessageOptions { + eventName: EventNamesAiNodesType; + + payload?: EventPayloadAiNode | undefined; +} + +export class EventMessageAiNode extends AbstractEventMessage { + readonly __type = EventMessageTypeNames.aiNode; + + eventName: EventNamesAiNodesType; + + payload: EventPayloadAiNode; + + constructor(options: EventMessageAiNodeOptions) { + super(options); + if (options.payload) this.setPayload(options.payload); + if (options.anonymize) { + this.anonymize(); + } + } + + setPayload(payload: EventPayloadAiNode): this { + this.payload = payload; + return this; + } + + deserialize(data: JsonObject): this { + if (isEventMessageOptionsWithType(data, this.__type)) { + this.setOptionsOrDefault(data); + if (data.payload) this.setPayload(data.payload as EventPayloadAiNode); + } + return this; + } +} diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index c6a0f85bd9..51ab91fb01 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -1,7 +1,9 @@ +import type { EventMessageAiNode } from './EventMessageAiNode'; import type { EventMessageAudit } from './EventMessageAudit'; import type { EventMessageGeneric } from './EventMessageGeneric'; import type { EventMessageNode } from './EventMessageNode'; import type { EventMessageWorkflow } from './EventMessageWorkflow'; +import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow'; export const eventNamesWorkflow = [ 'n8n.workflow.started', @@ -45,6 +47,7 @@ export type EventNamesTypes = | EventNamesWorkflowType | EventNamesNodeType | EventNamesGenericType + | EventNamesAiNodesType | 'n8n.destination.test'; export const eventNamesAll = [ @@ -52,13 +55,15 @@ export const eventNamesAll = [ ...eventNamesWorkflow, ...eventNamesNode, ...eventNamesGeneric, + ...eventNamesAiNodes, ]; export type EventMessageTypes = | EventMessageGeneric | EventMessageWorkflow | EventMessageAudit - | EventMessageNode; + | EventMessageNode + | EventMessageAiNode; export interface FailedEventSummary { lastNodeExecuted: string; diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 6c897cb38d..1b2fe14026 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -37,6 +37,10 @@ import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; +import { + EventMessageAiNode, + type EventMessageAiNodeOptions, +} from '../EventMessageClasses/EventMessageAiNode'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -457,4 +461,8 @@ export class MessageEventBus extends EventEmitter { async sendNodeEvent(options: EventMessageNodeOptions) { await this.send(new EventMessageNode(options)); } + + async sendAiNodeEvent(options: EventMessageAiNodeOptions) { + await this.send(new EventMessageAiNode(options)); + } } diff --git a/packages/cli/test/unit/ExecutionMetadataService.test.ts b/packages/cli/test/unit/ExecutionMetadataService.test.ts new file mode 100644 index 0000000000..48b754f0ff --- /dev/null +++ b/packages/cli/test/unit/ExecutionMetadataService.test.ts @@ -0,0 +1,34 @@ +import { Container } from 'typedi'; +import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository'; +import { ExecutionMetadataService } from '@/services/executionMetadata.service'; +import { mockInstance } from '../shared/mocking'; + +describe('ExecutionMetadataService', () => { + const repository = mockInstance(ExecutionMetadataRepository); + + test('Execution metadata is saved in a batch', async () => { + const toSave = { + test1: 'value1', + test2: 'value2', + }; + const executionId = '1234'; + + await Container.get(ExecutionMetadataService).save(executionId, toSave); + + expect(repository.save).toHaveBeenCalledTimes(1); + expect(repository.save.mock.calls[0]).toEqual([ + [ + { + execution: { id: executionId }, + key: 'test1', + value: 'value1', + }, + { + execution: { id: executionId }, + key: 'test2', + value: 'value2', + }, + ], + ]); + }); +}); diff --git a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts index c36c43455e..2984220637 100644 --- a/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts +++ b/packages/cli/test/unit/WorkflowExecuteAdditionalData.test.ts @@ -1,34 +1,41 @@ -import { Container } from 'typedi'; -import { ExecutionMetadataRepository } from '@db/repositories/executionMetadata.repository'; -import { ExecutionMetadataService } from '@/services/executionMetadata.service'; +import { VariablesService } from '@/environments/variables/variables.service.ee'; import { mockInstance } from '../shared/mocking'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { getBase } from '@/WorkflowExecuteAdditionalData'; +import Container from 'typedi'; +import { CredentialsHelper } from '@/CredentialsHelper'; +import { SecretsHelper } from '@/SecretsHelpers'; describe('WorkflowExecuteAdditionalData', () => { - const repository = mockInstance(ExecutionMetadataRepository); + const messageEventBus = mockInstance(MessageEventBus); + const variablesService = mockInstance(VariablesService); + variablesService.getAllCached.mockResolvedValue([]); + const credentialsHelper = mockInstance(CredentialsHelper); + const secretsHelper = mockInstance(SecretsHelper); + Container.set(MessageEventBus, messageEventBus); + Container.set(VariablesService, variablesService); + Container.set(CredentialsHelper, credentialsHelper); + Container.set(SecretsHelper, secretsHelper); - test('Execution metadata is saved in a batch', async () => { - const toSave = { - test1: 'value1', - test2: 'value2', + test('logAiEvent should call MessageEventBus', async () => { + const additionalData = await getBase('user-id'); + + const eventName = 'n8n.ai.memory.get.messages'; + const payload = { + msg: 'test message', + executionId: '123', + nodeName: 'n8n-memory', + workflowId: 'workflow-id', + workflowName: 'workflow-name', + nodeType: 'n8n-memory', }; - const executionId = '1234'; - await Container.get(ExecutionMetadataService).save(executionId, toSave); + await additionalData.logAiEvent(eventName, payload); - expect(repository.save).toHaveBeenCalledTimes(1); - expect(repository.save.mock.calls[0]).toEqual([ - [ - { - execution: { id: executionId }, - key: 'test1', - value: 'value1', - }, - { - execution: { id: executionId }, - key: 'test2', - value: 'value2', - }, - ], - ]); + expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1); + expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({ + eventName, + payload, + }); }); }); diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 99fe4bf280..8e5cb6a714 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -40,6 +40,7 @@ import type { CloseFunction, ConnectionTypes, ContextType, + EventNamesAiNodesType, FieldType, FileSystemHelperFunctions, FunctionsBase, @@ -3641,6 +3642,16 @@ export function getExecuteFunctions( constructExecutionMetaData, }, nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), + logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { + return await additionalData.logAiEvent(eventName, { + executionId: additionalData.executionId ?? 'unsaved-execution', + nodeName: node.name, + workflowName: workflow.name ?? 'Unnamed workflow', + nodeType: node.type, + workflowId: workflow.id ?? 'unsaved-workflow', + msg, + }); + }, }; })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } @@ -3781,6 +3792,16 @@ export function getExecuteSingleFunctions( getBinaryDataBuffer: async (propertyName, inputIndex = 0) => await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex), }, + logAiEvent: async (eventName: EventNamesAiNodesType, msg: string) => { + return await additionalData.logAiEvent(eventName, { + executionId: additionalData.executionId ?? 'unsaved-execution', + nodeName: node.name, + workflowName: workflow.name ?? 'Unnamed workflow', + nodeType: node.type, + workflowId: workflow.id ?? 'unsaved-workflow', + msg, + }); + }, }; })(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex); } diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index 9a3849c1df..ded8ee96ff 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -1519,6 +1519,7 @@ "settings.log-streaming.tab.events.title": "Select groups or single events to subscribe to:", "settings.log-streaming.tab.events.anonymize": "Anonymize sensitive data", "settings.log-streaming.tab.events.anonymize.info": "Fields containing personal information like name or email are anonymized", + "settings.log-streaming.eventGroup.n8n.ai": "AI node logs", "settings.log-streaming.eventGroup.n8n.audit": "Audit Events", "settings.log-streaming.eventGroup.n8n.audit.info": "Will send events when user details or other audit data changes", "settings.log-streaming.eventGroup.n8n.workflow": "Workflow Events", diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 2ed33bdd2f..71129dca01 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -787,6 +787,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getExecutionCancelSignal(): AbortSignal | undefined; onExecutionCancellation(handler: () => unknown): void; + logAiEvent(eventName: EventNamesAiNodesType, msg?: string | undefined): Promise; }; // TODO: Create later own type only for Config-Nodes @@ -1945,6 +1946,24 @@ export interface IWorkflowExecuteHooks { sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; } +export const eventNamesAiNodes = [ + 'n8n.ai.memory.get.messages', + 'n8n.ai.memory.added.message', + 'n8n.ai.output.parser.get.instructions', + 'n8n.ai.output.parser.parsed', + 'n8n.ai.retriever.get.relevant.documents', + 'n8n.ai.embeddings.embedded.document', + 'n8n.ai.embeddings.embedded.query', + 'n8n.ai.document.processed', + 'n8n.ai.text.splitter.split', + 'n8n.ai.tool.called', + 'n8n.ai.vector.store.searched', + 'n8n.ai.llm.generated', + 'n8n.ai.vector.store.populated', +] as const; + +export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; + export interface IWorkflowExecuteAdditionalData { credentialsHelper: ICredentialsHelper; executeWorkflow: ( @@ -1978,6 +1997,17 @@ export interface IWorkflowExecuteAdditionalData { userId: string; variables: IDataObject; secretsHelpers: SecretsHelpersBase; + logAiEvent: ( + eventName: EventNamesAiNodesType, + payload: { + msg?: string; + executionId: string; + nodeName: string; + workflowId?: string; + workflowName: string; + nodeType?: string; + }, + ) => Promise; } export type WorkflowExecuteMode = diff --git a/packages/workflow/src/MessageEventBus.ts b/packages/workflow/src/MessageEventBus.ts index 2da8c7a20d..97ca4116b4 100644 --- a/packages/workflow/src/MessageEventBus.ts +++ b/packages/workflow/src/MessageEventBus.ts @@ -11,6 +11,7 @@ export const enum EventMessageTypeNames { confirm = '$$EventMessageConfirm', workflow = '$$EventMessageWorkflow', node = '$$EventMessageNode', + aiNode = '$$EventMessageAiNode', } export const enum MessageEventBusDestinationTypeNames {