extract out polling context

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-10-25 17:37:20 +02:00
parent 351134f786
commit 645efce03d
No known key found for this signature in database
22 changed files with 845 additions and 550 deletions

View file

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions, PollContext } from 'n8n-core';
import type { import type {
ExecutionError, ExecutionError,
IDeferredPromise, IDeferredPromise,
@ -274,18 +274,11 @@ export class ActiveWorkflowManager {
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): IGetExecutePollFunctions { ): IGetExecutePollFunctions {
return (workflow: Workflow, node: INode) => { return (workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions( const __emit = (
workflow,
node,
additionalData,
mode,
activation,
);
returnFunctions.__emit = (
data: INodeExecutionData[][], data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun | undefined>, donePromise?: IDeferredPromise<IRun | undefined>,
): void => { ) => {
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
void this.workflowStaticDataService.saveStaticData(workflow); void this.workflowStaticDataService.saveStaticData(workflow);
const executePromise = this.workflowExecutionService.runWorkflow( const executePromise = this.workflowExecutionService.runWorkflow(
@ -309,14 +302,15 @@ export class ActiveWorkflowManager {
} }
}; };
returnFunctions.__emitError = (error: ExecutionError): void => { const __emitError = (error: ExecutionError) => {
void this.executionService void this.executionService
.createErrorExecution(error, node, workflowData, workflow, mode) .createErrorExecution(error, node, workflowData, workflow, mode)
.then(() => { .then(() => {
this.executeErrorWorkflow(error, workflowData, mode); this.executeErrorWorkflow(error, workflowData, mode);
}); });
}; };
return returnFunctions;
return new PollContext(workflow, node, additionalData, mode, activation, __emit, __emitError);
}; };
} }

View file

@ -13,13 +13,13 @@
"scripts": { "scripts": {
"clean": "rimraf dist .turbo", "clean": "rimraf dist .turbo",
"typecheck": "tsc --noEmit", "typecheck": "tsc --noEmit",
"build": "tsc -p tsconfig.build.json", "build": "tsc -p tsconfig.build.json && tsc-alias -p tsconfig.build.json",
"dev": "pnpm watch", "dev": "pnpm watch",
"format": "biome format --write .", "format": "biome format --write .",
"format:check": "biome ci .", "format:check": "biome ci .",
"lint": "eslint . --quiet", "lint": "eslint . --quiet",
"lintfix": "eslint . --fix", "lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch", "watch": "tsc-watch -p tsconfig.build.json --onCompilationComplete \"tsc-alias -p tsconfig.build.json\"",
"test": "jest" "test": "jest"
}, },
"files": [ "files": [

View file

@ -127,9 +127,7 @@ import {
isResourceMapperValue, isResourceMapperValue,
validateFieldType, validateFieldType,
ExecutionBaseError, ExecutionBaseError,
jsonParse,
ApplicationError, ApplicationError,
sleep,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { Token } from 'oauth-1.0a'; import type { Token } from 'oauth-1.0a';
import clientOAuth1 from 'oauth-1.0a'; import clientOAuth1 from 'oauth-1.0a';
@ -169,6 +167,7 @@ import type { ExtendedValidationResult, IResponseError } from './Interfaces';
import { ScheduledTaskManager } from './ScheduledTaskManager'; import { ScheduledTaskManager } from './ScheduledTaskManager';
import { getSecretsProxy } from './Secrets'; import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager'; import { SSHClientsManager } from './SSHClientsManager';
import { PollContext } from './node-execution-context';
axios.defaults.timeout = 300000; axios.defaults.timeout = 300000;
// Prevent axios from adding x-form-www-urlencoded headers by default // Prevent axios from adding x-form-www-urlencoded headers by default
@ -214,7 +213,7 @@ const createFormDataObject = (data: Record<string, unknown>) => {
return formData; return formData;
}; };
const validateUrl = (url?: string): boolean => { export const validateUrl = (url?: string): boolean => {
if (!url) return false; if (!url) return false;
try { try {
@ -775,7 +774,8 @@ export function parseIncomingMessage(message: IncomingMessage) {
} }
} }
async function binaryToString(body: Buffer | Readable, encoding?: BufferEncoding) { // TODO: move to BinaryHelpers
export async function binaryToString(body: Buffer | Readable, encoding?: BufferEncoding) {
const buffer = await binaryToBuffer(body); const buffer = await binaryToBuffer(body);
if (!encoding && body instanceof IncomingMessage) { if (!encoding && body instanceof IncomingMessage) {
parseIncomingMessage(body); parseIncomingMessage(body);
@ -1009,7 +1009,7 @@ export const removeEmptyBody = (requestOptions: IHttpRequestOptions | IRequestOp
} }
}; };
async function httpRequest( export async function httpRequest(
requestOptions: IHttpRequestOptions, requestOptions: IHttpRequestOptions,
): Promise<IN8nHttpFullResponse | IN8nHttpResponse> { ): Promise<IN8nHttpFullResponse | IN8nHttpResponse> {
removeEmptyBody(requestOptions); removeEmptyBody(requestOptions);
@ -1053,21 +1053,21 @@ async function httpRequest(
} }
export function getBinaryPath(binaryDataId: string): string { export function getBinaryPath(binaryDataId: string): string {
return Container.get(BinaryDataService).getPath(binaryDataId); throw new Error('Not implemented');
} }
/** /**
* Returns binary file metadata * Returns binary file metadata
*/ */
export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryData.Metadata> { export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryData.Metadata> {
return await Container.get(BinaryDataService).getMetadata(binaryDataId); throw new Error('Not implemented');
} }
/** /**
* Returns binary file stream for piping * Returns binary file stream for piping
*/ */
export async function getBinaryStream(binaryDataId: string, chunkSize?: number): Promise<Readable> { export async function getBinaryStream(binaryDataId: string, chunkSize?: number): Promise<Readable> {
return await Container.get(BinaryDataService).getAsStream(binaryDataId, chunkSize); throw new Error('Not implemented');
} }
export function assertBinaryData( export function assertBinaryData(
@ -1132,12 +1132,7 @@ export async function setBinaryDataBuffer(
workflowId: string, workflowId: string,
executionId: string, executionId: string,
): Promise<IBinaryData> { ): Promise<IBinaryData> {
return await Container.get(BinaryDataService).store( throw new Error('Not implemented');
workflowId,
executionId,
bufferOrStream,
binaryData,
);
} }
export async function copyBinaryFile( export async function copyBinaryFile(
@ -1204,92 +1199,14 @@ export async function copyBinaryFile(
* base64 and adds metadata. * base64 and adds metadata.
*/ */
// eslint-disable-next-line complexity // eslint-disable-next-line complexity
async function prepareBinaryData( export async function prepareBinaryData(
binaryData: Buffer | Readable, binaryData: Buffer | Readable,
executionId: string, executionId: string,
workflowId: string, workflowId: string,
filePath?: string, filePath?: string,
mimeType?: string, mimeType?: string,
): Promise<IBinaryData> { ): Promise<IBinaryData> {
let fileExtension: string | undefined; throw new Error('Not implemented');
if (binaryData instanceof IncomingMessage) {
if (!filePath) {
try {
const { responseUrl } = binaryData;
filePath =
binaryData.contentDisposition?.filename ??
((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1);
} catch {}
}
if (!mimeType) {
mimeType = binaryData.contentType;
}
}
if (!mimeType) {
// If no mime type is given figure it out
if (filePath) {
// Use file path to guess mime type
const mimeTypeLookup = lookup(filePath);
if (mimeTypeLookup) {
mimeType = mimeTypeLookup;
}
}
if (!mimeType) {
if (Buffer.isBuffer(binaryData)) {
// Use buffer to guess mime type
const fileTypeData = await FileType.fromBuffer(binaryData);
if (fileTypeData) {
mimeType = fileTypeData.mime;
fileExtension = fileTypeData.ext;
}
} else if (binaryData instanceof IncomingMessage) {
mimeType = binaryData.headers['content-type'];
} else {
// TODO: detect filetype from other kind of streams
}
}
}
if (!fileExtension && mimeType) {
fileExtension = extension(mimeType) || undefined;
}
if (!mimeType) {
// Fall back to text
mimeType = 'text/plain';
}
const returnData: IBinaryData = {
mimeType,
fileType: fileTypeFromMimeType(mimeType),
fileExtension,
data: '',
};
if (filePath) {
if (filePath.includes('?')) {
// Remove maybe present query parameters
filePath = filePath.split('?').shift();
}
const filePathParts = path.parse(filePath as string);
if (filePathParts.dir !== '') {
returnData.directory = filePathParts.dir;
}
returnData.fileName = filePathParts.base;
// Remove the dot
const fileExtension = filePathParts.ext.slice(1);
if (fileExtension) {
returnData.fileExtension = fileExtension;
}
}
return await setBinaryDataBuffer(returnData, binaryData, workflowId, executionId);
} }
export async function checkProcessedAndRecord( export async function checkProcessedAndRecord(
@ -1347,6 +1264,7 @@ export async function clearAllProcessedItems(
options, options,
); );
} }
export async function getProcessedDataCount( export async function getProcessedDataCount(
scope: DeduplicationScope, scope: DeduplicationScope,
contextData: ICheckProcessedContextData, contextData: ICheckProcessedContextData,
@ -1358,7 +1276,8 @@ export async function getProcessedDataCount(
options, options,
); );
} }
function applyPaginationRequestData(
export function applyPaginationRequestData(
requestData: IRequestOptions, requestData: IRequestOptions,
paginationRequestData: PaginationOptions['request'], paginationRequestData: PaginationOptions['request'],
): IRequestOptions { ): IRequestOptions {
@ -1385,7 +1304,7 @@ function applyPaginationRequestData(
* *
*/ */
export async function requestOAuth2( export async function requestOAuth2(
this: IAllExecuteFunctions, context: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IHttpRequestOptions | IRequestOptions, requestOptions: IHttpRequestOptions | IRequestOptions,
node: INode, node: INode,
@ -1395,7 +1314,7 @@ export async function requestOAuth2(
) { ) {
removeEmptyBody(requestOptions); removeEmptyBody(requestOptions);
const credentials = (await this.getCredentials( const credentials = (await context.getCredentials(
credentialsType, credentialsType,
)) as unknown as OAuth2CredentialData; )) as unknown as OAuth2CredentialData;
@ -1471,7 +1390,7 @@ export async function requestOAuth2(
}); });
} }
if (isN8nRequest) { if (isN8nRequest) {
return await this.helpers.httpRequest(newRequestOptions).catch(async (error: AxiosError) => { return await context.helpers.httpRequest(newRequestOptions).catch(async (error: AxiosError) => {
if (error.response?.status === 401) { if (error.response?.status === 401) {
Logger.debug( Logger.debug(
`OAuth2 token for "${credentialsType}" used by node "${node.name}" expired. Should revalidate.`, `OAuth2 token for "${credentialsType}" used by node "${node.name}" expired. Should revalidate.`,
@ -1528,7 +1447,7 @@ export async function requestOAuth2(
}); });
} }
return await this.helpers.httpRequest(refreshedRequestOption); return await context.helpers.httpRequest(refreshedRequestOption);
} }
throw error; throw error;
}); });
@ -1538,7 +1457,7 @@ export async function requestOAuth2(
? 401 ? 401
: oAuth2Options?.tokenExpiredStatusCode; : oAuth2Options?.tokenExpiredStatusCode;
return await this.helpers return await context.helpers
.request(newRequestOptions as IRequestOptions) .request(newRequestOptions as IRequestOptions)
.then((response) => { .then((response) => {
const requestOptions = newRequestOptions as any; const requestOptions = newRequestOptions as any;
@ -1615,7 +1534,7 @@ export async function requestOAuth2(
}); });
} }
return await this.helpers.request(newRequestOptions as IRequestOptions); return await context.helpers.request(newRequestOptions as IRequestOptions);
} }
// Unknown error so simply throw it // Unknown error so simply throw it
@ -1627,14 +1546,14 @@ export async function requestOAuth2(
* Makes a request using OAuth1 data for authentication * Makes a request using OAuth1 data for authentication
*/ */
export async function requestOAuth1( export async function requestOAuth1(
this: IAllExecuteFunctions, context: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IHttpRequestOptions | IRequestOptions, requestOptions: IHttpRequestOptions | IRequestOptions,
isN8nRequest = false, isN8nRequest = false,
) { ) {
removeEmptyBody(requestOptions); removeEmptyBody(requestOptions);
const credentials = await this.getCredentials(credentialsType); const credentials = await context.getCredentials(credentialsType);
if (credentials === undefined) { if (credentials === undefined) {
throw new ApplicationError('No credentials were returned'); throw new ApplicationError('No credentials were returned');
@ -1687,10 +1606,10 @@ export async function requestOAuth1(
oauth.authorize(requestOptions as unknown as clientOAuth1.RequestOptions, token), oauth.authorize(requestOptions as unknown as clientOAuth1.RequestOptions, token),
) as unknown as Record<string, string>; ) as unknown as Record<string, string>;
if (isN8nRequest) { if (isN8nRequest) {
return await this.helpers.httpRequest(requestOptions as IHttpRequestOptions); return await context.helpers.httpRequest(requestOptions as IHttpRequestOptions);
} }
return await this.helpers return await context.helpers
.request(requestOptions as IRequestOptions) .request(requestOptions as IRequestOptions)
.catch(async (error: IResponseError) => { .catch(async (error: IResponseError) => {
// Unknown error so simply throw it // Unknown error so simply throw it
@ -1699,7 +1618,7 @@ export async function requestOAuth1(
} }
export async function httpRequestWithAuthentication( export async function httpRequestWithAuthentication(
this: IAllExecuteFunctions, context: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IHttpRequestOptions, requestOptions: IHttpRequestOptions,
workflow: Workflow, workflow: Workflow,
@ -1714,11 +1633,11 @@ export async function httpRequestWithAuthentication(
const parentTypes = additionalData.credentialsHelper.getParentTypes(credentialsType); const parentTypes = additionalData.credentialsHelper.getParentTypes(credentialsType);
if (parentTypes.includes('oAuth1Api')) { if (parentTypes.includes('oAuth1Api')) {
return await requestOAuth1.call(this, credentialsType, requestOptions, true); return await requestOAuth1(context, credentialsType, requestOptions, true);
} }
if (parentTypes.includes('oAuth2Api')) { if (parentTypes.includes('oAuth2Api')) {
return await requestOAuth2.call( return await requestOAuth2(
this, context,
credentialsType, credentialsType,
requestOptions, requestOptions,
node, node,
@ -1732,7 +1651,7 @@ export async function httpRequestWithAuthentication(
credentialsDecrypted = additionalCredentialOptions.credentialsDecrypted.data; credentialsDecrypted = additionalCredentialOptions.credentialsDecrypted.data;
} else { } else {
credentialsDecrypted = credentialsDecrypted =
await this.getCredentials<ICredentialDataDecryptedObject>(credentialsType); await context.getCredentials<ICredentialDataDecryptedObject>(credentialsType);
} }
if (credentialsDecrypted === undefined) { if (credentialsDecrypted === undefined) {
@ -1744,7 +1663,7 @@ export async function httpRequestWithAuthentication(
} }
const data = await additionalData.credentialsHelper.preAuthentication( const data = await additionalData.credentialsHelper.preAuthentication(
{ helpers: this.helpers }, { helpers: context.helpers },
credentialsDecrypted, credentialsDecrypted,
credentialsType, credentialsType,
node, node,
@ -1776,7 +1695,7 @@ export async function httpRequestWithAuthentication(
if (credentialsDecrypted !== undefined) { if (credentialsDecrypted !== undefined) {
// try to refresh the credentials // try to refresh the credentials
const data = await additionalData.credentialsHelper.preAuthentication( const data = await additionalData.credentialsHelper.preAuthentication(
{ helpers: this.helpers }, { helpers: context.helpers },
credentialsDecrypted, credentialsDecrypted,
credentialsType, credentialsType,
node, node,
@ -1800,19 +1719,14 @@ export async function httpRequestWithAuthentication(
// retry the request // retry the request
return await httpRequest(requestOptions); return await httpRequest(requestOptions);
} catch (error) { } catch (error) {
throw new NodeApiError(this.getNode(), error); throw new NodeApiError(context.getNode(), error);
} }
} }
throw new NodeApiError(this.getNode(), error); throw new NodeApiError(context.getNode(), error);
} }
} }
/**
* Takes generic input data and brings it into the json format n8n uses.
*
* @param {(IDataObject | IDataObject[])} jsonData
*/
export function returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[] { export function returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[] {
const returnData: INodeExecutionData[] = []; const returnData: INodeExecutionData[] = [];
@ -1895,7 +1809,7 @@ export function normalizeItems(
// TODO: Move up later // TODO: Move up later
export async function requestWithAuthentication( export async function requestWithAuthentication(
this: IAllExecuteFunctions, context: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IRequestOptions, requestOptions: IRequestOptions,
workflow: Workflow, workflow: Workflow,
@ -1912,11 +1826,11 @@ export async function requestWithAuthentication(
const parentTypes = additionalData.credentialsHelper.getParentTypes(credentialsType); const parentTypes = additionalData.credentialsHelper.getParentTypes(credentialsType);
if (credentialsType === 'oAuth1Api' || parentTypes.includes('oAuth1Api')) { if (credentialsType === 'oAuth1Api' || parentTypes.includes('oAuth1Api')) {
return await requestOAuth1.call(this, credentialsType, requestOptions, false); return await requestOAuth1(context, credentialsType, requestOptions, false);
} }
if (credentialsType === 'oAuth2Api' || parentTypes.includes('oAuth2Api')) { if (credentialsType === 'oAuth2Api' || parentTypes.includes('oAuth2Api')) {
return await requestOAuth2.call( return await requestOAuth2(
this, context,
credentialsType, credentialsType,
requestOptions, requestOptions,
node, node,
@ -1929,7 +1843,7 @@ export async function requestWithAuthentication(
if (additionalCredentialOptions?.credentialsDecrypted) { if (additionalCredentialOptions?.credentialsDecrypted) {
credentialsDecrypted = additionalCredentialOptions.credentialsDecrypted.data; credentialsDecrypted = additionalCredentialOptions.credentialsDecrypted.data;
} else { } else {
credentialsDecrypted = await this.getCredentials<ICredentialDataDecryptedObject>( credentialsDecrypted = await context.getCredentials<ICredentialDataDecryptedObject>(
credentialsType, credentialsType,
itemIndex, itemIndex,
); );
@ -1944,7 +1858,7 @@ export async function requestWithAuthentication(
} }
const data = await additionalData.credentialsHelper.preAuthentication( const data = await additionalData.credentialsHelper.preAuthentication(
{ helpers: this.helpers }, { helpers: context.helpers },
credentialsDecrypted, credentialsDecrypted,
credentialsType, credentialsType,
node, node,
@ -1970,7 +1884,7 @@ export async function requestWithAuthentication(
if (credentialsDecrypted !== undefined) { if (credentialsDecrypted !== undefined) {
// try to refresh the credentials // try to refresh the credentials
const data = await additionalData.credentialsHelper.preAuthentication( const data = await additionalData.credentialsHelper.preAuthentication(
{ helpers: this.helpers }, { helpers: context.helpers },
credentialsDecrypted, credentialsDecrypted,
credentialsType, credentialsType,
node, node,
@ -1996,7 +1910,7 @@ export async function requestWithAuthentication(
} catch (error) { } catch (error) {
if (error instanceof ExecutionBaseError) throw error; if (error instanceof ExecutionBaseError) throw error;
throw new NodeApiError(this.getNode(), error); throw new NodeApiError(context.getNode(), error);
} }
} }
} }
@ -2798,7 +2712,7 @@ const addExecutionDataFunctions = async (
}; };
async function getInputConnectionData( async function getInputConnectionData(
this: IAllExecuteFunctions, context: IAllExecuteFunctions,
workflow: Workflow, workflow: Workflow,
runExecutionData: IRunExecutionData, runExecutionData: IRunExecutionData,
runIndex: number, runIndex: number,
@ -2810,7 +2724,7 @@ async function getInputConnectionData(
inputName: NodeConnectionType, inputName: NodeConnectionType,
itemIndex: number, itemIndex: number,
): Promise<unknown> { ): Promise<unknown> {
const node = this.getNode(); const node = context.getNode();
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description); const inputs = NodeHelpers.getNodeInputs(workflow, node, nodeType.description);
@ -2858,9 +2772,9 @@ async function getInputConnectionData(
// TODO: create a new context object here based on the type of `connectedNode`, and avoid using `Object.assign` on context objects // TODO: create a new context object here based on the type of `connectedNode`, and avoid using `Object.assign` on context objects
// https://linear.app/n8n/issue/CAT-269 // https://linear.app/n8n/issue/CAT-269
const context = Object.assign({}, this); const newContext = Object.assign({}, context);
context.getNodeParameter = ( newContext.getNodeParameter = (
parameterName: string, parameterName: string,
itemIndex: number, itemIndex: number,
fallbackValue?: any, fallbackValue?: any,
@ -2883,11 +2797,11 @@ async function getInputConnectionData(
}; };
// TODO: Check what else should be overwritten // TODO: Check what else should be overwritten
context.getNode = () => { newContext.getNode = () => {
return deepCopy(connectedNode); return deepCopy(connectedNode);
}; };
context.getCredentials = async (key: string) => { newContext.getCredentials = async (key: string) => {
try { try {
return await getCredentials( return await getCredentials(
workflow, workflow,
@ -2938,7 +2852,7 @@ async function getInputConnectionData(
} }
try { try {
const response = await nodeType.supplyData.call(context, itemIndex); const response = await nodeType.supplyData.call(newContext, itemIndex);
if (response.closeFunction) { if (response.closeFunction) {
closeFunctions.push(response.closeFunction); closeFunctions.push(response.closeFunction);
} }
@ -3075,317 +2989,7 @@ const getRequestHelperFunctions = (
runExecutionData: IRunExecutionData | null = null, runExecutionData: IRunExecutionData | null = null,
connectionInputData: INodeExecutionData[] = [], connectionInputData: INodeExecutionData[] = [],
): RequestHelperFunctions => { ): RequestHelperFunctions => {
const getResolvedValue = ( return {} as RequestHelperFunctions;
parameterValue: NodeParameterValueType,
itemIndex: number,
runIndex: number,
executeData: IExecuteData,
additionalKeys?: IWorkflowDataProxyAdditionalKeys,
returnObjectAsString = false,
): NodeParameterValueType => {
const mode: WorkflowExecuteMode = 'internal';
if (
typeof parameterValue === 'object' ||
(typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
) {
return workflow.expression.getParameterValue(
parameterValue,
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
mode,
additionalKeys ?? {},
executeData,
returnObjectAsString,
);
}
return parameterValue;
};
return {
httpRequest,
// eslint-disable-next-line complexity
async requestWithAuthenticationPaginated(
this: IExecuteFunctions,
requestOptions: IRequestOptions,
itemIndex: number,
paginationOptions: PaginationOptions,
credentialsType?: string,
additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<any[]> {
const responseData = [];
if (!requestOptions.qs) {
requestOptions.qs = {};
}
requestOptions.resolveWithFullResponse = true;
requestOptions.simple = false;
let tempResponseData: IN8nHttpFullResponse;
let makeAdditionalRequest: boolean;
let paginateRequestData: PaginationOptions['request'];
const runIndex = 0;
const additionalKeys = {
$request: requestOptions,
$response: {} as IN8nHttpFullResponse,
$version: node.typeVersion,
$pageCount: 0,
};
const executeData: IExecuteData = {
data: {},
node,
source: null,
};
const hashData = {
identicalCount: 0,
previousLength: 0,
previousHash: '',
};
do {
paginateRequestData = getResolvedValue(
paginationOptions.request as unknown as NodeParameterValueType,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as object as PaginationOptions['request'];
const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData);
if (!validateUrl(tempRequestOptions.uri as string)) {
throw new NodeOperationError(node, `'${paginateRequestData.url}' is not a valid URL.`, {
itemIndex,
runIndex,
type: 'invalid_url',
});
}
if (credentialsType) {
tempResponseData = await this.helpers.requestWithAuthentication.call(
this,
credentialsType,
tempRequestOptions,
additionalCredentialOptions,
);
} else {
tempResponseData = await this.helpers.request(tempRequestOptions);
}
const newResponse: IN8nHttpFullResponse = Object.assign(
{
body: {},
headers: {},
statusCode: 0,
},
pick(tempResponseData, ['body', 'headers', 'statusCode']),
);
let contentBody: Exclude<IN8nHttpResponse, Buffer>;
if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) {
// Keep the original string version that we can use it to hash if needed
contentBody = await binaryToString(newResponse.body as Buffer | Readable);
const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
if (responseContentType.includes('application/json')) {
newResponse.body = jsonParse(contentBody, { fallbackValue: {} });
} else {
newResponse.body = contentBody;
}
tempResponseData.__bodyResolved = true;
tempResponseData.body = newResponse.body;
} else {
contentBody = newResponse.body;
}
if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
// If the data is not binary (and so not a stream), or an etag is present,
// we check via etag or hash if identical data is received
let contentLength = 0;
if ('content-length' in tempResponseData.headers) {
contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
}
if (hashData.previousLength === contentLength) {
let hash: string;
if (tempResponseData.headers.etag) {
// If an etag is provided, we use it as "hash"
hash = tempResponseData.headers.etag as string;
} else {
// If there is no etag, we calculate a hash from the data in the body
if (typeof contentBody !== 'string') {
contentBody = JSON.stringify(contentBody);
}
hash = crypto.createHash('md5').update(contentBody).digest('base64');
}
if (hashData.previousHash === hash) {
hashData.identicalCount += 1;
if (hashData.identicalCount > 2) {
// Length was identical 5x and hash 3x
throw new NodeOperationError(
node,
'The returned response was identical 5x, so requests got stopped',
{
itemIndex,
description:
'Check if "Pagination Completed When" has been configured correctly.',
},
);
}
} else {
hashData.identicalCount = 0;
}
hashData.previousHash = hash;
} else {
hashData.identicalCount = 0;
}
hashData.previousLength = contentLength;
}
responseData.push(tempResponseData);
additionalKeys.$response = newResponse;
additionalKeys.$pageCount = additionalKeys.$pageCount + 1;
const maxRequests = getResolvedValue(
paginationOptions.maxRequests,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
if (maxRequests && additionalKeys.$pageCount >= maxRequests) {
break;
}
makeAdditionalRequest = getResolvedValue(
paginationOptions.continue,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as boolean;
if (makeAdditionalRequest) {
if (paginationOptions.requestInterval) {
const requestInterval = getResolvedValue(
paginationOptions.requestInterval,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
await sleep(requestInterval);
}
if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
// We have it configured to let all requests pass no matter the response code
// via "requestOptions.simple = false" to not by default fail if it is for example
// configured to stop on 404 response codes. For that reason we have to throw here
// now an error manually if the response code is not a success one.
let data = tempResponseData.body;
if (data instanceof Readable && paginationOptions.binaryResult !== true) {
data = await binaryToString(data as Buffer | Readable);
} else if (typeof data === 'object') {
data = JSON.stringify(data);
}
throw Object.assign(
new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`),
{
statusCode: tempResponseData.statusCode,
error: data,
isAxiosError: true,
response: {
headers: tempResponseData.headers,
status: tempResponseData.statusCode,
statusText: tempResponseData.statusMessage,
},
},
);
}
}
} while (makeAdditionalRequest);
return responseData;
},
async httpRequestWithAuthentication(
this,
credentialsType,
requestOptions,
additionalCredentialOptions,
): Promise<any> {
return await httpRequestWithAuthentication.call(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
);
},
request: async (uriOrObject, options) =>
await proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options),
async requestWithAuthentication(
this,
credentialsType,
requestOptions,
additionalCredentialOptions,
itemIndex,
): Promise<any> {
return await requestWithAuthentication.call(
this,
credentialsType,
requestOptions,
workflow,
node,
additionalData,
additionalCredentialOptions,
itemIndex,
);
},
async requestOAuth1(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: IRequestOptions,
): Promise<any> {
return await requestOAuth1.call(this, credentialsType, requestOptions);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: IRequestOptions,
oAuth2Options?: IOAuth2Options,
): Promise<any> {
return await requestOAuth2.call(
this,
credentialsType,
requestOptions,
node,
additionalData,
oAuth2Options,
);
},
};
}; };
const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
@ -3511,6 +3115,7 @@ const getNodeHelperFunctions = (
await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType), await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType),
}); });
/** @deprecated */
const getBinaryHelperFunctions = ( const getBinaryHelperFunctions = (
{ executionId }: IWorkflowExecuteAdditionalData, { executionId }: IWorkflowExecuteAdditionalData,
workflowId: string, workflowId: string,
@ -3596,7 +3201,7 @@ export function copyInputItems(items: INodeExecutionData[], properties: string[]
/** /**
* Returns the execute functions the poll nodes have access to. * Returns the execute functions the poll nodes have access to.
*/ */
// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowManager.add // TODO: DELETE THIS
export function getExecutePollFunctions( export function getExecutePollFunctions(
workflow: Workflow, workflow: Workflow,
node: INode, node: INode,
@ -3604,57 +3209,7 @@ export function getExecutePollFunctions(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): IPollFunctions { ): IPollFunctions {
return ((workflow: Workflow, node: INode) => { return new PollContext(workflow, node, additionalData, mode, activation);
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
__emit: (): void => {
throw new ApplicationError(
'Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emit function',
);
},
__emitError() {
throw new ApplicationError(
'Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emitError function',
);
},
getMode: () => mode,
getActivationMode: () => activation,
getCredentials: async (type) =>
await getCredentials(workflow, node, type, additionalData, mode),
getNodeParameter: (
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object => {
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
undefined,
fallbackValue,
options,
);
},
helpers: {
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
})(workflow, node);
} }
/** /**
@ -3806,7 +3361,7 @@ export function getExecuteFunctions(
inputName: NodeConnectionType, inputName: NodeConnectionType,
itemIndex: number, itemIndex: number,
): Promise<unknown> { ): Promise<unknown> {
return await getInputConnectionData.call( return await getInputConnectionData(
this, this,
workflow, workflow,
runExecutionData, runExecutionData,
@ -4187,6 +3742,7 @@ export function getExecuteSingleFunctions(
}, },
helpers: { helpers: {
createDeferredPromise, createDeferredPromise,
returnJsonArray,
...getRequestHelperFunctions( ...getRequestHelperFunctions(
workflow, workflow,
node, node,
@ -4425,7 +3981,7 @@ export function getExecuteWebhookFunctions(
}; };
const runIndex = 0; const runIndex = 0;
return await getInputConnectionData.call( return await getInputConnectionData(
this, this,
workflow, workflow,
runExecutionData, runExecutionData,

View file

@ -20,3 +20,4 @@ export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
export { BinaryData } from './BinaryData/types'; export { BinaryData } from './BinaryData/types';
export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils'; export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils';
export * from './ExecutionMetadata'; export * from './ExecutionMetadata';
export * from './node-execution-context';

View file

@ -0,0 +1,100 @@
import { Container } from 'typedi';
import type {
FunctionsBase,
INode,
INodeExecutionData,
IWorkflowExecuteAdditionalData,
NodeTypeAndVersion,
Workflow,
} from 'n8n-workflow';
import { deepCopy, LoggerProxy } from 'n8n-workflow';
import { InstanceSettings } from '@/InstanceSettings';
export abstract class BaseContext implements Omit<FunctionsBase, 'getCredentials'> {
protected readonly instanceSettings = Container.get(InstanceSettings);
constructor(
protected readonly workflow: Workflow,
protected readonly node: INode,
protected readonly additionalData: IWorkflowExecuteAdditionalData,
) {}
get logger() {
return LoggerProxy;
}
getExecutionId() {
return this.additionalData.executionId!;
}
getNode(): INode {
return deepCopy(this.node);
}
getWorkflow() {
const { id, name, active } = this.workflow;
return { id, name, active };
}
getWorkflowStaticData(type: string) {
return this.workflow.getStaticData(type, this.node);
}
getChildNodes(nodeName: string) {
const output: NodeTypeAndVersion[] = [];
const nodes = this.workflow.getChildNodes(nodeName);
for (const nodeName of nodes) {
const node = this.workflow.nodes[nodeName];
output.push({
name: node.name,
type: node.type,
typeVersion: node.typeVersion,
});
}
return output;
}
getParentNodes(nodeName: string) {
const output: NodeTypeAndVersion[] = [];
const nodes = this.workflow.getParentNodes(nodeName);
for (const nodeName of nodes) {
const node = this.workflow.nodes[nodeName];
output.push({
name: node.name,
type: node.type,
typeVersion: node.typeVersion,
});
}
return output;
}
getKnownNodeTypes() {
return this.workflow.nodeTypes.getKnownTypes();
}
getRestApiUrl() {
return this.additionalData.restApiUrl;
}
getInstanceBaseUrl() {
return this.additionalData.instanceBaseUrl;
}
getInstanceId() {
return this.instanceSettings.instanceId;
}
getTimezone() {
return this.workflow.timezone;
}
getCredentialsProperties(type: string) {
return this.additionalData.credentialsHelper.getCredentialsProperties(type);
}
async prepareOutputData(outputData: INodeExecutionData[]) {
return [outputData];
}
}

View file

@ -0,0 +1,136 @@
import Container from 'typedi';
import FileType from 'file-type';
import { IncomingMessage } from 'http';
import path from 'path';
import { extension, lookup } from 'mime-types';
import { Readable } from 'stream';
import { ApplicationError, fileTypeFromMimeType, IBinaryData } from 'n8n-workflow';
import type { BinaryHelperFunctions, IWorkflowExecuteAdditionalData, Workflow } from 'n8n-workflow';
import { binaryToBuffer } from '@/BinaryData/utils';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { binaryToString } from '@/NodeExecuteFunctions';
export class BinaryHelpers implements BinaryHelperFunctions {
private readonly binaryDataService = Container.get(BinaryDataService);
constructor(
private readonly workflow: Workflow,
private readonly additionalData: IWorkflowExecuteAdditionalData,
) {}
getBinaryPath(binaryDataId: string) {
return this.binaryDataService.getPath(binaryDataId);
}
async getBinaryMetadata(binaryDataId: string) {
return await this.binaryDataService.getMetadata(binaryDataId);
}
async getBinaryStream(binaryDataId: string, chunkSize?: number) {
return await this.binaryDataService.getAsStream(binaryDataId, chunkSize);
}
get binaryToBuffer() {
return binaryToBuffer;
}
get binaryToString() {
return binaryToString;
}
async prepareBinaryData(binaryData: Buffer | Readable, filePath?: string, mimeType?: string) {
let fileExtension: string | undefined;
if (binaryData instanceof IncomingMessage) {
if (!filePath) {
try {
const { responseUrl } = binaryData;
filePath =
binaryData.contentDisposition?.filename ??
((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1);
} catch {}
}
if (!mimeType) {
mimeType = binaryData.contentType;
}
}
if (!mimeType) {
// If no mime type is given figure it out
if (filePath) {
// Use file path to guess mime type
const mimeTypeLookup = lookup(filePath);
if (mimeTypeLookup) {
mimeType = mimeTypeLookup;
}
}
if (!mimeType) {
if (Buffer.isBuffer(binaryData)) {
// Use buffer to guess mime type
const fileTypeData = await FileType.fromBuffer(binaryData);
if (fileTypeData) {
mimeType = fileTypeData.mime;
fileExtension = fileTypeData.ext;
}
} else if (binaryData instanceof IncomingMessage) {
mimeType = binaryData.headers['content-type'];
} else {
// TODO: detect filetype from other kind of streams
}
}
}
if (!fileExtension && mimeType) {
fileExtension = extension(mimeType) || undefined;
}
if (!mimeType) {
// Fall back to text
mimeType = 'text/plain';
}
const returnData: IBinaryData = {
mimeType,
fileType: fileTypeFromMimeType(mimeType),
fileExtension,
data: '',
};
if (filePath) {
if (filePath.includes('?')) {
// Remove maybe present query parameters
filePath = filePath.split('?').shift();
}
const filePathParts = path.parse(filePath as string);
if (filePathParts.dir !== '') {
returnData.directory = filePathParts.dir;
}
returnData.fileName = filePathParts.base;
// Remove the dot
const fileExtension = filePathParts.ext.slice(1);
if (fileExtension) {
returnData.fileExtension = fileExtension;
}
}
return await this.setBinaryDataBuffer(returnData, binaryData);
}
async setBinaryDataBuffer(binaryData: IBinaryData, bufferOrStream: Buffer | Readable) {
return await this.binaryDataService.store(
this.workflow.id,
this.additionalData.executionId!,
bufferOrStream,
binaryData,
);
}
async copyBinaryFile(): Promise<never> {
throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.');
}
}

View file

@ -0,0 +1,362 @@
import { createHash } from 'crypto';
import { Readable } from 'stream';
import { pick } from 'lodash';
import {
IAdditionalCredentialOptions,
IAllExecuteFunctions,
IExecuteData,
IHttpRequestOptions,
IN8nHttpFullResponse,
IN8nHttpResponse,
INode,
INodeExecutionData,
IOAuth2Options,
IRequestOptions,
IRunExecutionData,
IWorkflowDataProxyAdditionalKeys,
IWorkflowExecuteAdditionalData,
jsonParse,
NodeOperationError,
NodeParameterValueType,
PaginationOptions,
sleep,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
applyPaginationRequestData,
binaryToString,
httpRequest,
httpRequestWithAuthentication,
proxyRequestToAxios,
requestOAuth1,
requestOAuth2,
requestWithAuthentication,
validateUrl,
} from '@/NodeExecuteFunctions';
import type { RequestHelperFunctions } from 'n8n-workflow';
export class RequestHelpers implements RequestHelperFunctions {
constructor(
private readonly context: IAllExecuteFunctions,
private readonly workflow: Workflow,
private readonly node: INode,
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly runExecutionData: IRunExecutionData | null = null,
private readonly connectionInputData: INodeExecutionData[] = [],
) {}
get httpRequest() {
return httpRequest;
}
async httpRequestWithAuthentication(
credentialsType: string,
requestOptions: IHttpRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
) {
return await httpRequestWithAuthentication(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
);
}
async requestWithAuthenticationPaginated(
requestOptions: IRequestOptions,
itemIndex: number,
paginationOptions: PaginationOptions,
credentialsType?: string,
additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<any[]> {
const responseData = [];
if (!requestOptions.qs) {
requestOptions.qs = {};
}
requestOptions.resolveWithFullResponse = true;
requestOptions.simple = false;
let tempResponseData: IN8nHttpFullResponse;
let makeAdditionalRequest: boolean;
let paginateRequestData: PaginationOptions['request'];
const runIndex = 0;
const additionalKeys = {
$request: requestOptions,
$response: {} as IN8nHttpFullResponse,
$version: this.node.typeVersion,
$pageCount: 0,
};
const executeData: IExecuteData = {
data: {},
node: this.node,
source: null,
};
const hashData = {
identicalCount: 0,
previousLength: 0,
previousHash: '',
};
do {
paginateRequestData = this.getResolvedValue(
paginationOptions.request as unknown as NodeParameterValueType,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as object as PaginationOptions['request'];
const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData);
if (!validateUrl(tempRequestOptions.uri as string)) {
throw new NodeOperationError(
this.node,
`'${paginateRequestData.url}' is not a valid URL.`,
{
itemIndex,
runIndex,
type: 'invalid_url',
},
);
}
if (credentialsType) {
tempResponseData = await this.requestWithAuthentication(
credentialsType,
tempRequestOptions,
additionalCredentialOptions,
);
} else {
tempResponseData = await this.request(tempRequestOptions);
}
const newResponse: IN8nHttpFullResponse = Object.assign(
{
body: {},
headers: {},
statusCode: 0,
},
pick(tempResponseData, ['body', 'headers', 'statusCode']),
);
let contentBody: Exclude<IN8nHttpResponse, Buffer>;
if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) {
// Keep the original string version that we can use it to hash if needed
contentBody = await binaryToString(newResponse.body as Buffer | Readable);
const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
if (responseContentType.includes('application/json')) {
newResponse.body = jsonParse(contentBody, { fallbackValue: {} });
} else {
newResponse.body = contentBody;
}
tempResponseData.__bodyResolved = true;
tempResponseData.body = newResponse.body;
} else {
contentBody = newResponse.body;
}
if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
// If the data is not binary (and so not a stream), or an etag is present,
// we check via etag or hash if identical data is received
let contentLength = 0;
if ('content-length' in tempResponseData.headers) {
contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
}
if (hashData.previousLength === contentLength) {
let hash: string;
if (tempResponseData.headers.etag) {
// If an etag is provided, we use it as "hash"
hash = tempResponseData.headers.etag as string;
} else {
// If there is no etag, we calculate a hash from the data in the body
if (typeof contentBody !== 'string') {
contentBody = JSON.stringify(contentBody);
}
hash = createHash('md5').update(contentBody).digest('base64');
}
if (hashData.previousHash === hash) {
hashData.identicalCount += 1;
if (hashData.identicalCount > 2) {
// Length was identical 5x and hash 3x
throw new NodeOperationError(
this.node,
'The returned response was identical 5x, so requests got stopped',
{
itemIndex,
description:
'Check if "Pagination Completed When" has been configured correctly.',
},
);
}
} else {
hashData.identicalCount = 0;
}
hashData.previousHash = hash;
} else {
hashData.identicalCount = 0;
}
hashData.previousLength = contentLength;
}
responseData.push(tempResponseData);
additionalKeys.$response = newResponse;
additionalKeys.$pageCount = additionalKeys.$pageCount + 1;
const maxRequests = this.getResolvedValue(
paginationOptions.maxRequests,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
if (maxRequests && additionalKeys.$pageCount >= maxRequests) {
break;
}
makeAdditionalRequest = this.getResolvedValue(
paginationOptions.continue,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as boolean;
if (makeAdditionalRequest) {
if (paginationOptions.requestInterval) {
const requestInterval = this.getResolvedValue(
paginationOptions.requestInterval,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
await sleep(requestInterval);
}
if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
// We have it configured to let all requests pass no matter the response code
// via "requestOptions.simple = false" to not by default fail if it is for example
// configured to stop on 404 response codes. For that reason we have to throw here
// now an error manually if the response code is not a success one.
let data = tempResponseData.body;
if (data instanceof Readable && paginationOptions.binaryResult !== true) {
data = await binaryToString(data as Buffer | Readable);
} else if (typeof data === 'object') {
data = JSON.stringify(data);
}
throw Object.assign(new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`), {
statusCode: tempResponseData.statusCode,
error: data,
isAxiosError: true,
response: {
headers: tempResponseData.headers,
status: tempResponseData.statusCode,
statusText: tempResponseData.statusMessage,
},
});
}
}
} while (makeAdditionalRequest);
return responseData;
}
async request(uriOrObject: string | IRequestOptions, options?: IRequestOptions) {
return await proxyRequestToAxios(
this.workflow,
this.additionalData,
this.node,
uriOrObject,
options,
);
}
async requestWithAuthentication(
credentialsType: string,
requestOptions: IRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
itemIndex?: number,
) {
return await requestWithAuthentication(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
itemIndex,
);
}
async requestOAuth1(credentialsType: string, requestOptions: IRequestOptions) {
return await requestOAuth1(this.context, credentialsType, requestOptions);
}
async requestOAuth2(
credentialsType: string,
requestOptions: IRequestOptions,
oAuth2Options?: IOAuth2Options,
) {
return await requestOAuth2(
this.context,
credentialsType,
requestOptions,
this.node,
this.additionalData,
oAuth2Options,
);
}
private getResolvedValue(
parameterValue: NodeParameterValueType,
itemIndex: number,
runIndex: number,
executeData: IExecuteData,
additionalKeys?: IWorkflowDataProxyAdditionalKeys,
returnObjectAsString = false,
): NodeParameterValueType {
const mode: WorkflowExecuteMode = 'internal';
if (
typeof parameterValue === 'object' ||
(typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
) {
return this.workflow.expression.getParameterValue(
parameterValue,
this.runExecutionData,
runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
mode,
additionalKeys ?? {},
executeData,
returnObjectAsString,
);
}
return parameterValue;
}
}

View file

@ -0,0 +1,15 @@
import { Container } from 'typedi';
import { CronExpression, Workflow } from 'n8n-workflow';
import type { SchedulingFunctions } from 'n8n-workflow';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
export class SchedulingHelpers implements SchedulingFunctions {
private readonly scheduledTaskManager = Container.get(ScheduledTaskManager);
constructor(private readonly workflow: Workflow) {}
registerCron(cronExpression: CronExpression, onTick: () => void) {
this.scheduledTaskManager.registerCron(this.workflow, cronExpression, onTick);
}
}

View file

@ -0,0 +1,12 @@
import { Container } from 'typedi';
import type { SSHCredentials, SSHTunnelFunctions } from 'n8n-workflow';
import { SSHClientsManager } from '@/SSHClientsManager';
export class SSHTunnelHelpers implements SSHTunnelFunctions {
private readonly sshClientsManager = Container.get(SSHClientsManager);
async getSSHClient(credentials: SSHCredentials) {
return await this.sshClientsManager.getClient(credentials);
}
}

View file

@ -0,0 +1 @@
export { PollContext } from './poll-context';

View file

@ -0,0 +1,117 @@
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IPollFunctions,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
NodeParameterValueType,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import {
getAdditionalKeys,
getCredentials,
getNodeParameter,
returnJsonArray,
} from '@/NodeExecuteFunctions';
import { BaseContext } from './base-contexts';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
const throwOnEmit = () => {
throw new ApplicationError('Overwrite PollContext.__emit function');
};
const throwOnEmitError = () => {
throw new ApplicationError('Overwrite PollContext.__emitError function');
};
export class PollContext extends BaseContext implements IPollFunctions {
readonly helpers: IPollFunctions['helpers'];
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private readonly activation: WorkflowActivateMode,
readonly __emit: IPollFunctions['__emit'] = throwOnEmit,
readonly __emitError: IPollFunctions['__emitError'] = throwOnEmitError,
) {
super(workflow, node, additionalData);
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
const schedulingHelepers = new SchedulingHelpers(workflow);
this.helpers = {
createDeferredPromise: () => createDeferredPromise(),
returnJsonArray: (items) => returnJsonArray(items),
getBinaryPath: (id) => binaryHelpers.getBinaryPath(id),
getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id),
getBinaryStream: (id) => binaryHelpers.getBinaryStream(id),
binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body),
binaryToString: (body) => binaryHelpers.binaryToString(body),
prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers),
setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers),
copyBinaryFile: () => binaryHelpers.copyBinaryFile(),
httpRequest: requestHelpers.httpRequest.bind(requestHelpers),
httpRequestWithAuthentication:
requestHelpers.httpRequestWithAuthentication.bind(requestHelpers),
requestWithAuthenticationPaginated:
requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers),
request: requestHelpers.request.bind(requestHelpers),
requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers),
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
registerCron: schedulingHelepers.registerCron.bind(schedulingHelepers),
};
}
getMode() {
return this.mode;
}
getActivationMode() {
return this.activation;
}
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
return await getCredentials<T>(this.workflow, this.node, type, this.additionalData, this.mode);
}
getNodeParameter(
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object {
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(
this.workflow,
runExecutionData,
runIndex,
connectionInputData,
this.node,
parameterName,
itemIndex,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, runExecutionData),
undefined,
fallbackValue,
options,
);
}
}

View file

@ -3,7 +3,7 @@ import { mock } from 'jest-mock-extended';
import get from 'lodash/get'; import get from 'lodash/get';
import merge from 'lodash/merge'; import merge from 'lodash/merge';
import set from 'lodash/set'; import set from 'lodash/set';
import { getExecutePollFunctions, returnJsonArray, type InstanceSettings } from 'n8n-core'; import { PollContext, returnJsonArray, type InstanceSettings } from 'n8n-core';
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager'; import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
import type { import type {
IBinaryData, IBinaryData,
@ -200,7 +200,7 @@ export async function testPollingTriggerNode(
}); });
const mode = options.mode ?? 'trigger'; const mode = options.mode ?? 'trigger';
const originalPollingFunctions = getExecutePollFunctions( const originalPollingFunctions = new PollContext(
workflow, workflow,
node, node,
mock<IWorkflowExecuteAdditionalData>({ mock<IWorkflowExecuteAdditionalData>({

View file

@ -730,11 +730,10 @@ export interface ICredentialTestFunctions {
}; };
} }
interface BaseHelperFunctions { export interface BaseHelperFunctions {
createDeferredPromise: <T = void>() => IDeferredPromise<T>; createDeferredPromise: <T = void>() => IDeferredPromise<T>;
}
interface JsonHelperFunctions { /** Takes generic input data and brings it into the json format n8n uses. */
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[]; returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
} }
@ -755,6 +754,7 @@ export interface BinaryHelperFunctions {
mimeType?: string, mimeType?: string,
): Promise<IBinaryData>; ): Promise<IBinaryData>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
/** @deprecated */
copyBinaryFile(): Promise<never>; copyBinaryFile(): Promise<never>;
binaryToBuffer(body: Buffer | Readable): Promise<Buffer>; binaryToBuffer(body: Buffer | Readable): Promise<Buffer>;
binaryToString(body: Buffer | Readable, encoding?: BufferEncoding): Promise<string>; binaryToString(body: Buffer | Readable, encoding?: BufferEncoding): Promise<string>;
@ -816,13 +816,11 @@ export interface NodeHelperFunctions {
export interface RequestHelperFunctions { export interface RequestHelperFunctions {
httpRequest(requestOptions: IHttpRequestOptions): Promise<any>; httpRequest(requestOptions: IHttpRequestOptions): Promise<any>;
httpRequestWithAuthentication( httpRequestWithAuthentication(
this: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IHttpRequestOptions, requestOptions: IHttpRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions, additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<any>; ): Promise<any>;
requestWithAuthenticationPaginated( requestWithAuthenticationPaginated(
this: IAllExecuteFunctions,
requestOptions: IRequestOptions, requestOptions: IRequestOptions,
itemIndex: number, itemIndex: number,
paginationOptions: PaginationOptions, paginationOptions: PaginationOptions,
@ -835,32 +833,29 @@ export interface RequestHelperFunctions {
* @see RequestHelperFunctions.httpRequest * @see RequestHelperFunctions.httpRequest
*/ */
request(uriOrObject: string | IRequestOptions, options?: IRequestOptions): Promise<any>; request(uriOrObject: string | IRequestOptions, options?: IRequestOptions): Promise<any>;
/** /**
* @deprecated Use .httpRequestWithAuthentication instead * @deprecated Use .httpRequestWithAuthentication instead
* @see RequestHelperFunctions.requestWithAuthentication * @see RequestHelperFunctions.requestWithAuthentication
*/ */
requestWithAuthentication( requestWithAuthentication(
this: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IRequestOptions, requestOptions: IRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions, additionalCredentialOptions?: IAdditionalCredentialOptions,
itemIndex?: number, itemIndex?: number,
): Promise<any>; ): Promise<any>;
/** /**
* @deprecated Use .httpRequestWithAuthentication instead * @deprecated Use .httpRequestWithAuthentication instead
* @see RequestHelperFunctions.requestWithAuthentication * @see RequestHelperFunctions.requestWithAuthentication
*/ */
requestOAuth1( requestOAuth1(credentialsType: string, requestOptions: IRequestOptions): Promise<any>;
this: IAllExecuteFunctions,
credentialsType: string,
requestOptions: IRequestOptions,
): Promise<any>;
/** /**
* @deprecated Use .httpRequestWithAuthentication instead * @deprecated Use .httpRequestWithAuthentication instead
* @see RequestHelperFunctions.requestWithAuthentication * @see RequestHelperFunctions.requestWithAuthentication
*/ */
requestOAuth2( requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string, credentialsType: string,
requestOptions: IRequestOptions, requestOptions: IRequestOptions,
oAuth2Options?: IOAuth2Options, oAuth2Options?: IOAuth2Options,
@ -984,8 +979,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
BinaryHelperFunctions & BinaryHelperFunctions &
DeduplicationHelperFunctions & DeduplicationHelperFunctions &
FileSystemHelperFunctions & FileSystemHelperFunctions &
SSHTunnelFunctions & SSHTunnelFunctions & {
JsonHelperFunctions & {
normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[]; normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[];
constructExecutionMetaData( constructExecutionMetaData(
inputData: INodeExecutionData[], inputData: INodeExecutionData[],
@ -1059,8 +1053,7 @@ export interface IPollFunctions
helpers: RequestHelperFunctions & helpers: RequestHelperFunctions &
BaseHelperFunctions & BaseHelperFunctions &
BinaryHelperFunctions & BinaryHelperFunctions &
SchedulingFunctions & SchedulingFunctions;
JsonHelperFunctions;
} }
export interface ITriggerFunctions export interface ITriggerFunctions
@ -1080,8 +1073,7 @@ export interface ITriggerFunctions
BaseHelperFunctions & BaseHelperFunctions &
BinaryHelperFunctions & BinaryHelperFunctions &
SSHTunnelFunctions & SSHTunnelFunctions &
SchedulingFunctions & SchedulingFunctions;
JsonHelperFunctions;
} }
export interface IHookFunctions export interface IHookFunctions
@ -1118,10 +1110,7 @@ export interface IWebhookFunctions extends FunctionsBaseWithRequiredKeys<'getMod
getResponseObject(): express.Response; getResponseObject(): express.Response;
getWebhookName(): string; getWebhookName(): string;
nodeHelpers: NodeHelperFunctions; nodeHelpers: NodeHelperFunctions;
helpers: RequestHelperFunctions & helpers: RequestHelperFunctions & BaseHelperFunctions & BinaryHelperFunctions;
BaseHelperFunctions &
BinaryHelperFunctions &
JsonHelperFunctions;
} }
export interface INodeCredentialsDetails { export interface INodeCredentialsDetails {
@ -1619,6 +1608,8 @@ export abstract class Node {
abstract description: INodeTypeDescription; abstract description: INodeTypeDescription;
execute?(context: IExecuteFunctions): Promise<INodeExecutionData[][]>; execute?(context: IExecuteFunctions): Promise<INodeExecutionData[][]>;
webhook?(context: IWebhookFunctions): Promise<IWebhookResponseData>; webhook?(context: IWebhookFunctions): Promise<IWebhookResponseData>;
poll?(context: IPollFunctions): Promise<INodeExecutionData[][] | null>;
trigger?(context: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
} }
export interface IVersionedNodeType { export interface IVersionedNodeType {

View file

@ -591,8 +591,7 @@ export class RoutingNode {
let responseData: IN8nHttpFullResponse; let responseData: IN8nHttpFullResponse;
requestData.options.returnFullResponse = true; requestData.options.returnFullResponse = true;
if (credentialType) { if (credentialType) {
responseData = (await executeSingleFunctions.helpers.httpRequestWithAuthentication.call( responseData = (await executeSingleFunctions.helpers.httpRequestWithAuthentication(
executeSingleFunctions,
credentialType, credentialType,
requestData.options as IHttpRequestOptions, requestData.options as IHttpRequestOptions,
{ credentialsDecrypted }, { credentialsDecrypted },

View file

@ -52,6 +52,7 @@ import type {
NodeParameterValueType, NodeParameterValueType,
CloseFunction, CloseFunction,
INodeOutputConfiguration, INodeOutputConfiguration,
IGetExecuteHookFunctions,
} from './Interfaces'; } from './Interfaces';
import { Node, NodeConnectionType } from './Interfaces'; import { Node, NodeConnectionType } from './Interfaces';
import * as NodeHelpers from './NodeHelpers'; import * as NodeHelpers from './NodeHelpers';
@ -1056,7 +1057,7 @@ export class Workflow {
async createWebhookIfNotExists( async createWebhookIfNotExists(
webhookData: IWebhookData, webhookData: IWebhookData,
nodeExecuteFunctions: INodeExecuteFunctions, nodeExecuteFunctions: { getExecuteHookFunctions: IGetExecuteHookFunctions },
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): Promise<void> { ): Promise<void> {
@ -1075,7 +1076,7 @@ export class Workflow {
async deleteWebhook( async deleteWebhook(
webhookData: IWebhookData, webhookData: IWebhookData,
nodeExecuteFunctions: INodeExecuteFunctions, nodeExecuteFunctions: { getExecuteHookFunctions: IGetExecuteHookFunctions },
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
) { ) {
@ -1085,7 +1086,7 @@ export class Workflow {
private async runWebhookMethod( private async runWebhookMethod(
method: WebhookSetupMethodNames, method: WebhookSetupMethodNames,
webhookData: IWebhookData, webhookData: IWebhookData,
nodeExecuteFunctions: INodeExecuteFunctions, nodeExecuteFunctions: { getExecuteHookFunctions: IGetExecuteHookFunctions },
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): Promise<boolean | undefined> { ): Promise<boolean | undefined> {
@ -1122,8 +1123,6 @@ export class Workflow {
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): Promise<ITriggerResponse | undefined> { ): Promise<ITriggerResponse | undefined> {
const triggerFunctions = getTriggerFunctions(this, node, additionalData, mode, activation);
const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if (nodeType === undefined) { if (nodeType === undefined) {
@ -1140,14 +1139,19 @@ export class Workflow {
}); });
} }
const context = getTriggerFunctions(this, node, additionalData, mode, activation);
if (mode === 'manual') { if (mode === 'manual') {
// In manual mode we do not just start the trigger function we also // In manual mode we do not just start the trigger function we also
// want to be able to get informed as soon as the first data got emitted // want to be able to get informed as soon as the first data got emitted
const triggerResponse = await nodeType.trigger.call(triggerFunctions); const triggerResponse =
nodeType instanceof Node
? await nodeType.trigger(context)
: await nodeType.trigger.call(context);
// Add the manual trigger response which resolves when the first time data got emitted // Add the manual trigger response which resolves when the first time data got emitted
triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => { triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
triggerFunctions.emit = ( context.emit = (
(resolveEmit) => (resolveEmit) =>
( (
data: INodeExecutionData[][], data: INodeExecutionData[][],
@ -1173,7 +1177,7 @@ export class Workflow {
resolveEmit(data); resolveEmit(data);
} }
)(resolve); )(resolve);
triggerFunctions.emitError = ( context.emitError = (
(rejectEmit) => (rejectEmit) =>
(error: Error, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>) => { (error: Error, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>) => {
additionalData.hooks!.hookFunctions.sendResponse = [ additionalData.hooks!.hookFunctions.sendResponse = [
@ -1191,8 +1195,11 @@ export class Workflow {
return triggerResponse; return triggerResponse;
} }
// In all other modes simply start the trigger // In all other modes simply start the trigger
return await nodeType.trigger.call(triggerFunctions); return nodeType instanceof Node
? await nodeType.trigger(context)
: await nodeType.trigger.call(context);
} }
/** /**
@ -1402,14 +1409,18 @@ export class Workflow {
} else if (nodeType.poll) { } else if (nodeType.poll) {
if (mode === 'manual') { if (mode === 'manual') {
// In manual mode run the poll function // In manual mode run the poll function
const thisArgs = nodeExecuteFunctions.getExecutePollFunctions( const context = nodeExecuteFunctions.getExecutePollFunctions(
this, this,
node, node,
additionalData, additionalData,
mode, mode,
'manual', 'manual',
); );
return { data: await nodeType.poll.call(thisArgs) }; const data =
nodeType instanceof Node
? await nodeType.poll(context)
: await nodeType.poll.call(context);
return { data };
} }
// In any other mode pass data through as it already contains the result of the poll // In any other mode pass data through as it already contains the result of the poll
return { data: inputData.main as INodeExecutionData[][] }; return { data: inputData.main as INodeExecutionData[][] };