feat(core): Add batching and other options to declarative nodes (#8885)

Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
Jan Oberhauser 2024-06-06 22:39:31 -07:00 committed by GitHub
parent e520f8a98f
commit 4e568631be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 514 additions and 78 deletions

View file

@ -139,6 +139,7 @@
:parameters="parametersSetting"
:node-values="nodeValues"
:is-read-only="isReadOnly"
:hide-delete="true"
:hidden-issues-inputs="hiddenIssuesInputs"
path="parameters"
@value-changed="valueChanged"

View file

@ -272,6 +272,134 @@ const commonCORSParameters: INodeProperties[] = [
},
];
const declarativeNodeOptionParameters: INodeProperties = {
displayName: 'Request Options',
name: 'requestOptions',
type: 'collection',
isNodeSetting: true,
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Batching',
name: 'batching',
placeholder: 'Add Batching',
type: 'fixedCollection',
typeOptions: {
multipleValues: false,
},
default: {
batch: {},
},
options: [
{
displayName: 'Batching',
name: 'batch',
values: [
{
displayName: 'Items per Batch',
name: 'batchSize',
type: 'number',
typeOptions: {
minValue: -1,
},
default: 50,
description:
'Input will be split in batches to throttle requests. -1 for disabled. 0 will be treated as 1.',
},
{
displayName: 'Batch Interval (ms)',
name: 'batchInterval',
type: 'number',
typeOptions: {
minValue: 0,
},
default: 1000,
description: 'Time (in milliseconds) between each batch of requests. 0 for disabled.',
},
],
},
],
},
{
displayName: 'Ignore SSL Issues',
name: 'allowUnauthorizedCerts',
type: 'boolean',
noDataExpression: true,
default: false,
description:
'Whether to accept the response even if SSL certificate validation is not possible',
},
{
displayName: 'Proxy',
name: 'proxy',
type: 'string',
default: '',
placeholder: 'e.g. http://myproxy:3128',
description:
'HTTP proxy to use. If authentication is required it can be defined as follow: http://username:password@myproxy:3128',
},
{
displayName: 'Timeout',
name: 'timeout',
type: 'number',
typeOptions: {
minValue: 1,
},
default: 10000,
description:
'Time in ms to wait for the server to send response headers (and start the response body) before aborting the request',
},
],
};
export function applyDeclarativeNodeOptionParameters(nodeType: INodeType): void {
if (nodeType.execute || nodeType.trigger || nodeType.webhook || nodeType.description.polling) {
return;
}
const parameters = nodeType.description.properties;
if (!parameters) {
return;
}
// Was originally under "options" instead of "requestOptions" so the chance
// that that existed was quite high. With this name the chance is actually
// very low that it already exists but lets leave it in anyway to be sure.
const existingRequestOptionsIndex = parameters.findIndex(
(parameter) => parameter.name === 'requestOptions',
);
if (existingRequestOptionsIndex !== -1) {
parameters[existingRequestOptionsIndex] = {
...declarativeNodeOptionParameters,
options: [
...(declarativeNodeOptionParameters.options || []),
...(parameters[existingRequestOptionsIndex]?.options || []),
],
};
if (parameters[existingRequestOptionsIndex]?.options) {
parameters[existingRequestOptionsIndex].options!.sort((a, b) => {
if ('displayName' in a && 'displayName' in b) {
if (a.displayName < b.displayName) {
return -1;
}
if (a.displayName > b.displayName) {
return 1;
}
}
return 0;
});
}
} else {
parameters.push(declarativeNodeOptionParameters);
}
return;
}
/**
* Apply special parameters which should be added to nodeTypes depending on their type or configuration
*/
@ -289,6 +417,8 @@ export function applySpecialNodeParameters(nodeType: INodeType): void {
];
else properties.push(...commonCORSParameters);
}
applyDeclarativeNodeOptionParameters(nodeType);
}
const getPropertyValues = (

View file

@ -9,6 +9,7 @@
import get from 'lodash/get';
import merge from 'lodash/merge';
import set from 'lodash/set';
import url from 'node:url';
import type {
ICredentialDataDecryptedObject,
@ -46,6 +47,7 @@ import type { Workflow } from './Workflow';
import { NodeOperationError } from './errors/node-operation.error';
import { NodeApiError } from './errors/node-api.error';
import { sleep } from './utils';
export class RoutingNode {
additionalData: IWorkflowExecuteAdditionalData;
@ -76,6 +78,7 @@ export class RoutingNode {
this.workflow = workflow;
}
// eslint-disable-next-line complexity
async runNode(
inputData: ITaskDataConnections,
runIndex: number,
@ -87,7 +90,6 @@ export class RoutingNode {
): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = [];
let responseData;
let credentialType: string | undefined;
@ -129,24 +131,41 @@ export class RoutingNode {
}
}
// TODO: Think about how batching could be handled for REST APIs which support it
for (let i = 0; i < items.length; i++) {
let thisArgs: IExecuteSingleFunctions | undefined;
try {
thisArgs = nodeExecuteFunctions.getExecuteSingleFunctions(
const { batching } = executeFunctions.getNodeParameter('requestOptions', 0, {}) as {
batching: { batch: { batchSize: number; batchInterval: number } };
};
const batchSize = batching?.batch?.batchSize > 0 ? batching?.batch?.batchSize : 1;
const batchInterval = batching?.batch.batchInterval;
const requestPromises = [];
const itemContext: Array<{
thisArgs: IExecuteSingleFunctions;
requestData: DeclarativeRestApiSettings.ResultOptions;
}> = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) {
if (itemIndex % batchSize === 0) {
await sleep(batchInterval);
}
}
itemContext.push({
thisArgs: nodeExecuteFunctions.getExecuteSingleFunctions(
this.workflow,
this.runExecutionData,
runIndex,
this.connectionInputData,
inputData,
this.node,
i,
itemIndex,
this.additionalData,
executeData,
this.mode,
abortSignal,
);
const requestData: DeclarativeRestApiSettings.ResultOptions = {
),
requestData: {
options: {
qs: {},
body: {},
@ -155,88 +174,160 @@ export class RoutingNode {
preSend: [],
postReceive: [],
requestOperations: {},
} as DeclarativeRestApiSettings.ResultOptions,
});
const { proxy, timeout, allowUnauthorizedCerts } = itemContext[
itemIndex
].thisArgs.getNodeParameter('requestOptions', 0, {}) as {
proxy: string;
timeout: number;
allowUnauthorizedCerts: boolean;
};
if (nodeType.description.requestOperations) {
itemContext[itemIndex].requestData.requestOperations = {
...nodeType.description.requestOperations,
};
}
if (nodeType.description.requestOperations) {
requestData.requestOperations = { ...nodeType.description.requestOperations };
}
if (nodeType.description.requestDefaults) {
for (const key of Object.keys(nodeType.description.requestDefaults)) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let value = (nodeType.description.requestDefaults as Record<string, any>)[key];
// If the value is an expression resolve it
value = this.getParameterValue(
value,
i,
runIndex,
executeData,
{ $credentials: credentials, $version: this.node.typeVersion },
false,
) as string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(requestData.options as Record<string, any>)[key] = value;
}
}
for (const property of nodeType.description.properties) {
let value = get(this.node.parameters, property.name, []) as string | NodeParameterValue;
if (nodeType.description.requestDefaults) {
for (const key of Object.keys(nodeType.description.requestDefaults)) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let value = (nodeType.description.requestDefaults as Record<string, any>)[key];
// If the value is an expression resolve it
value = this.getParameterValue(
value,
i,
itemIndex,
runIndex,
executeData,
{ $credentials: credentials, $version: this.node.typeVersion },
false,
) as string | NodeParameterValue;
const tempOptions = this.getRequestOptionsFromParameters(
thisArgs,
property,
i,
runIndex,
'',
{ $credentials: credentials, $value: value, $version: this.node.typeVersion },
);
this.mergeOptions(requestData, tempOptions);
) as string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(itemContext[itemIndex].requestData.options as Record<string, any>)[key] = value;
}
}
// TODO: Change to handle some requests in parallel (should be configurable)
responseData = await this.makeRoutingRequest(
requestData,
thisArgs,
i,
for (const property of nodeType.description.properties) {
let value = get(this.node.parameters, property.name, []) as string | NodeParameterValue;
// If the value is an expression resolve it
value = this.getParameterValue(
value,
itemIndex,
runIndex,
credentialType,
requestData.requestOperations,
credentialsDecrypted,
executeData,
{ $credentials: credentials, $version: this.node.typeVersion },
false,
) as string | NodeParameterValue;
const tempOptions = this.getRequestOptionsFromParameters(
itemContext[itemIndex].thisArgs,
property,
itemIndex,
runIndex,
'',
{ $credentials: credentials, $value: value, $version: this.node.typeVersion },
);
if (requestData.maxResults) {
// Remove not needed items in case APIs return to many
responseData.splice(requestData.maxResults as number);
this.mergeOptions(itemContext[itemIndex].requestData, tempOptions);
}
if (proxy) {
const proxyParsed = url.parse(proxy);
const proxyProperties = ['host', 'port'];
for (const property of proxyProperties) {
if (
!(property in proxyParsed) ||
proxyParsed[property as keyof typeof proxyParsed] === null
) {
throw new NodeOperationError(this.node, 'The proxy is not value', {
runIndex,
itemIndex,
description: `The proxy URL does not contain a valid value for "${property}"`,
});
}
}
returnData.push(...responseData);
} catch (error) {
if (thisArgs !== undefined && thisArgs.continueOnFail()) {
itemContext[itemIndex].requestData.options.proxy = {
host: proxyParsed.hostname as string,
port: parseInt(proxyParsed.port!),
protocol: proxyParsed.protocol?.replace(/:$/, '') || undefined,
};
if (proxyParsed.auth) {
const [username, password] = proxyParsed.auth.split(':');
itemContext[itemIndex].requestData.options.proxy!.auth = {
username,
password,
};
}
}
if (allowUnauthorizedCerts) {
itemContext[itemIndex].requestData.options.skipSslCertificateValidation =
allowUnauthorizedCerts;
}
if (timeout) {
itemContext[itemIndex].requestData.options.timeout = timeout;
} else {
// set default timeout to 5 minutes
itemContext[itemIndex].requestData.options.timeout = 300_000;
}
requestPromises.push(
this.makeRoutingRequest(
itemContext[itemIndex].requestData,
itemContext[itemIndex].thisArgs,
itemIndex,
runIndex,
credentialType,
itemContext[itemIndex].requestData.requestOperations,
credentialsDecrypted,
),
);
}
const promisesResponses = await Promise.allSettled(requestPromises);
let responseData: any;
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
responseData = promisesResponses.shift();
if (responseData!.status !== 'fulfilled') {
if (responseData.reason.statusCode === 429) {
responseData.reason.message =
"Try spacing your requests out using the batching settings under 'Options'";
}
const error = responseData.reason;
if (itemContext[itemIndex].thisArgs?.continueOnFail()) {
returnData.push({ json: {}, error: error as NodeApiError });
continue;
}
if (error instanceof NodeApiError) {
set(error, 'context.itemIndex', i);
set(error, 'context.itemIndex', itemIndex);
set(error, 'context.runIndex', runIndex);
throw error;
}
throw new NodeApiError(this.node, error as JsonObject, {
runIndex,
itemIndex: i,
itemIndex,
message: error?.message,
description: error?.description,
httpCode: error.isAxiosError && error.response ? String(error.response?.status) : 'none',
});
}
if (itemContext[itemIndex].requestData.maxResults) {
// Remove not needed items in case APIs return to many
responseData.value.splice(itemContext[itemIndex].requestData.maxResults as number);
}
returnData.push(...responseData.value);
}
return [returnData];

View file

@ -19,7 +19,10 @@ import type {
import { RoutingNode } from '@/RoutingNode';
import { Workflow } from '@/Workflow';
import * as utilsModule from '@/utils';
import * as Helpers from './Helpers';
import { applyDeclarativeNodeOptionParameters } from '@/NodeHelpers';
import { mock } from 'jest-mock-extended';
const postReceiveFunction1 = async function (
@ -42,6 +45,23 @@ const preSendFunction1 = async function (
describe('RoutingNode', () => {
const additionalData = mock<IWorkflowExecuteAdditionalData>();
test('applyDeclarativeNodeOptionParameters', () => {
const nodeTypes = Helpers.NodeTypes();
const nodeType = nodeTypes.getByNameAndVersion('test.setMulti');
applyDeclarativeNodeOptionParameters(nodeType);
const options = nodeType.description.properties.find(
(property) => property.name === 'requestOptions',
);
expect(options?.options).toBeDefined;
const optionNames = options!.options!.map((option) => option.name);
expect(optionNames).toEqual(['batching', 'allowUnauthorizedCerts', 'proxy', 'timeout']);
});
describe('getRequestOptionsFromParameters', () => {
const tests: Array<{
description: string;
@ -717,6 +737,11 @@ describe('RoutingNode', () => {
const tests: Array<{
description: string;
input: {
specialTestOptions?: {
applyDeclarativeNodeOptionParameters?: boolean;
numberOfItems?: number;
sleepCalls?: number[][];
};
nodeType: {
properties?: INodeProperties[];
credentials?: INodeCredentialDescription[];
@ -772,6 +797,7 @@ describe('RoutingNode', () => {
},
baseURL: 'http://127.0.0.1:5678',
returnFullResponse: true,
timeout: 300000,
},
},
},
@ -821,6 +847,7 @@ describe('RoutingNode', () => {
},
baseURL: 'http://127.0.0.1:5678',
returnFullResponse: true,
timeout: 300000,
},
},
},
@ -876,6 +903,7 @@ describe('RoutingNode', () => {
},
baseURL: 'http://127.0.0.1:5678',
returnFullResponse: true,
timeout: 300000,
},
},
},
@ -931,6 +959,7 @@ describe('RoutingNode', () => {
},
baseURL: 'http://127.0.0.1:5678',
returnFullResponse: true,
timeout: 300000,
},
},
},
@ -988,6 +1017,154 @@ describe('RoutingNode', () => {
offset: 10,
},
returnFullResponse: true,
timeout: 300000,
},
},
},
],
],
},
{
description: 'multiple parameters, from applyDeclarativeNodeOptionParameters',
input: {
specialTestOptions: {
applyDeclarativeNodeOptionParameters: true,
numberOfItems: 5,
sleepCalls: [[500], [500]],
},
node: {
parameters: {
requestOptions: {
allowUnauthorizedCerts: true,
batching: {
batch: {
batchSize: 2,
batchInterval: 500,
},
},
proxy: 'http://user:password@127.0.0.1:8080',
timeout: 123,
},
},
},
nodeType: {
properties: [],
},
},
output: [
[
{
json: {
headers: {},
statusCode: 200,
requestOptions: {
qs: {},
headers: {},
proxy: {
auth: {
username: 'user',
password: 'password',
},
host: '127.0.0.1',
protocol: 'http',
port: 8080,
},
body: {},
returnFullResponse: true,
skipSslCertificateValidation: true,
timeout: 123,
},
},
},
{
json: {
headers: {},
statusCode: 200,
requestOptions: {
qs: {},
headers: {},
proxy: {
auth: {
username: 'user',
password: 'password',
},
host: '127.0.0.1',
protocol: 'http',
port: 8080,
},
body: {},
returnFullResponse: true,
skipSslCertificateValidation: true,
timeout: 123,
},
},
},
{
json: {
headers: {},
statusCode: 200,
requestOptions: {
qs: {},
headers: {},
proxy: {
auth: {
username: 'user',
password: 'password',
},
host: '127.0.0.1',
protocol: 'http',
port: 8080,
},
body: {},
returnFullResponse: true,
skipSslCertificateValidation: true,
timeout: 123,
},
},
},
{
json: {
headers: {},
statusCode: 200,
requestOptions: {
qs: {},
headers: {},
proxy: {
auth: {
username: 'user',
password: 'password',
},
host: '127.0.0.1',
protocol: 'http',
port: 8080,
},
body: {},
returnFullResponse: true,
skipSslCertificateValidation: true,
timeout: 123,
},
},
},
{
json: {
headers: {},
statusCode: 200,
requestOptions: {
qs: {},
headers: {},
proxy: {
auth: {
username: 'user',
password: 'password',
},
host: '127.0.0.1',
protocol: 'http',
port: 8080,
},
body: {},
returnFullResponse: true,
skipSslCertificateValidation: true,
timeout: 123,
},
},
},
@ -1424,6 +1601,7 @@ describe('RoutingNode', () => {
addedIn: 'preSendFunction1',
},
returnFullResponse: true,
timeout: 300000,
},
},
],
@ -1519,6 +1697,7 @@ describe('RoutingNode', () => {
baseURL: 'http://127.0.0.1:5678',
url: '/test-url',
returnFullResponse: true,
timeout: 300000,
},
},
},
@ -1698,15 +1877,12 @@ describe('RoutingNode', () => {
const connectionInputData: INodeExecutionData[] = [];
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
const nodeType = nodeTypes.getByNameAndVersion(baseNode.type);
applyDeclarativeNodeOptionParameters(nodeType);
const propertiesOriginal = nodeType.description.properties;
const inputData: ITaskDataConnections = {
main: [
[
{
json: {},
},
],
],
main: [[]],
};
for (const testData of tests) {
@ -1719,6 +1895,9 @@ describe('RoutingNode', () => {
};
nodeType.description = { ...testData.input.nodeType } as INodeTypeDescription;
if (testData.input.specialTestOptions?.applyDeclarativeNodeOptionParameters) {
nodeType.description.properties = propertiesOriginal;
}
const workflow = new Workflow({
nodes: workflowData.nodes,
@ -1742,18 +1921,46 @@ describe('RoutingNode', () => {
source: null,
} as IExecuteData;
const executeFunctions = mock<IExecuteFunctions>();
const executeSingleFunctions = Helpers.getExecuteSingleFunctions(
workflow,
runExecutionData,
runIndex,
node,
itemIndex,
);
const nodeExecuteFunctions: Partial<INodeExecuteFunctions> = {
getExecuteFunctions: () => mock<IExecuteFunctions>(),
getExecuteSingleFunctions: () =>
Helpers.getExecuteSingleFunctions(
workflow,
runExecutionData,
runIndex,
node,
itemIndex,
),
getExecuteFunctions: () => executeFunctions,
getExecuteSingleFunctions: () => executeSingleFunctions,
};
const numberOfItems = testData.input.specialTestOptions?.numberOfItems ?? 1;
if (!inputData.main[0] || inputData.main[0].length !== numberOfItems) {
inputData.main[0] = [];
for (let i = 0; i < numberOfItems; i++) {
inputData.main[0].push({ json: {} });
}
}
const spy = jest.spyOn(utilsModule, 'sleep').mockReturnValue(
new Promise((resolve) => {
resolve();
}),
);
spy.mockClear();
executeFunctions.getNodeParameter.mockImplementation(
(parameterName: string) => testData.input.node.parameters[parameterName] || {},
);
const getNodeParameter = executeSingleFunctions.getNodeParameter;
executeSingleFunctions.getNodeParameter = (parameterName: string) =>
parameterName in testData.input.node.parameters
? testData.input.node.parameters[parameterName]
: getNodeParameter(parameterName) ?? {};
const result = await routingNode.runNode(
inputData,
runIndex,
@ -1762,6 +1969,12 @@ describe('RoutingNode', () => {
nodeExecuteFunctions as INodeExecuteFunctions,
);
if (testData.input.specialTestOptions?.sleepCalls) {
expect(spy.mock.calls).toEqual(testData.input.specialTestOptions?.sleepCalls);
} else {
expect(spy).toHaveBeenCalledTimes(0);
}
expect(result).toEqual(testData.output);
});
}
@ -1820,6 +2033,7 @@ describe('RoutingNode', () => {
},
baseURL: 'http://127.0.0.1:5678',
returnFullResponse: true,
timeout: 300000,
},
},
},