From bec0faed9e51fe6ea20ab3b07b4dfa849b28516b Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Thu, 21 Dec 2023 14:21:09 +0100 Subject: [PATCH] feat(core): Add closeFunction support to Sub-Nodes (#7708) Github issue / Community forum post (link here to close automatically): --------- Signed-off-by: Oleg Ivaniv Co-authored-by: Oleg Ivaniv --- .../MemoryRedisChat/MemoryRedisChat.node.ts | 5 +++++ packages/core/src/NodeExecuteFunctions.ts | 8 ++++++- packages/core/src/WorkflowExecute.ts | 4 +++- packages/workflow/src/Interfaces.ts | 12 +++++++---- packages/workflow/src/RoutingNode.ts | 3 +++ packages/workflow/src/Workflow.ts | 21 +++++++++++++++++++ 6 files changed, 47 insertions(+), 6 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/memory/MemoryRedisChat/MemoryRedisChat.node.ts b/packages/@n8n/nodes-langchain/nodes/memory/MemoryRedisChat/MemoryRedisChat.node.ts index 10402a2893..538ce06f38 100644 --- a/packages/@n8n/nodes-langchain/nodes/memory/MemoryRedisChat/MemoryRedisChat.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/memory/MemoryRedisChat/MemoryRedisChat.node.ts @@ -114,7 +114,12 @@ export class MemoryRedisChat implements INodeType { outputKey: 'output', }); + async function closeFunction() { + void client.disconnect(); + } + return { + closeFunction, response: logWrapper(memory, this), }; } diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 55cfdd2a1a..df6f786582 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -37,6 +37,7 @@ import pick from 'lodash/pick'; import { extension, lookup } from 'mime-types'; import type { BinaryHelperFunctions, + CloseFunction, ConnectionTypes, ContextType, FieldType, @@ -3118,6 +3119,7 @@ export function getExecuteFunctions( additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + closeFunctions: CloseFunction[], abortSignal?: AbortSignal, ): IExecuteFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node) => { @@ -3294,7 +3296,11 @@ export function getExecuteFunctions( }; try { - return await nodeType.supplyData.call(context, itemIndex); + const response = await nodeType.supplyData.call(context, itemIndex); + if (response.closeFunction) { + closeFunctions.push(response.closeFunction); + } + return response; } catch (error) { // Propagate errors from sub-nodes if (error.functionality === 'configuration-node') throw error; diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index d4f733cd0d..781aea9fdb 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -32,6 +32,7 @@ import type { IRunExecutionData, IWorkflowExecuteAdditionalData, WorkflowExecuteMode, + CloseFunction, } from 'n8n-workflow'; import { LoggerProxy as Logger, @@ -1074,7 +1075,7 @@ export class WorkflowExecute { const errorItems: INodeExecutionData[] = []; const successItems: INodeExecutionData[] = []; - + const closeFunctions: CloseFunction[] = []; // Create a WorkflowDataProxy instance that we can get the data of the // item which did error const executeFunctions = NodeExecuteFunctions.getExecuteFunctions( @@ -1087,6 +1088,7 @@ export class WorkflowExecute { this.additionalData, executionData, this.mode, + closeFunctions, this.abortController.signal, ); const dataProxy = executeFunctions.getWorkflowDataProxy(0); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 4b6fded659..210741c44b 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -377,6 +377,8 @@ export interface IConnections { export type GenericValue = string | object | number | boolean | undefined | null; +export type CloseFunction = () => Promise; + export interface IDataObject { [key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[]; } @@ -410,7 +412,7 @@ export interface IGetExecuteTriggerFunctions { export interface IRunNodeResponse { data: INodeExecutionData[][] | null | undefined; - closeFunction?: () => Promise; + closeFunction?: CloseFunction; } export interface IGetExecuteFunctions { ( @@ -423,6 +425,7 @@ export interface IGetExecuteFunctions { additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + closeFunctions: CloseFunction[], abortSignal?: AbortSignal, ): IExecuteFunctions; } @@ -1289,13 +1292,13 @@ export type IParameterLabel = { }; export interface IPollResponse { - closeFunction?: () => Promise; + closeFunction?: CloseFunction; } export interface ITriggerResponse { - closeFunction?: () => Promise; + closeFunction?: CloseFunction; // To manually trigger the run - manualTriggerFunction?: () => Promise; + manualTriggerFunction?: CloseFunction; // Gets added automatically at manual workflow runs resolves with // the first emitted data manualTriggerResponse?: Promise; @@ -1324,6 +1327,7 @@ export namespace MultiPartFormData { export interface SupplyData { metadata?: IDataObject; response: unknown; + closeFunction?: CloseFunction; } export interface INodeType { diff --git a/packages/workflow/src/RoutingNode.ts b/packages/workflow/src/RoutingNode.ts index fcd8fea8c3..f33382bb39 100644 --- a/packages/workflow/src/RoutingNode.ts +++ b/packages/workflow/src/RoutingNode.ts @@ -37,6 +37,7 @@ import type { NodeParameterValueType, PostReceiveAction, JsonObject, + CloseFunction, } from './Interfaces'; import * as NodeHelpers from './NodeHelpers'; @@ -94,6 +95,7 @@ export class RoutingNode { if (nodeType.description.credentials?.length) { credentialType = nodeType.description.credentials[0].name; } + const closeFunctions: CloseFunction[] = []; const executeFunctions = nodeExecuteFunctions.getExecuteFunctions( this.workflow, this.runExecutionData, @@ -104,6 +106,7 @@ export class RoutingNode { this.additionalData, executeData, this.mode, + closeFunctions, abortSignal, ); diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 931dd668b4..9eda896bea 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -42,6 +42,7 @@ import type { IRunNodeResponse, NodeParameterValueType, ConnectionTypes, + CloseFunction, } from './Interfaces'; import { Node } from './Interfaces'; import type { IDeferredPromise } from './DeferredPromise'; @@ -1298,6 +1299,7 @@ export class Workflow { } if (nodeType.execute) { + const closeFunctions: CloseFunction[] = []; const context = nodeExecuteFunctions.getExecuteFunctions( this, runExecutionData, @@ -1308,12 +1310,31 @@ export class Workflow { additionalData, executionData, mode, + closeFunctions, abortSignal, ); const data = nodeType instanceof Node ? await nodeType.execute(context) : await nodeType.execute.call(context); + + const closeFunctionsResults = await Promise.allSettled( + closeFunctions.map(async (fn) => fn()), + ); + + const closingErrors = closeFunctionsResults + .filter((result): result is PromiseRejectedResult => result.status === 'rejected') + .map((result) => result.reason); + + if (closingErrors.length > 0) { + if (closingErrors[0] instanceof Error) throw closingErrors[0]; + throw new ApplicationError("Error on execution node's close function(s)", { + extra: { nodeName: node.name }, + tags: { nodeType: node.type }, + cause: closingErrors, + }); + } + return { data }; } else if (nodeType.poll) { if (mode === 'manual') {