mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 21:37:32 -08:00
feat(core): Add closeFunction support to Sub-Nodes (#7708)
Github issue / Community forum post (link here to close automatically): --------- Signed-off-by: Oleg Ivaniv <me@olegivaniv.com> Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
parent
9ac8825a67
commit
bec0faed9e
|
@ -114,7 +114,12 @@ export class MemoryRedisChat implements INodeType {
|
||||||
outputKey: 'output',
|
outputKey: 'output',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
async function closeFunction() {
|
||||||
|
void client.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
closeFunction,
|
||||||
response: logWrapper(memory, this),
|
response: logWrapper(memory, this),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import pick from 'lodash/pick';
|
||||||
import { extension, lookup } from 'mime-types';
|
import { extension, lookup } from 'mime-types';
|
||||||
import type {
|
import type {
|
||||||
BinaryHelperFunctions,
|
BinaryHelperFunctions,
|
||||||
|
CloseFunction,
|
||||||
ConnectionTypes,
|
ConnectionTypes,
|
||||||
ContextType,
|
ContextType,
|
||||||
FieldType,
|
FieldType,
|
||||||
|
@ -3118,6 +3119,7 @@ export function getExecuteFunctions(
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
executeData: IExecuteData,
|
executeData: IExecuteData,
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
|
closeFunctions: CloseFunction[],
|
||||||
abortSignal?: AbortSignal,
|
abortSignal?: AbortSignal,
|
||||||
): IExecuteFunctions {
|
): IExecuteFunctions {
|
||||||
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
|
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
|
||||||
|
@ -3294,7 +3296,11 @@ export function getExecuteFunctions(
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await nodeType.supplyData.call(context, itemIndex);
|
const response = await nodeType.supplyData.call(context, itemIndex);
|
||||||
|
if (response.closeFunction) {
|
||||||
|
closeFunctions.push(response.closeFunction);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// Propagate errors from sub-nodes
|
// Propagate errors from sub-nodes
|
||||||
if (error.functionality === 'configuration-node') throw error;
|
if (error.functionality === 'configuration-node') throw error;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import type {
|
||||||
IRunExecutionData,
|
IRunExecutionData,
|
||||||
IWorkflowExecuteAdditionalData,
|
IWorkflowExecuteAdditionalData,
|
||||||
WorkflowExecuteMode,
|
WorkflowExecuteMode,
|
||||||
|
CloseFunction,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import {
|
import {
|
||||||
LoggerProxy as Logger,
|
LoggerProxy as Logger,
|
||||||
|
@ -1074,7 +1075,7 @@ export class WorkflowExecute {
|
||||||
|
|
||||||
const errorItems: INodeExecutionData[] = [];
|
const errorItems: INodeExecutionData[] = [];
|
||||||
const successItems: INodeExecutionData[] = [];
|
const successItems: INodeExecutionData[] = [];
|
||||||
|
const closeFunctions: CloseFunction[] = [];
|
||||||
// Create a WorkflowDataProxy instance that we can get the data of the
|
// Create a WorkflowDataProxy instance that we can get the data of the
|
||||||
// item which did error
|
// item which did error
|
||||||
const executeFunctions = NodeExecuteFunctions.getExecuteFunctions(
|
const executeFunctions = NodeExecuteFunctions.getExecuteFunctions(
|
||||||
|
@ -1087,6 +1088,7 @@ export class WorkflowExecute {
|
||||||
this.additionalData,
|
this.additionalData,
|
||||||
executionData,
|
executionData,
|
||||||
this.mode,
|
this.mode,
|
||||||
|
closeFunctions,
|
||||||
this.abortController.signal,
|
this.abortController.signal,
|
||||||
);
|
);
|
||||||
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
|
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
|
||||||
|
|
|
@ -377,6 +377,8 @@ export interface IConnections {
|
||||||
|
|
||||||
export type GenericValue = string | object | number | boolean | undefined | null;
|
export type GenericValue = string | object | number | boolean | undefined | null;
|
||||||
|
|
||||||
|
export type CloseFunction = () => Promise<void>;
|
||||||
|
|
||||||
export interface IDataObject {
|
export interface IDataObject {
|
||||||
[key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[];
|
[key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[];
|
||||||
}
|
}
|
||||||
|
@ -410,7 +412,7 @@ export interface IGetExecuteTriggerFunctions {
|
||||||
|
|
||||||
export interface IRunNodeResponse {
|
export interface IRunNodeResponse {
|
||||||
data: INodeExecutionData[][] | null | undefined;
|
data: INodeExecutionData[][] | null | undefined;
|
||||||
closeFunction?: () => Promise<void>;
|
closeFunction?: CloseFunction;
|
||||||
}
|
}
|
||||||
export interface IGetExecuteFunctions {
|
export interface IGetExecuteFunctions {
|
||||||
(
|
(
|
||||||
|
@ -423,6 +425,7 @@ export interface IGetExecuteFunctions {
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
executeData: IExecuteData,
|
executeData: IExecuteData,
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
|
closeFunctions: CloseFunction[],
|
||||||
abortSignal?: AbortSignal,
|
abortSignal?: AbortSignal,
|
||||||
): IExecuteFunctions;
|
): IExecuteFunctions;
|
||||||
}
|
}
|
||||||
|
@ -1289,13 +1292,13 @@ export type IParameterLabel = {
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface IPollResponse {
|
export interface IPollResponse {
|
||||||
closeFunction?: () => Promise<void>;
|
closeFunction?: CloseFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ITriggerResponse {
|
export interface ITriggerResponse {
|
||||||
closeFunction?: () => Promise<void>;
|
closeFunction?: CloseFunction;
|
||||||
// To manually trigger the run
|
// To manually trigger the run
|
||||||
manualTriggerFunction?: () => Promise<void>;
|
manualTriggerFunction?: CloseFunction;
|
||||||
// Gets added automatically at manual workflow runs resolves with
|
// Gets added automatically at manual workflow runs resolves with
|
||||||
// the first emitted data
|
// the first emitted data
|
||||||
manualTriggerResponse?: Promise<INodeExecutionData[][]>;
|
manualTriggerResponse?: Promise<INodeExecutionData[][]>;
|
||||||
|
@ -1324,6 +1327,7 @@ export namespace MultiPartFormData {
|
||||||
export interface SupplyData {
|
export interface SupplyData {
|
||||||
metadata?: IDataObject;
|
metadata?: IDataObject;
|
||||||
response: unknown;
|
response: unknown;
|
||||||
|
closeFunction?: CloseFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface INodeType {
|
export interface INodeType {
|
||||||
|
|
|
@ -37,6 +37,7 @@ import type {
|
||||||
NodeParameterValueType,
|
NodeParameterValueType,
|
||||||
PostReceiveAction,
|
PostReceiveAction,
|
||||||
JsonObject,
|
JsonObject,
|
||||||
|
CloseFunction,
|
||||||
} from './Interfaces';
|
} from './Interfaces';
|
||||||
|
|
||||||
import * as NodeHelpers from './NodeHelpers';
|
import * as NodeHelpers from './NodeHelpers';
|
||||||
|
@ -94,6 +95,7 @@ export class RoutingNode {
|
||||||
if (nodeType.description.credentials?.length) {
|
if (nodeType.description.credentials?.length) {
|
||||||
credentialType = nodeType.description.credentials[0].name;
|
credentialType = nodeType.description.credentials[0].name;
|
||||||
}
|
}
|
||||||
|
const closeFunctions: CloseFunction[] = [];
|
||||||
const executeFunctions = nodeExecuteFunctions.getExecuteFunctions(
|
const executeFunctions = nodeExecuteFunctions.getExecuteFunctions(
|
||||||
this.workflow,
|
this.workflow,
|
||||||
this.runExecutionData,
|
this.runExecutionData,
|
||||||
|
@ -104,6 +106,7 @@ export class RoutingNode {
|
||||||
this.additionalData,
|
this.additionalData,
|
||||||
executeData,
|
executeData,
|
||||||
this.mode,
|
this.mode,
|
||||||
|
closeFunctions,
|
||||||
abortSignal,
|
abortSignal,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ import type {
|
||||||
IRunNodeResponse,
|
IRunNodeResponse,
|
||||||
NodeParameterValueType,
|
NodeParameterValueType,
|
||||||
ConnectionTypes,
|
ConnectionTypes,
|
||||||
|
CloseFunction,
|
||||||
} from './Interfaces';
|
} from './Interfaces';
|
||||||
import { Node } from './Interfaces';
|
import { Node } from './Interfaces';
|
||||||
import type { IDeferredPromise } from './DeferredPromise';
|
import type { IDeferredPromise } from './DeferredPromise';
|
||||||
|
@ -1298,6 +1299,7 @@ export class Workflow {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodeType.execute) {
|
if (nodeType.execute) {
|
||||||
|
const closeFunctions: CloseFunction[] = [];
|
||||||
const context = nodeExecuteFunctions.getExecuteFunctions(
|
const context = nodeExecuteFunctions.getExecuteFunctions(
|
||||||
this,
|
this,
|
||||||
runExecutionData,
|
runExecutionData,
|
||||||
|
@ -1308,12 +1310,31 @@ export class Workflow {
|
||||||
additionalData,
|
additionalData,
|
||||||
executionData,
|
executionData,
|
||||||
mode,
|
mode,
|
||||||
|
closeFunctions,
|
||||||
abortSignal,
|
abortSignal,
|
||||||
);
|
);
|
||||||
const data =
|
const data =
|
||||||
nodeType instanceof Node
|
nodeType instanceof Node
|
||||||
? await nodeType.execute(context)
|
? await nodeType.execute(context)
|
||||||
: await nodeType.execute.call(context);
|
: await nodeType.execute.call(context);
|
||||||
|
|
||||||
|
const closeFunctionsResults = await Promise.allSettled(
|
||||||
|
closeFunctions.map(async (fn) => fn()),
|
||||||
|
);
|
||||||
|
|
||||||
|
const closingErrors = closeFunctionsResults
|
||||||
|
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
|
||||||
|
.map((result) => result.reason);
|
||||||
|
|
||||||
|
if (closingErrors.length > 0) {
|
||||||
|
if (closingErrors[0] instanceof Error) throw closingErrors[0];
|
||||||
|
throw new ApplicationError("Error on execution node's close function(s)", {
|
||||||
|
extra: { nodeName: node.name },
|
||||||
|
tags: { nodeType: node.type },
|
||||||
|
cause: closingErrors,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
return { data };
|
return { data };
|
||||||
} else if (nodeType.poll) {
|
} else if (nodeType.poll) {
|
||||||
if (mode === 'manual') {
|
if (mode === 'manual') {
|
||||||
|
|
Loading…
Reference in a new issue