refactor(core): Extract poll context out of NodeExecutionFunctions (no-changelog) (#11449)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-04 09:49:52 +01:00 committed by GitHub
parent ce963e8824
commit 2104fa1733
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 1222 additions and 103 deletions

View file

@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions } from 'n8n-core';
import { ActiveWorkflows, InstanceSettings, NodeExecuteFunctions, PollContext } from 'n8n-core';
import type {
ExecutionError,
IDeferredPromise,
@ -274,18 +274,11 @@ export class ActiveWorkflowManager {
activation: WorkflowActivateMode,
): IGetExecutePollFunctions {
return (workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(
workflow,
node,
additionalData,
mode,
activation,
);
returnFunctions.__emit = (
const __emit = (
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun | undefined>,
): void => {
) => {
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
void this.workflowStaticDataService.saveStaticData(workflow);
const executePromise = this.workflowExecutionService.runWorkflow(
@ -309,14 +302,15 @@ export class ActiveWorkflowManager {
}
};
returnFunctions.__emitError = (error: ExecutionError): void => {
const __emitError = (error: ExecutionError) => {
void this.executionService
.createErrorExecution(error, node, workflowData, workflow, mode)
.then(() => {
this.executeErrorWorkflow(error, workflowData, mode);
});
};
return returnFunctions;
return new PollContext(workflow, node, additionalData, mode, activation, __emit, __emitError);
};
}

View file

@ -13,13 +13,13 @@
"scripts": {
"clean": "rimraf dist .turbo",
"typecheck": "tsc --noEmit",
"build": "tsc -p tsconfig.build.json",
"build": "tsc -p tsconfig.build.json && tsc-alias -p tsconfig.build.json",
"dev": "pnpm watch",
"format": "biome format --write .",
"format:check": "biome ci .",
"lint": "eslint . --quiet",
"lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch",
"watch": "tsc-watch -p tsconfig.build.json --onCompilationComplete \"tsc-alias -p tsconfig.build.json\"",
"test": "jest"
},
"files": [

View file

@ -167,6 +167,8 @@ import {
import { extractValue } from './ExtractValue';
import { InstanceSettings } from './InstanceSettings';
import type { ExtendedValidationResult, IResponseError } from './Interfaces';
// eslint-disable-next-line import/no-cycle
import { PollContext } from './node-execution-context';
import { ScheduledTaskManager } from './ScheduledTaskManager';
import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager';
@ -215,7 +217,7 @@ const createFormDataObject = (data: Record<string, unknown>) => {
return formData;
};
const validateUrl = (url?: string): boolean => {
export const validateUrl = (url?: string): boolean => {
if (!url) return false;
try {
@ -776,7 +778,7 @@ export function parseIncomingMessage(message: IncomingMessage) {
}
}
async function binaryToString(body: Buffer | Readable, encoding?: BufferEncoding) {
export async function binaryToString(body: Buffer | Readable, encoding?: BufferEncoding) {
const buffer = await binaryToBuffer(body);
if (!encoding && body instanceof IncomingMessage) {
parseIncomingMessage(body);
@ -1010,7 +1012,7 @@ export const removeEmptyBody = (requestOptions: IHttpRequestOptions | IRequestOp
}
};
async function httpRequest(
export async function httpRequest(
requestOptions: IHttpRequestOptions,
): Promise<IN8nHttpFullResponse | IN8nHttpResponse> {
removeEmptyBody(requestOptions);
@ -1205,7 +1207,7 @@ export async function copyBinaryFile(
* base64 and adds metadata.
*/
// eslint-disable-next-line complexity
async function prepareBinaryData(
export async function prepareBinaryData(
binaryData: Buffer | Readable,
executionId: string,
workflowId: string,
@ -1348,6 +1350,7 @@ export async function clearAllProcessedItems(
options,
);
}
export async function getProcessedDataCount(
scope: DeduplicationScope,
contextData: ICheckProcessedContextData,
@ -1359,7 +1362,8 @@ export async function getProcessedDataCount(
options,
);
}
function applyPaginationRequestData(
export function applyPaginationRequestData(
requestData: IRequestOptions,
paginationRequestData: PaginationOptions['request'],
): IRequestOptions {
@ -3553,57 +3557,7 @@ export function getExecutePollFunctions(
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): IPollFunctions {
return ((workflow: Workflow, node: INode) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
__emit: (): void => {
throw new ApplicationError(
'Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emit function',
);
},
__emitError() {
throw new ApplicationError(
'Overwrite NodeExecuteFunctions.getExecutePollFunctions.__emitError function',
);
},
getMode: () => mode,
getActivationMode: () => activation,
getCredentials: async (type) =>
await getCredentials(workflow, node, type, additionalData, mode),
getNodeParameter: (
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object => {
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
undefined,
fallbackValue,
options,
);
},
helpers: {
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getSchedulingFunctions(workflow),
returnJsonArray,
},
};
})(workflow, node);
return new PollContext(workflow, node, additionalData, mode, activation);
}
/**
@ -4400,6 +4354,7 @@ export function getExecuteSingleFunctions(
},
helpers: {
createDeferredPromise,
returnJsonArray,
...getRequestHelperFunctions(
workflow,
node,

View file

@ -20,3 +20,4 @@ export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
export { BinaryData } from './BinaryData/types';
export { isStoredMode as isValidNonDefaultMode } from './BinaryData/utils';
export * from './ExecutionMetadata';
export * from './node-execution-context';

View file

@ -0,0 +1,168 @@
import { mock } from 'jest-mock-extended';
import type {
INode,
INodeExecutionData,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { Container } from 'typedi';
import { InstanceSettings } from '@/InstanceSettings';
import { NodeExecutionContext } from '../node-execution-context';
class TestContext extends NodeExecutionContext {}
describe('BaseContext', () => {
const instanceSettings = mock<InstanceSettings>({ instanceId: 'abc123' });
Container.set(InstanceSettings, instanceSettings);
const workflow = mock<Workflow>({
id: '123',
name: 'Test Workflow',
active: true,
nodeTypes: mock(),
timezone: 'UTC',
});
const node = mock<INode>();
let additionalData = mock<IWorkflowExecuteAdditionalData>({
credentialsHelper: mock(),
});
const mode: WorkflowExecuteMode = 'manual';
const testContext = new TestContext(workflow, node, additionalData, mode);
beforeEach(() => {
jest.clearAllMocks();
});
describe('getNode', () => {
it('should return a deep copy of the node', () => {
const result = testContext.getNode();
expect(result).not.toBe(node);
expect(JSON.stringify(result)).toEqual(JSON.stringify(node));
});
});
describe('getWorkflow', () => {
it('should return the id, name, and active properties of the workflow', () => {
const result = testContext.getWorkflow();
expect(result).toEqual({ id: '123', name: 'Test Workflow', active: true });
});
});
describe('getMode', () => {
it('should return the mode property', () => {
const result = testContext.getMode();
expect(result).toBe(mode);
});
});
describe('getWorkflowStaticData', () => {
it('should call getStaticData method of workflow', () => {
testContext.getWorkflowStaticData('testType');
expect(workflow.getStaticData).toHaveBeenCalledWith('testType', node);
});
});
describe('getChildNodes', () => {
it('should return an array of NodeTypeAndVersion objects for the child nodes of the given node', () => {
const childNode1 = mock<INode>({ name: 'Child Node 1', type: 'testType1', typeVersion: 1 });
const childNode2 = mock<INode>({ name: 'Child Node 2', type: 'testType2', typeVersion: 2 });
workflow.getChildNodes.mockReturnValue(['Child Node 1', 'Child Node 2']);
workflow.nodes = {
'Child Node 1': childNode1,
'Child Node 2': childNode2,
};
const result = testContext.getChildNodes('Test Node');
expect(result).toEqual([
{ name: 'Child Node 1', type: 'testType1', typeVersion: 1 },
{ name: 'Child Node 2', type: 'testType2', typeVersion: 2 },
]);
});
});
describe('getParentNodes', () => {
it('should return an array of NodeTypeAndVersion objects for the parent nodes of the given node', () => {
const parentNode1 = mock<INode>({ name: 'Parent Node 1', type: 'testType1', typeVersion: 1 });
const parentNode2 = mock<INode>({ name: 'Parent Node 2', type: 'testType2', typeVersion: 2 });
workflow.getParentNodes.mockReturnValue(['Parent Node 1', 'Parent Node 2']);
workflow.nodes = {
'Parent Node 1': parentNode1,
'Parent Node 2': parentNode2,
};
const result = testContext.getParentNodes('Test Node');
expect(result).toEqual([
{ name: 'Parent Node 1', type: 'testType1', typeVersion: 1 },
{ name: 'Parent Node 2', type: 'testType2', typeVersion: 2 },
]);
});
});
describe('getKnownNodeTypes', () => {
it('should call getKnownTypes method of workflow.nodeTypes', () => {
testContext.getKnownNodeTypes();
expect(workflow.nodeTypes.getKnownTypes).toHaveBeenCalled();
});
});
describe('getRestApiUrl', () => {
it('should return the restApiUrl property of additionalData', () => {
additionalData.restApiUrl = 'https://example.com/api';
const result = testContext.getRestApiUrl();
expect(result).toBe('https://example.com/api');
});
});
describe('getInstanceBaseUrl', () => {
it('should return the instanceBaseUrl property of additionalData', () => {
additionalData.instanceBaseUrl = 'https://example.com';
const result = testContext.getInstanceBaseUrl();
expect(result).toBe('https://example.com');
});
});
describe('getInstanceId', () => {
it('should return the instanceId property of instanceSettings', () => {
const result = testContext.getInstanceId();
expect(result).toBe('abc123');
});
});
describe('getTimezone', () => {
it('should return the timezone property of workflow', () => {
const result = testContext.getTimezone();
expect(result).toBe('UTC');
});
});
describe('getCredentialsProperties', () => {
it('should call getCredentialsProperties method of additionalData.credentialsHelper', () => {
testContext.getCredentialsProperties('testType');
expect(additionalData.credentialsHelper.getCredentialsProperties).toHaveBeenCalledWith(
'testType',
);
});
});
describe('prepareOutputData', () => {
it('should return the input array wrapped in another array', async () => {
const outputData = [mock<INodeExecutionData>(), mock<INodeExecutionData>()];
const result = await testContext.prepareOutputData(outputData);
expect(result).toEqual([outputData]);
});
});
});

View file

@ -0,0 +1,96 @@
import { mock } from 'jest-mock-extended';
import type {
Expression,
ICredentialDataDecryptedObject,
ICredentialsHelper,
INode,
INodeType,
INodeTypes,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { PollContext } from '../poll-context';
describe('PollContext', () => {
const testCredentialType = 'testCredential';
const nodeType = mock<INodeType>({
description: {
credentials: [
{
name: testCredentialType,
required: true,
},
],
properties: [
{
name: 'testParameter',
required: true,
},
],
},
});
const nodeTypes = mock<INodeTypes>();
const expression = mock<Expression>();
const workflow = mock<Workflow>({ expression, nodeTypes });
const node = mock<INode>({
credentials: {
[testCredentialType]: {
id: 'testCredentialId',
},
},
});
node.parameters = {
testParameter: 'testValue',
};
const credentialsHelper = mock<ICredentialsHelper>();
const additionalData = mock<IWorkflowExecuteAdditionalData>({ credentialsHelper });
const mode: WorkflowExecuteMode = 'manual';
const activation: WorkflowActivateMode = 'init';
const pollContext = new PollContext(workflow, node, additionalData, mode, activation);
beforeEach(() => {
jest.clearAllMocks();
});
describe('getActivationMode', () => {
it('should return the activation property', () => {
const result = pollContext.getActivationMode();
expect(result).toBe(activation);
});
});
describe('getCredentials', () => {
it('should get decrypted credentials', async () => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' });
const credentials =
await pollContext.getCredentials<ICredentialDataDecryptedObject>(testCredentialType);
expect(credentials).toEqual({ secret: 'token' });
});
});
describe('getNodeParameter', () => {
beforeEach(() => {
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
expression.getParameterValue.mockImplementation((value) => value);
});
it('should return parameter value when it exists', () => {
const parameter = pollContext.getNodeParameter('testParameter');
expect(parameter).toBe('testValue');
});
it('should return the fallback value when the parameter does not exist', () => {
const parameter = pollContext.getNodeParameter('otherParameter', 'fallback');
expect(parameter).toBe('fallback');
});
});
});

View file

@ -0,0 +1,136 @@
import FileType from 'file-type';
import { IncomingMessage, type ClientRequest } from 'http';
import { mock } from 'jest-mock-extended';
import type { Workflow, IWorkflowExecuteAdditionalData, IBinaryData } from 'n8n-workflow';
import type { Socket } from 'net';
import { Container } from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { BinaryHelpers } from '../binary-helpers';
jest.mock('file-type');
describe('BinaryHelpers', () => {
let binaryDataService = mock<BinaryDataService>();
Container.set(BinaryDataService, binaryDataService);
const workflow = mock<Workflow>({ id: '123' });
const additionalData = mock<IWorkflowExecuteAdditionalData>({ executionId: '456' });
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
beforeEach(() => {
jest.clearAllMocks();
binaryDataService.store.mockImplementation(
async (_workflowId, _executionId, _buffer, value) => value,
);
});
describe('getBinaryPath', () => {
it('should call getPath method of BinaryDataService', () => {
binaryHelpers.getBinaryPath('mock-binary-data-id');
expect(binaryDataService.getPath).toHaveBeenCalledWith('mock-binary-data-id');
});
});
describe('getBinaryMetadata', () => {
it('should call getMetadata method of BinaryDataService', async () => {
await binaryHelpers.getBinaryMetadata('mock-binary-data-id');
expect(binaryDataService.getMetadata).toHaveBeenCalledWith('mock-binary-data-id');
});
});
describe('getBinaryStream', () => {
it('should call getStream method of BinaryDataService', async () => {
await binaryHelpers.getBinaryStream('mock-binary-data-id');
expect(binaryDataService.getAsStream).toHaveBeenCalledWith('mock-binary-data-id', undefined);
});
});
describe('prepareBinaryData', () => {
it('should guess the mime type and file extension if not provided', async () => {
const buffer = Buffer.from('test');
const fileTypeData = { mime: 'application/pdf', ext: 'pdf' };
(FileType.fromBuffer as jest.Mock).mockResolvedValue(fileTypeData);
const binaryData = await binaryHelpers.prepareBinaryData(buffer);
expect(binaryData.mimeType).toEqual('application/pdf');
expect(binaryData.fileExtension).toEqual('pdf');
expect(binaryData.fileType).toEqual('pdf');
expect(binaryData.fileName).toBeUndefined();
expect(binaryData.directory).toBeUndefined();
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId!,
buffer,
binaryData,
);
});
it('should use the provided mime type and file extension if provided', async () => {
const buffer = Buffer.from('test');
const mimeType = 'application/octet-stream';
const binaryData = await binaryHelpers.prepareBinaryData(buffer, undefined, mimeType);
expect(binaryData.mimeType).toEqual(mimeType);
expect(binaryData.fileExtension).toEqual('bin');
expect(binaryData.fileType).toBeUndefined();
expect(binaryData.fileName).toBeUndefined();
expect(binaryData.directory).toBeUndefined();
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId!,
buffer,
binaryData,
);
});
const mockSocket = mock<Socket>({ readableHighWaterMark: 0 });
it('should use the contentDisposition.filename, responseUrl, and contentType properties to set the fileName, directory, and mimeType properties of the binaryData object', async () => {
const incomingMessage = new IncomingMessage(mockSocket);
incomingMessage.contentDisposition = { filename: 'test.txt', type: 'attachment' };
incomingMessage.contentType = 'text/plain';
incomingMessage.responseUrl = 'https://example.com/test.txt';
const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);
expect(binaryData.fileName).toEqual('test.txt');
expect(binaryData.fileType).toEqual('text');
expect(binaryData.directory).toBeUndefined();
expect(binaryData.mimeType).toEqual('text/plain');
expect(binaryData.fileExtension).toEqual('txt');
});
it('should use the req.path property to set the fileName property of the binaryData object if contentDisposition.filename and responseUrl are not provided', async () => {
const incomingMessage = new IncomingMessage(mockSocket);
incomingMessage.contentType = 'text/plain';
incomingMessage.req = mock<ClientRequest>({ path: '/test.txt' });
const binaryData = await binaryHelpers.prepareBinaryData(incomingMessage);
expect(binaryData.fileName).toEqual('test.txt');
expect(binaryData.directory).toBeUndefined();
expect(binaryData.mimeType).toEqual('text/plain');
expect(binaryData.fileExtension).toEqual('txt');
});
});
describe('setBinaryDataBuffer', () => {
it('should call store method of BinaryDataService', async () => {
const binaryData = mock<IBinaryData>();
const bufferOrStream = mock<Buffer>();
await binaryHelpers.setBinaryDataBuffer(binaryData, bufferOrStream);
expect(binaryDataService.store).toHaveBeenCalledWith(
workflow.id,
additionalData.executionId,
bufferOrStream,
binaryData,
);
});
});
});

View file

@ -0,0 +1,33 @@
import { mock } from 'jest-mock-extended';
import type { Workflow } from 'n8n-workflow';
import { Container } from 'typedi';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
import { SchedulingHelpers } from '../scheduling-helpers';
describe('SchedulingHelpers', () => {
const scheduledTaskManager = mock<ScheduledTaskManager>();
Container.set(ScheduledTaskManager, scheduledTaskManager);
const workflow = mock<Workflow>();
const schedulingHelpers = new SchedulingHelpers(workflow);
beforeEach(() => {
jest.clearAllMocks();
});
describe('registerCron', () => {
it('should call registerCron method of ScheduledTaskManager', () => {
const cronExpression = '* * * * * *';
const onTick = jest.fn();
schedulingHelpers.registerCron(cronExpression, onTick);
expect(scheduledTaskManager.registerCron).toHaveBeenCalledWith(
workflow,
cronExpression,
onTick,
);
});
});
});

View file

@ -0,0 +1,148 @@
import FileType from 'file-type';
import { IncomingMessage } from 'http';
import MimeTypes from 'mime-types';
import { ApplicationError, fileTypeFromMimeType } from 'n8n-workflow';
import type {
BinaryHelperFunctions,
IWorkflowExecuteAdditionalData,
Workflow,
IBinaryData,
} from 'n8n-workflow';
import path from 'path';
import type { Readable } from 'stream';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
import { binaryToBuffer } from '@/BinaryData/utils';
// eslint-disable-next-line import/no-cycle
import { binaryToString } from '@/NodeExecuteFunctions';
export class BinaryHelpers {
private readonly binaryDataService = Container.get(BinaryDataService);
constructor(
private readonly workflow: Workflow,
private readonly additionalData: IWorkflowExecuteAdditionalData,
) {}
get exported(): BinaryHelperFunctions {
return {
getBinaryPath: this.getBinaryPath.bind(this),
getBinaryMetadata: this.getBinaryMetadata.bind(this),
getBinaryStream: this.getBinaryStream.bind(this),
binaryToBuffer,
binaryToString,
prepareBinaryData: this.prepareBinaryData.bind(this),
setBinaryDataBuffer: this.setBinaryDataBuffer.bind(this),
copyBinaryFile: this.copyBinaryFile.bind(this),
};
}
getBinaryPath(binaryDataId: string) {
return this.binaryDataService.getPath(binaryDataId);
}
async getBinaryMetadata(binaryDataId: string) {
return await this.binaryDataService.getMetadata(binaryDataId);
}
async getBinaryStream(binaryDataId: string, chunkSize?: number) {
return await this.binaryDataService.getAsStream(binaryDataId, chunkSize);
}
// eslint-disable-next-line complexity
async prepareBinaryData(binaryData: Buffer | Readable, filePath?: string, mimeType?: string) {
let fileExtension: string | undefined;
if (binaryData instanceof IncomingMessage) {
if (!filePath) {
try {
const { responseUrl } = binaryData;
filePath =
binaryData.contentDisposition?.filename ??
((responseUrl && new URL(responseUrl).pathname) ?? binaryData.req?.path)?.slice(1);
} catch {}
}
if (!mimeType) {
mimeType = binaryData.contentType;
}
}
if (!mimeType) {
// If no mime type is given figure it out
if (filePath) {
// Use file path to guess mime type
const mimeTypeLookup = MimeTypes.lookup(filePath);
if (mimeTypeLookup) {
mimeType = mimeTypeLookup;
}
}
if (!mimeType) {
if (Buffer.isBuffer(binaryData)) {
// Use buffer to guess mime type
const fileTypeData = await FileType.fromBuffer(binaryData);
if (fileTypeData) {
mimeType = fileTypeData.mime;
fileExtension = fileTypeData.ext;
}
} else if (binaryData instanceof IncomingMessage) {
mimeType = binaryData.headers['content-type'];
} else {
// TODO: detect filetype from other kind of streams
}
}
}
if (!fileExtension && mimeType) {
fileExtension = MimeTypes.extension(mimeType) || undefined;
}
if (!mimeType) {
// Fall back to text
mimeType = 'text/plain';
}
const returnData: IBinaryData = {
mimeType,
fileType: fileTypeFromMimeType(mimeType),
fileExtension,
data: '',
};
if (filePath) {
if (filePath.includes('?')) {
// Remove maybe present query parameters
filePath = filePath.split('?').shift();
}
const filePathParts = path.parse(filePath as string);
if (filePathParts.dir !== '') {
returnData.directory = filePathParts.dir;
}
returnData.fileName = filePathParts.base;
// Remove the dot
const extractedFileExtension = filePathParts.ext.slice(1);
if (extractedFileExtension) {
returnData.fileExtension = extractedFileExtension;
}
}
return await this.setBinaryDataBuffer(returnData, binaryData);
}
async setBinaryDataBuffer(binaryData: IBinaryData, bufferOrStream: Buffer | Readable) {
return await this.binaryDataService.store(
this.workflow.id,
this.additionalData.executionId!,
bufferOrStream,
binaryData,
);
}
async copyBinaryFile(): Promise<never> {
throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.');
}
}

View file

@ -0,0 +1,381 @@
import { createHash } from 'crypto';
import { pick } from 'lodash';
import { jsonParse, NodeOperationError, sleep } from 'n8n-workflow';
import type {
RequestHelperFunctions,
IAdditionalCredentialOptions,
IAllExecuteFunctions,
IExecuteData,
IHttpRequestOptions,
IN8nHttpFullResponse,
IN8nHttpResponse,
INode,
INodeExecutionData,
IOAuth2Options,
IRequestOptions,
IRunExecutionData,
IWorkflowDataProxyAdditionalKeys,
IWorkflowExecuteAdditionalData,
NodeParameterValueType,
PaginationOptions,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { Readable } from 'stream';
// eslint-disable-next-line import/no-cycle
import {
applyPaginationRequestData,
binaryToString,
httpRequest,
httpRequestWithAuthentication,
proxyRequestToAxios,
requestOAuth1,
requestOAuth2,
requestWithAuthentication,
validateUrl,
} from '@/NodeExecuteFunctions';
export class RequestHelpers {
constructor(
private readonly context: IAllExecuteFunctions,
private readonly workflow: Workflow,
private readonly node: INode,
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly runExecutionData: IRunExecutionData | null = null,
private readonly connectionInputData: INodeExecutionData[] = [],
) {}
get exported(): RequestHelperFunctions {
return {
httpRequest,
httpRequestWithAuthentication: this.httpRequestWithAuthentication.bind(this),
requestWithAuthenticationPaginated: this.requestWithAuthenticationPaginated.bind(this),
request: this.request.bind(this),
requestWithAuthentication: this.requestWithAuthentication.bind(this),
requestOAuth1: this.requestOAuth1.bind(this),
requestOAuth2: this.requestOAuth2.bind(this),
};
}
get httpRequest() {
return httpRequest;
}
async httpRequestWithAuthentication(
credentialsType: string,
requestOptions: IHttpRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await httpRequestWithAuthentication.call(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
);
}
// eslint-disable-next-line complexity
async requestWithAuthenticationPaginated(
requestOptions: IRequestOptions,
itemIndex: number,
paginationOptions: PaginationOptions,
credentialsType?: string,
additionalCredentialOptions?: IAdditionalCredentialOptions,
): Promise<unknown[]> {
const responseData = [];
if (!requestOptions.qs) {
requestOptions.qs = {};
}
requestOptions.resolveWithFullResponse = true;
requestOptions.simple = false;
let tempResponseData: IN8nHttpFullResponse;
let makeAdditionalRequest: boolean;
let paginateRequestData: PaginationOptions['request'];
const runIndex = 0;
const additionalKeys = {
$request: requestOptions,
$response: {} as IN8nHttpFullResponse,
$version: this.node.typeVersion,
$pageCount: 0,
};
const executeData: IExecuteData = {
data: {},
node: this.node,
source: null,
};
const hashData = {
identicalCount: 0,
previousLength: 0,
previousHash: '',
};
do {
paginateRequestData = this.getResolvedValue(
paginationOptions.request as unknown as NodeParameterValueType,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as object as PaginationOptions['request'];
const tempRequestOptions = applyPaginationRequestData(requestOptions, paginateRequestData);
if (!validateUrl(tempRequestOptions.uri as string)) {
throw new NodeOperationError(
this.node,
`'${paginateRequestData.url}' is not a valid URL.`,
{
itemIndex,
runIndex,
type: 'invalid_url',
},
);
}
if (credentialsType) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
tempResponseData = await this.requestWithAuthentication(
credentialsType,
tempRequestOptions,
additionalCredentialOptions,
);
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
tempResponseData = await this.request(tempRequestOptions);
}
const newResponse: IN8nHttpFullResponse = Object.assign(
{
body: {},
headers: {},
statusCode: 0,
},
pick(tempResponseData, ['body', 'headers', 'statusCode']),
);
let contentBody: Exclude<IN8nHttpResponse, Buffer>;
if (newResponse.body instanceof Readable && paginationOptions.binaryResult !== true) {
// Keep the original string version that we can use it to hash if needed
contentBody = await binaryToString(newResponse.body as Buffer | Readable);
const responseContentType = newResponse.headers['content-type']?.toString() ?? '';
if (responseContentType.includes('application/json')) {
newResponse.body = jsonParse(contentBody, { fallbackValue: {} });
} else {
newResponse.body = contentBody;
}
tempResponseData.__bodyResolved = true;
tempResponseData.body = newResponse.body;
} else {
contentBody = newResponse.body;
}
if (paginationOptions.binaryResult !== true || tempResponseData.headers.etag) {
// If the data is not binary (and so not a stream), or an etag is present,
// we check via etag or hash if identical data is received
let contentLength = 0;
if ('content-length' in tempResponseData.headers) {
contentLength = parseInt(tempResponseData.headers['content-length'] as string) || 0;
}
if (hashData.previousLength === contentLength) {
let hash: string;
if (tempResponseData.headers.etag) {
// If an etag is provided, we use it as "hash"
hash = tempResponseData.headers.etag as string;
} else {
// If there is no etag, we calculate a hash from the data in the body
if (typeof contentBody !== 'string') {
contentBody = JSON.stringify(contentBody);
}
hash = createHash('md5').update(contentBody).digest('base64');
}
if (hashData.previousHash === hash) {
hashData.identicalCount += 1;
if (hashData.identicalCount > 2) {
// Length was identical 5x and hash 3x
throw new NodeOperationError(
this.node,
'The returned response was identical 5x, so requests got stopped',
{
itemIndex,
description:
'Check if "Pagination Completed When" has been configured correctly.',
},
);
}
} else {
hashData.identicalCount = 0;
}
hashData.previousHash = hash;
} else {
hashData.identicalCount = 0;
}
hashData.previousLength = contentLength;
}
responseData.push(tempResponseData);
additionalKeys.$response = newResponse;
additionalKeys.$pageCount = additionalKeys.$pageCount + 1;
const maxRequests = this.getResolvedValue(
paginationOptions.maxRequests,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
if (maxRequests && additionalKeys.$pageCount >= maxRequests) {
break;
}
makeAdditionalRequest = this.getResolvedValue(
paginationOptions.continue,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as boolean;
if (makeAdditionalRequest) {
if (paginationOptions.requestInterval) {
const requestInterval = this.getResolvedValue(
paginationOptions.requestInterval,
itemIndex,
runIndex,
executeData,
additionalKeys,
false,
) as number;
await sleep(requestInterval);
}
if (tempResponseData.statusCode < 200 || tempResponseData.statusCode >= 300) {
// We have it configured to let all requests pass no matter the response code
// via "requestOptions.simple = false" to not by default fail if it is for example
// configured to stop on 404 response codes. For that reason we have to throw here
// now an error manually if the response code is not a success one.
let data = tempResponseData.body;
if (data instanceof Readable && paginationOptions.binaryResult !== true) {
data = await binaryToString(data as Buffer | Readable);
} else if (typeof data === 'object') {
data = JSON.stringify(data);
}
throw Object.assign(new Error(`${tempResponseData.statusCode} - "${data?.toString()}"`), {
statusCode: tempResponseData.statusCode,
error: data,
isAxiosError: true,
response: {
headers: tempResponseData.headers,
status: tempResponseData.statusCode,
statusText: tempResponseData.statusMessage,
},
});
}
}
} while (makeAdditionalRequest);
return responseData;
}
async request(uriOrObject: string | IRequestOptions, options?: IRequestOptions) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await proxyRequestToAxios(
this.workflow,
this.additionalData,
this.node,
uriOrObject,
options,
);
}
async requestWithAuthentication(
credentialsType: string,
requestOptions: IRequestOptions,
additionalCredentialOptions?: IAdditionalCredentialOptions,
itemIndex?: number,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestWithAuthentication.call(
this.context,
credentialsType,
requestOptions,
this.workflow,
this.node,
this.additionalData,
additionalCredentialOptions,
itemIndex,
);
}
async requestOAuth1(credentialsType: string, requestOptions: IRequestOptions) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestOAuth1.call(this.context, credentialsType, requestOptions);
}
async requestOAuth2(
credentialsType: string,
requestOptions: IRequestOptions,
oAuth2Options?: IOAuth2Options,
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return await requestOAuth2.call(
this.context,
credentialsType,
requestOptions,
this.node,
this.additionalData,
oAuth2Options,
);
}
private getResolvedValue(
parameterValue: NodeParameterValueType,
itemIndex: number,
runIndex: number,
executeData: IExecuteData,
additionalKeys?: IWorkflowDataProxyAdditionalKeys,
returnObjectAsString = false,
): NodeParameterValueType {
const mode: WorkflowExecuteMode = 'internal';
if (
typeof parameterValue === 'object' ||
(typeof parameterValue === 'string' && parameterValue.charAt(0) === '=')
) {
return this.workflow.expression.getParameterValue(
parameterValue,
this.runExecutionData,
runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
mode,
additionalKeys ?? {},
executeData,
returnObjectAsString,
);
}
return parameterValue;
}
}

View file

@ -0,0 +1,20 @@
import type { CronExpression, Workflow, SchedulingFunctions } from 'n8n-workflow';
import { Container } from 'typedi';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
export class SchedulingHelpers {
private readonly scheduledTaskManager = Container.get(ScheduledTaskManager);
constructor(private readonly workflow: Workflow) {}
get exported(): SchedulingFunctions {
return {
registerCron: this.registerCron.bind(this),
};
}
registerCron(cronExpression: CronExpression, onTick: () => void) {
this.scheduledTaskManager.registerCron(this.workflow, cronExpression, onTick);
}
}

View file

@ -0,0 +1,2 @@
// eslint-disable-next-line import/no-cycle
export { PollContext } from './poll-context';

View file

@ -0,0 +1,107 @@
import type {
FunctionsBase,
INode,
INodeExecutionData,
IWorkflowExecuteAdditionalData,
NodeTypeAndVersion,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { deepCopy, LoggerProxy } from 'n8n-workflow';
import { Container } from 'typedi';
import { InstanceSettings } from '@/InstanceSettings';
export abstract class NodeExecutionContext implements Omit<FunctionsBase, 'getCredentials'> {
protected readonly instanceSettings = Container.get(InstanceSettings);
constructor(
protected readonly workflow: Workflow,
protected readonly node: INode,
protected readonly additionalData: IWorkflowExecuteAdditionalData,
protected readonly mode: WorkflowExecuteMode,
) {}
get logger() {
return LoggerProxy;
}
getExecutionId() {
return this.additionalData.executionId!;
}
getNode(): INode {
return deepCopy(this.node);
}
getWorkflow() {
const { id, name, active } = this.workflow;
return { id, name, active };
}
getMode() {
return this.mode;
}
getWorkflowStaticData(type: string) {
return this.workflow.getStaticData(type, this.node);
}
getChildNodes(nodeName: string) {
const output: NodeTypeAndVersion[] = [];
const nodeNames = this.workflow.getChildNodes(nodeName);
for (const n of nodeNames) {
const node = this.workflow.nodes[n];
output.push({
name: node.name,
type: node.type,
typeVersion: node.typeVersion,
});
}
return output;
}
getParentNodes(nodeName: string) {
const output: NodeTypeAndVersion[] = [];
const nodeNames = this.workflow.getParentNodes(nodeName);
for (const n of nodeNames) {
const node = this.workflow.nodes[n];
output.push({
name: node.name,
type: node.type,
typeVersion: node.typeVersion,
});
}
return output;
}
getKnownNodeTypes() {
return this.workflow.nodeTypes.getKnownTypes();
}
getRestApiUrl() {
return this.additionalData.restApiUrl;
}
getInstanceBaseUrl() {
return this.additionalData.instanceBaseUrl;
}
getInstanceId() {
return this.instanceSettings.instanceId;
}
getTimezone() {
return this.workflow.timezone;
}
getCredentialsProperties(type: string) {
return this.additionalData.credentialsHelper.getCredentialsProperties(type);
}
async prepareOutputData(outputData: INodeExecutionData[]) {
return [outputData];
}
}

View file

@ -0,0 +1,94 @@
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IPollFunctions,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
NodeParameterValueType,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
getAdditionalKeys,
getCredentials,
getNodeParameter,
returnJsonArray,
} from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SchedulingHelpers } from './helpers/scheduling-helpers';
import { NodeExecutionContext } from './node-execution-context';
const throwOnEmit = () => {
throw new ApplicationError('Overwrite PollContext.__emit function');
};
const throwOnEmitError = () => {
throw new ApplicationError('Overwrite PollContext.__emitError function');
};
export class PollContext extends NodeExecutionContext implements IPollFunctions {
readonly helpers: IPollFunctions['helpers'];
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
private readonly activation: WorkflowActivateMode,
readonly __emit: IPollFunctions['__emit'] = throwOnEmit,
readonly __emitError: IPollFunctions['__emitError'] = throwOnEmitError,
) {
super(workflow, node, additionalData, mode);
this.helpers = {
createDeferredPromise,
returnJsonArray,
...new BinaryHelpers(workflow, additionalData).exported,
...new RequestHelpers(this, workflow, node, additionalData).exported,
...new SchedulingHelpers(workflow).exported,
};
}
getActivationMode() {
return this.activation;
}
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
return await getCredentials<T>(this.workflow, this.node, type, this.additionalData, this.mode);
}
getNodeParameter(
parameterName: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object {
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(
this.workflow,
runExecutionData,
runIndex,
connectionInputData,
this.node,
parameterName,
itemIndex,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, runExecutionData),
undefined,
fallbackValue,
options,
);
}
}

View file

@ -3,7 +3,7 @@ import { mock } from 'jest-mock-extended';
import get from 'lodash/get';
import merge from 'lodash/merge';
import set from 'lodash/set';
import { getExecutePollFunctions, returnJsonArray, type InstanceSettings } from 'n8n-core';
import { PollContext, returnJsonArray, type InstanceSettings } from 'n8n-core';
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
import type {
IBinaryData,
@ -13,7 +13,6 @@ import type {
INode,
INodeType,
INodeTypes,
IPollFunctions,
ITriggerFunctions,
IWebhookFunctions,
IWorkflowExecuteAdditionalData,
@ -193,14 +192,15 @@ export async function testPollingTriggerNode(
options.node,
) as INode;
const workflow = mock<Workflow>({
timezone: options.timezone ?? 'Europe/Berlin',
timezone,
nodeTypes: mock<INodeTypes>({
getByNameAndVersion: () => mock<INodeType>({ description: trigger.description }),
}),
getStaticData: () => options.workflowStaticData ?? {},
});
const mode = options.mode ?? 'trigger';
const originalPollingFunctions = getExecutePollFunctions(
const pollContext = new PollContext(
workflow,
node,
mock<IWorkflowExecuteAdditionalData>({
@ -218,22 +218,13 @@ export async function testPollingTriggerNode(
'init',
);
async function getCredentials<T extends object = ICredentialDataDecryptedObject>(): Promise<T> {
return (options.credential ?? {}) as T;
}
pollContext.getNode = () => node;
pollContext.getCredentials = async <T extends object = ICredentialDataDecryptedObject>() =>
(options.credential ?? {}) as T;
pollContext.getNodeParameter = (parameterName, fallback) =>
get(node.parameters, parameterName) ?? fallback;
const pollingFunctions = mock<IPollFunctions>({
...originalPollingFunctions,
getCredentials,
getTimezone: () => timezone,
getNode: () => node,
getMode: () => mode,
getInstanceId: () => 'instanceId',
getWorkflowStaticData: () => options.workflowStaticData ?? {},
getNodeParameter: (parameterName, fallback) => get(node.parameters, parameterName) ?? fallback,
});
const response = await trigger.poll?.call(pollingFunctions);
const response = await trigger.poll?.call(pollContext);
return {
response,

View file

@ -731,11 +731,8 @@ export interface ICredentialTestFunctions {
};
}
interface BaseHelperFunctions {
export interface BaseHelperFunctions {
createDeferredPromise: <T = void>() => IDeferredPromise<T>;
}
interface JsonHelperFunctions {
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
}
@ -756,6 +753,7 @@ export interface BinaryHelperFunctions {
mimeType?: string,
): Promise<IBinaryData>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
/** @deprecated */
copyBinaryFile(): Promise<never>;
binaryToBuffer(body: Buffer | Readable): Promise<Buffer>;
binaryToString(body: Buffer | Readable, encoding?: BufferEncoding): Promise<string>;
@ -985,8 +983,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
BinaryHelperFunctions &
DeduplicationHelperFunctions &
FileSystemHelperFunctions &
SSHTunnelFunctions &
JsonHelperFunctions & {
SSHTunnelFunctions & {
normalizeItems(items: INodeExecutionData | INodeExecutionData[]): INodeExecutionData[];
constructExecutionMetaData(
inputData: INodeExecutionData[],
@ -1081,8 +1078,7 @@ export interface IPollFunctions
helpers: RequestHelperFunctions &
BaseHelperFunctions &
BinaryHelperFunctions &
SchedulingFunctions &
JsonHelperFunctions;
SchedulingFunctions;
}
export interface ITriggerFunctions
@ -1102,8 +1098,7 @@ export interface ITriggerFunctions
BaseHelperFunctions &
BinaryHelperFunctions &
SSHTunnelFunctions &
SchedulingFunctions &
JsonHelperFunctions;
SchedulingFunctions;
}
export interface IHookFunctions
@ -1140,10 +1135,7 @@ export interface IWebhookFunctions extends FunctionsBaseWithRequiredKeys<'getMod
getResponseObject(): express.Response;
getWebhookName(): string;
nodeHelpers: NodeHelperFunctions;
helpers: RequestHelperFunctions &
BaseHelperFunctions &
BinaryHelperFunctions &
JsonHelperFunctions;
helpers: RequestHelperFunctions & BaseHelperFunctions & BinaryHelperFunctions;
}
export interface INodeCredentialsDetails {
@ -1641,6 +1633,7 @@ export abstract class Node {
abstract description: INodeTypeDescription;
execute?(context: IExecuteFunctions): Promise<INodeExecutionData[][]>;
webhook?(context: IWebhookFunctions): Promise<IWebhookResponseData>;
poll?(context: IPollFunctions): Promise<INodeExecutionData[][] | null>;
}
export interface IVersionedNodeType {