mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
extract out trigger context
This commit is contained in:
parent
645efce03d
commit
7b945afebd
|
@ -1,6 +1,12 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||||
|
|
||||||
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions, PollContext } from 'n8n-core';
|
import {
|
||||||
|
ActiveWorkflows,
|
||||||
|
InstanceSettings,
|
||||||
|
NodeExecuteFunctions,
|
||||||
|
PollContext,
|
||||||
|
TriggerContext,
|
||||||
|
} from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
ExecutionError,
|
ExecutionError,
|
||||||
IDeferredPromise,
|
IDeferredPromise,
|
||||||
|
@ -325,18 +331,11 @@ export class ActiveWorkflowManager {
|
||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
): IGetExecuteTriggerFunctions {
|
): IGetExecuteTriggerFunctions {
|
||||||
return (workflow: Workflow, node: INode) => {
|
return (workflow: Workflow, node: INode) => {
|
||||||
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(
|
const emit = (
|
||||||
workflow,
|
|
||||||
node,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
activation,
|
|
||||||
);
|
|
||||||
returnFunctions.emit = (
|
|
||||||
data: INodeExecutionData[][],
|
data: INodeExecutionData[][],
|
||||||
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
||||||
donePromise?: IDeferredPromise<IRun | undefined>,
|
donePromise?: IDeferredPromise<IRun | undefined>,
|
||||||
): void => {
|
) => {
|
||||||
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
||||||
void this.workflowStaticDataService.saveStaticData(workflow);
|
void this.workflowStaticDataService.saveStaticData(workflow);
|
||||||
|
|
||||||
|
@ -360,7 +359,7 @@ export class ActiveWorkflowManager {
|
||||||
executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
returnFunctions.emitError = (error: Error): void => {
|
const emitError = (error: Error): void => {
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
`The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`,
|
`The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`,
|
||||||
{
|
{
|
||||||
|
@ -385,7 +384,7 @@ export class ActiveWorkflowManager {
|
||||||
|
|
||||||
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
||||||
};
|
};
|
||||||
return returnFunctions;
|
return new TriggerContext(workflow, node, additionalData, mode, activation, emit, emitError);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -168,6 +168,7 @@ import { ScheduledTaskManager } from './ScheduledTaskManager';
|
||||||
import { getSecretsProxy } from './Secrets';
|
import { getSecretsProxy } from './Secrets';
|
||||||
import { SSHClientsManager } from './SSHClientsManager';
|
import { SSHClientsManager } from './SSHClientsManager';
|
||||||
import { PollContext } from './node-execution-context';
|
import { PollContext } from './node-execution-context';
|
||||||
|
import { TriggerContext } from './node-execution-context/trigger-context';
|
||||||
|
|
||||||
axios.defaults.timeout = 300000;
|
axios.defaults.timeout = 300000;
|
||||||
// Prevent axios from adding x-form-www-urlencoded headers by default
|
// Prevent axios from adding x-form-www-urlencoded headers by default
|
||||||
|
@ -3198,10 +3199,7 @@ export function copyInputItems(items: INodeExecutionData[], properties: string[]
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** @deprecated */
|
||||||
* Returns the execute functions the poll nodes have access to.
|
|
||||||
*/
|
|
||||||
// TODO: DELETE THIS
|
|
||||||
export function getExecutePollFunctions(
|
export function getExecutePollFunctions(
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
node: INode,
|
node: INode,
|
||||||
|
@ -3212,10 +3210,7 @@ export function getExecutePollFunctions(
|
||||||
return new PollContext(workflow, node, additionalData, mode, activation);
|
return new PollContext(workflow, node, additionalData, mode, activation);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** @deprecated */
|
||||||
* Returns the execute functions the trigger nodes have access to.
|
|
||||||
*/
|
|
||||||
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowManager.add
|
|
||||||
export function getExecuteTriggerFunctions(
|
export function getExecuteTriggerFunctions(
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
node: INode,
|
node: INode,
|
||||||
|
@ -3223,58 +3218,7 @@ export function getExecuteTriggerFunctions(
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
): ITriggerFunctions {
|
): ITriggerFunctions {
|
||||||
return ((workflow: Workflow, node: INode) => {
|
return new TriggerContext(workflow, node, additionalData, mode, activation);
|
||||||
return {
|
|
||||||
...getCommonWorkflowFunctions(workflow, node, additionalData),
|
|
||||||
emit: (): void => {
|
|
||||||
throw new ApplicationError(
|
|
||||||
'Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function',
|
|
||||||
);
|
|
||||||
},
|
|
||||||
emitError: (): void => {
|
|
||||||
throw new ApplicationError(
|
|
||||||
'Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function',
|
|
||||||
);
|
|
||||||
},
|
|
||||||
getMode: () => mode,
|
|
||||||
getActivationMode: () => activation,
|
|
||||||
getCredentials: async (type) =>
|
|
||||||
await getCredentials(workflow, node, type, additionalData, mode),
|
|
||||||
getNodeParameter: (
|
|
||||||
parameterName: string,
|
|
||||||
fallbackValue?: any,
|
|
||||||
options?: IGetNodeParameterOptions,
|
|
||||||
): NodeParameterValueType | object => {
|
|
||||||
const runExecutionData: IRunExecutionData | null = null;
|
|
||||||
const itemIndex = 0;
|
|
||||||
const runIndex = 0;
|
|
||||||
const connectionInputData: INodeExecutionData[] = [];
|
|
||||||
|
|
||||||
return getNodeParameter(
|
|
||||||
workflow,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
node,
|
|
||||||
parameterName,
|
|
||||||
itemIndex,
|
|
||||||
mode,
|
|
||||||
getAdditionalKeys(additionalData, mode, runExecutionData),
|
|
||||||
undefined,
|
|
||||||
fallbackValue,
|
|
||||||
options,
|
|
||||||
);
|
|
||||||
},
|
|
||||||
helpers: {
|
|
||||||
createDeferredPromise,
|
|
||||||
...getSSHTunnelFunctions(),
|
|
||||||
...getRequestHelperFunctions(workflow, node, additionalData),
|
|
||||||
...getBinaryHelperFunctions(additionalData, workflow.id),
|
|
||||||
...getSchedulingFunctions(workflow),
|
|
||||||
returnJsonArray,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
})(workflow, node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
export { PollContext } from './poll-context';
|
export { PollContext } from './poll-context';
|
||||||
|
export { TriggerContext } from './trigger-context';
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
import type {
|
||||||
|
ICredentialDataDecryptedObject,
|
||||||
|
IGetNodeParameterOptions,
|
||||||
|
INode,
|
||||||
|
INodeExecutionData,
|
||||||
|
IRunExecutionData,
|
||||||
|
ITriggerFunctions,
|
||||||
|
IWorkflowExecuteAdditionalData,
|
||||||
|
NodeParameterValueType,
|
||||||
|
Workflow,
|
||||||
|
WorkflowActivateMode,
|
||||||
|
WorkflowExecuteMode,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import {
|
||||||
|
getAdditionalKeys,
|
||||||
|
getCredentials,
|
||||||
|
getNodeParameter,
|
||||||
|
returnJsonArray,
|
||||||
|
} from '@/NodeExecuteFunctions';
|
||||||
|
import { BaseContext } from './base-contexts';
|
||||||
|
import { BinaryHelpers } from './helpers/binary-helpers';
|
||||||
|
import { RequestHelpers } from './helpers/request-helpers';
|
||||||
|
import { SchedulingHelpers } from './helpers/scheduling-helpers';
|
||||||
|
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
|
||||||
|
|
||||||
|
const throwOnEmit = () => {
|
||||||
|
throw new ApplicationError('Overwrite TriggerContext.emit function');
|
||||||
|
};
|
||||||
|
|
||||||
|
const throwOnEmitError = () => {
|
||||||
|
throw new ApplicationError('Overwrite TriggerContext.emitError function');
|
||||||
|
};
|
||||||
|
|
||||||
|
export class TriggerContext extends BaseContext implements ITriggerFunctions {
|
||||||
|
readonly helpers: ITriggerFunctions['helpers'];
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
workflow: Workflow,
|
||||||
|
node: INode,
|
||||||
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
|
private readonly mode: WorkflowExecuteMode,
|
||||||
|
private readonly activation: WorkflowActivateMode,
|
||||||
|
readonly emit: ITriggerFunctions['emit'] = throwOnEmit,
|
||||||
|
readonly emitError: ITriggerFunctions['emitError'] = throwOnEmitError,
|
||||||
|
) {
|
||||||
|
super(workflow, node, additionalData);
|
||||||
|
|
||||||
|
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
|
||||||
|
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
|
||||||
|
const schedulingHelepers = new SchedulingHelpers(workflow);
|
||||||
|
const sshTunnelHelpers = new SSHTunnelHelpers();
|
||||||
|
|
||||||
|
// TODO: This is almost identical to the helpers in PollContext.
|
||||||
|
this.helpers = {
|
||||||
|
createDeferredPromise: () => createDeferredPromise(),
|
||||||
|
returnJsonArray: (items) => returnJsonArray(items),
|
||||||
|
|
||||||
|
getBinaryPath: (id) => binaryHelpers.getBinaryPath(id),
|
||||||
|
getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id),
|
||||||
|
getBinaryStream: (id) => binaryHelpers.getBinaryStream(id),
|
||||||
|
binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body),
|
||||||
|
binaryToString: (body) => binaryHelpers.binaryToString(body),
|
||||||
|
prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers),
|
||||||
|
setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers),
|
||||||
|
copyBinaryFile: () => binaryHelpers.copyBinaryFile(),
|
||||||
|
|
||||||
|
httpRequest: requestHelpers.httpRequest.bind(requestHelpers),
|
||||||
|
httpRequestWithAuthentication:
|
||||||
|
requestHelpers.httpRequestWithAuthentication.bind(requestHelpers),
|
||||||
|
requestWithAuthenticationPaginated:
|
||||||
|
requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers),
|
||||||
|
request: requestHelpers.request.bind(requestHelpers),
|
||||||
|
requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers),
|
||||||
|
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
|
||||||
|
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
|
||||||
|
|
||||||
|
registerCron: schedulingHelepers.registerCron.bind(schedulingHelepers),
|
||||||
|
|
||||||
|
getSSHClient: sshTunnelHelpers.getSSHClient.bind(sshTunnelHelpers),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: all the following are duplicated from PollContext. abstract these our into another base class
|
||||||
|
getMode() {
|
||||||
|
return this.mode;
|
||||||
|
}
|
||||||
|
|
||||||
|
getActivationMode() {
|
||||||
|
return this.activation;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
|
||||||
|
return await getCredentials<T>(this.workflow, this.node, type, this.additionalData, this.mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
getNodeParameter(
|
||||||
|
parameterName: string,
|
||||||
|
fallbackValue?: any,
|
||||||
|
options?: IGetNodeParameterOptions,
|
||||||
|
): NodeParameterValueType | object {
|
||||||
|
const runExecutionData: IRunExecutionData | null = null;
|
||||||
|
const itemIndex = 0;
|
||||||
|
const runIndex = 0;
|
||||||
|
const connectionInputData: INodeExecutionData[] = [];
|
||||||
|
|
||||||
|
return getNodeParameter(
|
||||||
|
this.workflow,
|
||||||
|
runExecutionData,
|
||||||
|
runIndex,
|
||||||
|
connectionInputData,
|
||||||
|
this.node,
|
||||||
|
parameterName,
|
||||||
|
itemIndex,
|
||||||
|
this.mode,
|
||||||
|
getAdditionalKeys(this.additionalData, this.mode, runExecutionData),
|
||||||
|
undefined,
|
||||||
|
fallbackValue,
|
||||||
|
options,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue