feat(core): Add closeFunction support to Sub-Nodes (#7708)

Github issue / Community forum post (link here to close automatically):

---------

Signed-off-by: Oleg Ivaniv <me@olegivaniv.com>
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
Jan Oberhauser 2023-12-21 14:21:09 +01:00 committed by GitHub
parent 9ac8825a67
commit bec0faed9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 6 deletions

View file

@ -114,7 +114,12 @@ export class MemoryRedisChat implements INodeType {
outputKey: 'output', outputKey: 'output',
}); });
async function closeFunction() {
void client.disconnect();
}
return { return {
closeFunction,
response: logWrapper(memory, this), response: logWrapper(memory, this),
}; };
} }

View file

@ -37,6 +37,7 @@ import pick from 'lodash/pick';
import { extension, lookup } from 'mime-types'; import { extension, lookup } from 'mime-types';
import type { import type {
BinaryHelperFunctions, BinaryHelperFunctions,
CloseFunction,
ConnectionTypes, ConnectionTypes,
ContextType, ContextType,
FieldType, FieldType,
@ -3118,6 +3119,7 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
abortSignal?: AbortSignal, abortSignal?: AbortSignal,
): IExecuteFunctions { ): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => { return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
@ -3294,7 +3296,11 @@ export function getExecuteFunctions(
}; };
try { try {
return await nodeType.supplyData.call(context, itemIndex); const response = await nodeType.supplyData.call(context, itemIndex);
if (response.closeFunction) {
closeFunctions.push(response.closeFunction);
}
return response;
} catch (error) { } catch (error) {
// Propagate errors from sub-nodes // Propagate errors from sub-nodes
if (error.functionality === 'configuration-node') throw error; if (error.functionality === 'configuration-node') throw error;

View file

@ -32,6 +32,7 @@ import type {
IRunExecutionData, IRunExecutionData,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
WorkflowExecuteMode, WorkflowExecuteMode,
CloseFunction,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
LoggerProxy as Logger, LoggerProxy as Logger,
@ -1074,7 +1075,7 @@ export class WorkflowExecute {
const errorItems: INodeExecutionData[] = []; const errorItems: INodeExecutionData[] = [];
const successItems: INodeExecutionData[] = []; const successItems: INodeExecutionData[] = [];
const closeFunctions: CloseFunction[] = [];
// Create a WorkflowDataProxy instance that we can get the data of the // Create a WorkflowDataProxy instance that we can get the data of the
// item which did error // item which did error
const executeFunctions = NodeExecuteFunctions.getExecuteFunctions( const executeFunctions = NodeExecuteFunctions.getExecuteFunctions(
@ -1087,6 +1088,7 @@ export class WorkflowExecute {
this.additionalData, this.additionalData,
executionData, executionData,
this.mode, this.mode,
closeFunctions,
this.abortController.signal, this.abortController.signal,
); );
const dataProxy = executeFunctions.getWorkflowDataProxy(0); const dataProxy = executeFunctions.getWorkflowDataProxy(0);

View file

@ -377,6 +377,8 @@ export interface IConnections {
export type GenericValue = string | object | number | boolean | undefined | null; export type GenericValue = string | object | number | boolean | undefined | null;
export type CloseFunction = () => Promise<void>;
export interface IDataObject { export interface IDataObject {
[key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[]; [key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[];
} }
@ -410,7 +412,7 @@ export interface IGetExecuteTriggerFunctions {
export interface IRunNodeResponse { export interface IRunNodeResponse {
data: INodeExecutionData[][] | null | undefined; data: INodeExecutionData[][] | null | undefined;
closeFunction?: () => Promise<void>; closeFunction?: CloseFunction;
} }
export interface IGetExecuteFunctions { export interface IGetExecuteFunctions {
( (
@ -423,6 +425,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
abortSignal?: AbortSignal, abortSignal?: AbortSignal,
): IExecuteFunctions; ): IExecuteFunctions;
} }
@ -1289,13 +1292,13 @@ export type IParameterLabel = {
}; };
export interface IPollResponse { export interface IPollResponse {
closeFunction?: () => Promise<void>; closeFunction?: CloseFunction;
} }
export interface ITriggerResponse { export interface ITriggerResponse {
closeFunction?: () => Promise<void>; closeFunction?: CloseFunction;
// To manually trigger the run // To manually trigger the run
manualTriggerFunction?: () => Promise<void>; manualTriggerFunction?: CloseFunction;
// Gets added automatically at manual workflow runs resolves with // Gets added automatically at manual workflow runs resolves with
// the first emitted data // the first emitted data
manualTriggerResponse?: Promise<INodeExecutionData[][]>; manualTriggerResponse?: Promise<INodeExecutionData[][]>;
@ -1324,6 +1327,7 @@ export namespace MultiPartFormData {
export interface SupplyData { export interface SupplyData {
metadata?: IDataObject; metadata?: IDataObject;
response: unknown; response: unknown;
closeFunction?: CloseFunction;
} }
export interface INodeType { export interface INodeType {

View file

@ -37,6 +37,7 @@ import type {
NodeParameterValueType, NodeParameterValueType,
PostReceiveAction, PostReceiveAction,
JsonObject, JsonObject,
CloseFunction,
} from './Interfaces'; } from './Interfaces';
import * as NodeHelpers from './NodeHelpers'; import * as NodeHelpers from './NodeHelpers';
@ -94,6 +95,7 @@ export class RoutingNode {
if (nodeType.description.credentials?.length) { if (nodeType.description.credentials?.length) {
credentialType = nodeType.description.credentials[0].name; credentialType = nodeType.description.credentials[0].name;
} }
const closeFunctions: CloseFunction[] = [];
const executeFunctions = nodeExecuteFunctions.getExecuteFunctions( const executeFunctions = nodeExecuteFunctions.getExecuteFunctions(
this.workflow, this.workflow,
this.runExecutionData, this.runExecutionData,
@ -104,6 +106,7 @@ export class RoutingNode {
this.additionalData, this.additionalData,
executeData, executeData,
this.mode, this.mode,
closeFunctions,
abortSignal, abortSignal,
); );

View file

@ -42,6 +42,7 @@ import type {
IRunNodeResponse, IRunNodeResponse,
NodeParameterValueType, NodeParameterValueType,
ConnectionTypes, ConnectionTypes,
CloseFunction,
} from './Interfaces'; } from './Interfaces';
import { Node } from './Interfaces'; import { Node } from './Interfaces';
import type { IDeferredPromise } from './DeferredPromise'; import type { IDeferredPromise } from './DeferredPromise';
@ -1298,6 +1299,7 @@ export class Workflow {
} }
if (nodeType.execute) { if (nodeType.execute) {
const closeFunctions: CloseFunction[] = [];
const context = nodeExecuteFunctions.getExecuteFunctions( const context = nodeExecuteFunctions.getExecuteFunctions(
this, this,
runExecutionData, runExecutionData,
@ -1308,12 +1310,31 @@ export class Workflow {
additionalData, additionalData,
executionData, executionData,
mode, mode,
closeFunctions,
abortSignal, abortSignal,
); );
const data = const data =
nodeType instanceof Node nodeType instanceof Node
? await nodeType.execute(context) ? await nodeType.execute(context)
: await nodeType.execute.call(context); : await nodeType.execute.call(context);
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => fn()),
);
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
.map((result) => result.reason);
if (closingErrors.length > 0) {
if (closingErrors[0] instanceof Error) throw closingErrors[0];
throw new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
return { data }; return { data };
} else if (nodeType.poll) { } else if (nodeType.poll) {
if (mode === 'manual') { if (mode === 'manual') {