/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable id-denylist */
/* eslint-disable prefer-spread */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */

import type express from 'express';
import { Container } from 'typedi';
import get from 'lodash/get';
import { finished } from 'stream/promises';
import formidable from 'formidable';

import { BinaryDataService, NodeExecuteFunctions } from 'n8n-core';

import type {
	IBinaryData,
	IBinaryKeyData,
	IDataObject,
	IDeferredPromise,
	IExecuteData,
	IExecuteResponsePromiseData,
	IHttpRequestMethods,
	IN8nHttpFullResponse,
	INode,
	IPinData,
	IRunExecutionData,
	IWebhookData,
	IWebhookResponseData,
	IWorkflowDataProxyAdditionalKeys,
	IWorkflowExecuteAdditionalData,
	WebhookResponseMode,
	Workflow,
	WorkflowExecuteMode,
} from 'n8n-workflow';
import {
	ApplicationError,
	BINARY_ENCODING,
	createDeferredPromise,
	ErrorReporterProxy as ErrorReporter,
	NodeHelpers,
} from 'n8n-workflow';

import type {
	IExecutionDb,
	IResponseCallbackData,
	IWebhookManager,
	IWorkflowDb,
	IWorkflowExecutionDataProcess,
	WebhookCORSRequest,
	WebhookRequest,
} from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { ActiveExecutions } from '@/ActiveExecutions';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { OwnershipService } from './services/ownership.service';
import { parseBody } from './middlewares';
import { Logger } from './Logger';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { InternalServerError } from './errors/response-errors/internal-server.error';
import { UnprocessableRequestError } from './errors/response-errors/unprocessable.error';
import type { Project } from './databases/entities/Project';

export const WEBHOOK_METHODS: IHttpRequestMethods[] = [
	'DELETE',
	'GET',
	'HEAD',
	'PATCH',
	'POST',
	'PUT',
];
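
/**
 * Express-style request handler for incoming webhook calls. It validates the
 * HTTP method, sets the CORS headers (including the OPTIONS preflight
 * response) based on the access-control options of the matching webhook, and
 * then delegates the actual execution to the given `IWebhookManager`.
 *
 * Illustrative usage only (the route path and manager instance are
 * assumptions, the real wiring lives elsewhere in this package):
 *
 *   app.all('/webhook/*', webhookRequestHandler(someWebhookManager));
 */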
export const webhookRequestHandler =
	(webhookManager: IWebhookManager) =>
	async (req: WebhookRequest | WebhookCORSRequest, res: express.Response) => {
		const { path } = req.params;
		const method = req.method;

		if (method !== 'OPTIONS' && !WEBHOOK_METHODS.includes(method)) {
			return ResponseHelper.sendErrorResponse(
				res,
				new Error(`The method ${method} is not supported.`),
			);
		}

		// Set up CORS headers only if the incoming request has an `origin` header
		if ('origin' in req.headers) {
			if (webhookManager.getWebhookMethods) {
				try {
					const allowedMethods = await webhookManager.getWebhookMethods(path);
					res.header('Access-Control-Allow-Methods', ['OPTIONS', ...allowedMethods].join(', '));
				} catch (error) {
					return ResponseHelper.sendErrorResponse(res, error as Error);
				}
			}

			const requestedMethod =
				method === 'OPTIONS'
					? (req.headers['access-control-request-method'] as IHttpRequestMethods)
					: method;
			if (webhookManager.findAccessControlOptions && requestedMethod) {
				const options = await webhookManager.findAccessControlOptions(path, requestedMethod);
				const { allowedOrigins } = options ?? {};

				if (allowedOrigins && allowedOrigins !== '*' && allowedOrigins !== req.headers.origin) {
					const originsList = allowedOrigins.split(',');
					const defaultOrigin = originsList[0];

					if (originsList.length === 1) {
						res.header('Access-Control-Allow-Origin', defaultOrigin);
					}

					if (originsList.includes(req.headers.origin as string)) {
						res.header('Access-Control-Allow-Origin', req.headers.origin);
					} else {
						res.header('Access-Control-Allow-Origin', defaultOrigin);
					}
				} else {
					res.header('Access-Control-Allow-Origin', req.headers.origin);
				}

				if (method === 'OPTIONS') {
					res.header('Access-Control-Max-Age', '300');
					const requestedHeaders = req.headers['access-control-request-headers'];
					if (requestedHeaders?.length) {
						res.header('Access-Control-Allow-Headers', requestedHeaders);
					}
				}
			}
		}

		if (method === 'OPTIONS') {
			return ResponseHelper.sendSuccessResponse(res, {}, true, 204);
		}

		let response;
		try {
			response = await webhookManager.executeWebhook(req, res);
		} catch (error) {
			return ResponseHelper.sendErrorResponse(res, error as Error);
		}

		// Don't respond if a response has already been sent
		if (response.noWebhookResponse !== true) {
			ResponseHelper.sendSuccessResponse(
				res,
				response.data,
				true,
				response.responseCode,
				response.headers,
			);
		}
	};

/**
 * Returns all the webhooks which should be created for the given workflow
 */
export function getWorkflowWebhooks(
	workflow: Workflow,
	additionalData: IWorkflowExecuteAdditionalData,
	destinationNode?: string,
	ignoreRestartWebhooks = false,
): IWebhookData[] {
	// Check all the nodes in the workflow to see if they have webhooks
	const returnData: IWebhookData[] = [];

	let parentNodes: string[] | undefined;
	if (destinationNode !== undefined) {
		parentNodes = workflow.getParentNodes(destinationNode);
		// Also add the destination node in case it itself is a webhook node
		parentNodes.push(destinationNode);
	}

	for (const node of Object.values(workflow.nodes)) {
		if (parentNodes !== undefined && !parentNodes.includes(node.name)) {
			// If parentNodes are given, check only those nodes for webhooks
			// and skip all other ones
			continue;
		}
		returnData.push.apply(
			returnData,
			NodeHelpers.getNodeWebhooks(workflow, node, additionalData, ignoreRestartWebhooks),
		);
	}

	return returnData;
}
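
/**
 * If the response body is a Buffer, encodes it as a string using
 * `BINARY_ENCODING` and wraps it under the `'__@N8nEncodedBuffer@__'` key
 * (presumably so the response can be serialized and decoded again on the
 * receiving side); any other body is returned unchanged.
 */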
export function encodeWebhookResponse(
	response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
	if (typeof response === 'object' && Buffer.isBuffer(response.body)) {
		response.body = {
			'__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING),
		};
	}

	return response;
}
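
/**
 * Collapses single-element arrays produced by the multipart form parser into
 * plain values, e.g. `{ field: ['value'] }` becomes `{ field: 'value' }`.
 * Arrays with more than one entry are left untouched.
 */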
const normalizeFormData = <T>(values: Record<string, T | T[]>) => {
	for (const key in values) {
		const value = values[key];
		if (Array.isArray(value) && value.length === 1) {
			values[key] = value[0];
		}
	}
};

/**
 * Executes a webhook: runs the webhook method of the trigger node and,
 * depending on the configured response mode, answers the HTTP request
 * directly or starts the workflow and responds once it has the data.
 */
// eslint-disable-next-line complexity
export async function executeWebhook(
	workflow: Workflow,
	webhookData: IWebhookData,
	workflowData: IWorkflowDb,
	workflowStartNode: INode,
	executionMode: WorkflowExecuteMode,
	pushRef: string | undefined,
	runExecutionData: IRunExecutionData | undefined,
	executionId: string | undefined,
	req: WebhookRequest,
	res: express.Response,
	responseCallback: (error: Error | null, data: IResponseCallbackData) => void,
	destinationNode?: string,
): Promise<string | undefined> {
	// Get the nodeType to know which responseMode is set
	const nodeType = workflow.nodeTypes.getByNameAndVersion(
		workflowStartNode.type,
		workflowStartNode.typeVersion,
	);
	if (nodeType === undefined) {
		const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known`;
		responseCallback(new Error(errorMessage), {});
		throw new InternalServerError(errorMessage);
	}

	const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
		$executionId: executionId,
	};

	let project: Project | undefined = undefined;
	try {
		project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowData.id);
	} catch (error) {
		throw new NotFoundError('Cannot find workflow');
	}

	// Prepare everything that is needed to run the workflow
	const additionalData = await WorkflowExecuteAdditionalData.getBase();

	// Get the responseMode
	const responseMode = workflow.expression.getSimpleParameterValue(
		workflowStartNode,
		webhookData.webhookDescription.responseMode,
		executionMode,
		additionalKeys,
		undefined,
		'onReceived',
	) as WebhookResponseMode;
	const responseCode = workflow.expression.getSimpleParameterValue(
		workflowStartNode,
		webhookData.webhookDescription.responseCode as string,
		executionMode,
		additionalKeys,
		undefined,
		200,
	) as number;

	const responseData = workflow.expression.getComplexParameterValue(
		workflowStartNode,
		webhookData.webhookDescription.responseData,
		executionMode,
		additionalKeys,
		undefined,
		'firstEntryJson',
	);
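
	// A short summary of the three response modes, based on the handling below:
	// 'onReceived' answers the caller right away, 'lastNode' answers with the data
	// of the last executed node once the workflow has finished, and 'responseNode'
	// defers the answer until a response node in the workflow resolves `responsePromise`.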

	if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode)) {
		// If the mode is not known we error. It is probably better to fail as early
		// as possible (likely while the workflow is still being tested) than to
		// silently fall back to a default and hide that something does not resolve properly.
		const errorMessage = `The response mode '${responseMode}' is not valid!`;
		responseCallback(new Error(errorMessage), {});
		throw new InternalServerError(errorMessage);
	}

	// Add the Response and Request so that this data can be accessed in the node
	additionalData.httpRequest = req;
	additionalData.httpResponse = res;

	let binaryData;

	const nodeVersion = workflowStartNode.typeVersion;
	if (nodeVersion === 1) {
		// binaryData option is removed in versions higher than 1
		binaryData = workflow.expression.getSimpleParameterValue(
			workflowStartNode,
			'={{$parameter["options"]["binaryData"]}}',
			executionMode,
			additionalKeys,
			undefined,
			false,
		);
	}

	let didSendResponse = false;
	let runExecutionDataMerge = {};
	try {
		// Run the webhook function to see what should be returned and if
		// the workflow should be executed or not
		let webhookResultData: IWebhookResponseData;

		// If this is a `Webhook` or `Wait` node and binaryData is enabled, skip pre-parsing the request body.
		// `binaryData` is always falsy for versions higher than 1.
		if (!binaryData) {
			const { contentType, encoding } = req;
			if (contentType === 'multipart/form-data') {
				const form = formidable({
					multiples: true,
					encoding: encoding as formidable.BufferEncoding,
					// TODO: pass a custom `fileWriteStreamHandler` to create binary data files directly
				});
				req.body = await new Promise((resolve) => {
					form.parse(req, async (_err, data, files) => {
						normalizeFormData(data);
						normalizeFormData(files);
						resolve({ data, files });
					});
				});
			} else {
				if (nodeVersion > 1) {
					if (
						contentType?.startsWith('application/json') ||
						contentType?.startsWith('text/plain') ||
						contentType?.startsWith('application/x-www-form-urlencoded') ||
						contentType?.endsWith('/xml') ||
						contentType?.endsWith('+xml')
					) {
						await parseBody(req);
					}
				} else {
					await parseBody(req);
				}
			}
		}
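
		// Run the webhook method of the trigger node; if it throws, the error is
		// merged into the execution data below so it can be logged and shown in the editor.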

		try {
			webhookResultData = await workflow.runWebhook(
				webhookData,
				workflowStartNode,
				additionalData,
				NodeExecuteFunctions,
				executionMode,
			);
			Container.get(WorkflowStatisticsService).emit(
				'nodeFetchedData',
				workflow.id,
				workflowStartNode,
			);
		} catch (err) {
			// Send error response to webhook caller
			const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';
			responseCallback(new Error(errorMessage), {});
			didSendResponse = true;

			// Add the error to the execution data so that it can be logged and sent to the Editor-UI
			runExecutionDataMerge = {
				resultData: {
					runData: {},
					lastNodeExecuted: workflowStartNode.name,
					error: {
						...err,
						message: err.message,
						stack: err.stack,
					},
				},
			};

			webhookResultData = {
				noWebhookResponse: true,
				// Add empty data so that the webhook at least tries to "execute",
				// which then gives it the chance to throw the error.
				workflowData: [[{ json: {} }]],
			};
		}

		const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
			$executionId: executionId,
		};

		if (webhookData.webhookDescription.responseHeaders !== undefined) {
			const responseHeaders = workflow.expression.getComplexParameterValue(
				workflowStartNode,
				webhookData.webhookDescription.responseHeaders,
				executionMode,
				additionalKeys,
				undefined,
				undefined,
			) as {
				entries?:
					| Array<{
							name: string;
							value: string;
						}>
					| undefined;
			};

			if (responseHeaders !== undefined && responseHeaders.entries !== undefined) {
				for (const item of responseHeaders.entries) {
					res.setHeader(item.name, item.value);
				}
			}
		}

		if (webhookResultData.noWebhookResponse === true && !didSendResponse) {
			// The response was already sent
			responseCallback(null, {
				noWebhookResponse: true,
			});
			didSendResponse = true;
		}

		if (webhookResultData.workflowData === undefined) {
			// Workflow should not run
			if (webhookResultData.webhookResponse !== undefined) {
				// Data to respond with is given
				if (!didSendResponse) {
					responseCallback(null, {
						data: webhookResultData.webhookResponse,
						responseCode,
					});
					didSendResponse = true;
				}
			} else {
				// Send default response
				if (!didSendResponse) {
					responseCallback(null, {
						data: {
							message: 'Webhook call received',
						},
						responseCode,
					});
					didSendResponse = true;
				}
			}
			return;
		}

		// Now that we know that the workflow should run we can return the default response
		// directly if responseMode is set to "onReceived" and a response should be sent
		if (responseMode === 'onReceived' && !didSendResponse) {
			// Return response directly and do not wait for the workflow to finish
			if (responseData === 'noData') {
				// Return without data
				responseCallback(null, {
					responseCode,
				});
			} else if (responseData) {
				// Return the data specified in the response data option
				responseCallback(null, {
					data: responseData as IDataObject,
					responseCode,
				});
			} else if (webhookResultData.webhookResponse !== undefined) {
				// Data to respond with is given
				responseCallback(null, {
					data: webhookResultData.webhookResponse,
					responseCode,
				});
			} else {
				responseCallback(null, {
					data: {
						message: 'Workflow was started',
					},
					responseCode,
				});
			}

			didSendResponse = true;
		}

		// Initialize the data of the webhook node
		const nodeExecutionStack: IExecuteData[] = [];
		nodeExecutionStack.push({
			node: workflowStartNode,
			data: {
				main: webhookResultData.workflowData,
			},
			source: null,
		});
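
		// Reuse the run execution data that was passed in (set when resuming an
		// existing execution); otherwise start from a fresh, empty structure.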

		runExecutionData =
			runExecutionData ||
			({
				startData: {},
				resultData: {
					runData: {},
				},
				executionData: {
					contextData: {},
					nodeExecutionStack,
					waitingExecution: {},
				},
			} as IRunExecutionData);

		if (destinationNode && runExecutionData.startData) {
			runExecutionData.startData.destinationNode = destinationNode;
		}

		if (executionId !== undefined) {
			// If the executionId already exists we are restarting an existing execution,
			// so set the data the webhook node returned on the waiting node.
			runExecutionData.executionData!.nodeExecutionStack[0].data.main =
				webhookResultData.workflowData;
		}

		if (Object.keys(runExecutionDataMerge).length !== 0) {
			// If data to merge got defined add it to the execution data
			Object.assign(runExecutionData, runExecutionDataMerge);
		}

		let pinData: IPinData | undefined;
		const usePinData = executionMode === 'manual';
		if (usePinData) {
			pinData = workflowData.pinData;
			runExecutionData.resultData.pinData = pinData;
		}

		const runData: IWorkflowExecutionDataProcess = {
			executionMode,
			executionData: runExecutionData,
			pushRef,
			workflowData,
			pinData,
			projectId: project?.id,
		};
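
		// In 'responseNode' mode the HTTP response is not produced here: a deferred
		// promise is handed to the workflow runner and, once it resolves with the
		// response from the workflow, the data (or binary stream) is written back
		// to the caller.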

		let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
		if (responseMode === 'responseNode') {
			responsePromise = await createDeferredPromise<IN8nHttpFullResponse>();
			responsePromise
				.promise()
				.then(async (response: IN8nHttpFullResponse) => {
					if (didSendResponse) {
						return;
					}

					const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
					if (binaryData?.id) {
						res.header(response.headers);
						const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
						stream.pipe(res, { end: false });
						await finished(stream);
						responseCallback(null, { noWebhookResponse: true });
					} else if (Buffer.isBuffer(response.body)) {
						res.header(response.headers);
						res.end(response.body);
						responseCallback(null, { noWebhookResponse: true });
					} else {
						// TODO: This probably needs some more changes depending on the options on the
						// Webhook Response node
						const headers = response.headers;
						let responseCode = response.statusCode;
						let data = response.body as IDataObject;

						// For the formTrigger node, redirection has to be handled by sending the redirectURL in the response body
						if (
							nodeType.description.name === 'formTrigger' &&
							headers.location &&
							String(responseCode).startsWith('3')
						) {
							responseCode = 200;
							data = {
								redirectURL: headers.location,
							};
							headers.location = undefined;
						}

						responseCallback(null, {
							data,
							headers,
							responseCode,
						});
					}

					process.nextTick(() => res.end());
					didSendResponse = true;
				})
				.catch(async (error) => {
					ErrorReporter.error(error);
					Container.get(Logger).error(
						`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
						{ executionId, workflowId: workflow.id },
					);
				});
		}

		// Start now to run the workflow
		executionId = await Container.get(WorkflowRunner).run(
			runData,
			true,
			!didSendResponse,
			executionId,
			responsePromise,
		);

		Container.get(Logger).verbose(
			`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`,
			{ executionId },
		);

		if (!didSendResponse) {
			// Get a promise which resolves when the workflow has executed and then send the response
			const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
				executionId,
			) as Promise<IExecutionDb | undefined>;
			executePromise
				// eslint-disable-next-line complexity
				.then(async (data) => {
					if (data === undefined) {
						if (!didSendResponse) {
							responseCallback(null, {
								data: {
									message: 'Workflow executed successfully but no data was returned',
								},
								responseCode,
							});
							didSendResponse = true;
						}
						return undefined;
					}

					if (usePinData) {
						data.data.resultData.pinData = pinData;
					}

					const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
					if (data.data.resultData.error || returnData?.error !== undefined) {
						if (!didSendResponse) {
							responseCallback(null, {
								data: {
									message: 'Error in workflow',
								},
								responseCode: 500,
							});
						}
						didSendResponse = true;
						return data;
					}

					// in `responseNode` mode `responseCallback` is called by `responsePromise`
					if (responseMode === 'responseNode' && responsePromise) {
						await Promise.allSettled([responsePromise.promise()]);
						return undefined;
					}

					if (returnData === undefined) {
						if (!didSendResponse) {
							responseCallback(null, {
								data: {
									message:
										'Workflow executed successfully but the last node did not return any data',
								},
								responseCode,
							});
						}
						didSendResponse = true;
						return data;
					}

					const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
						$executionId: executionId,
					};

					if (!didSendResponse) {
						let data: IDataObject | IDataObject[] | undefined;

						if (responseData === 'firstEntryJson') {
							// Return the JSON data of the first entry
							if (returnData.data!.main[0]![0] === undefined) {
								responseCallback(new Error('No item to return got found'), {});
								didSendResponse = true;
								return undefined;
							}

							data = returnData.data!.main[0]![0].json;

							const responsePropertyName = workflow.expression.getSimpleParameterValue(
								workflowStartNode,
								webhookData.webhookDescription.responsePropertyName,
								executionMode,
								additionalKeys,
								undefined,
								undefined,
							);
|
2021-08-29 11:58:11 -07:00
|
|
|
|
refactor(core): Move event and telemetry handling into workers in queue mode (#7138)
# Motivation
In Queue mode, finished executions would cause the main instance to
always pull all execution data from the database, unflatten it and then
use it to send out event log events and telemetry events, as well as
required returns to Respond to Webhook nodes etc.
This could cause OOM errors when the data was large, since it had to be
fully unpacked and transformed on the main instance’s side, using up a
lot of memory (and time).
This PR attempts to limit this behaviour to only happen in those
required cases where the data has to be forwarded to some waiting
webhook, for example.
# Changes
Execution data is only required in cases, where the active execution has
a `postExecutePromise` attached to it. These usually forward the data to
some other endpoint (e.g. a listening webhook connection).
By adding a helper `getPostExecutePromiseCount()`, we can decide that in
cases where there is nothing listening at all, there is no reason to
pull the data on the main instance.
Previously, there would always be postExecutePromises because the
telemetry events were called. Now, these have been moved into the
workers, which have been given the various InternalHooks calls to their
hook function arrays, so they themselves issue these telemetry and event
calls.
This results in all event log messages to now be logged on the worker’s
event log, as well as the worker’s eventbus being the one to send out
the events to destinations. The main event log does…pretty much nothing.
We are not logging executions on the main event log any more, because
this would require all events to be replicated 1:1 from the workers to
the main instance(s) (this IS possible and implemented, see the worker’s
`replicateToRedisEventLogFunction` - but it is not enabled to reduce the
amount of traffic over redis).
Partial events in the main log could confuse the recovery process and
would result in, ironically, the recovery corrupting the execution data
by considering them crashed.
# Refactor
I have also used the opportunity to reduce duplicate code and move some
of the hook functionality into
`packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts`
in preparation for a future full refactor of the hooks
2023-09-13 22:58:15 -07:00
|
|
|
if (responsePropertyName !== undefined) {
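							// `responsePropertyName` may be a dot-separated path (for example 'user.profile');
							// lodash `get` resolves it against the returned JSON so only that part is sent back.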
							data = get(data, responsePropertyName as string) as IDataObject;
						}

						const responseContentType = workflow.expression.getSimpleParameterValue(
							workflowStartNode,
							webhookData.webhookDescription.responseContentType,
							executionMode,
							additionalKeys,
							undefined,
							undefined,
						);

						if (responseContentType !== undefined) {
							// Send the webhook response manually to be able to set the content-type
							res.setHeader('Content-Type', responseContentType as string);

							// Returning an object, boolean, number, ... causes problems so make sure to stringify if needed
							if (
								data !== null &&
								data !== undefined &&
								['Buffer', 'String'].includes(data.constructor.name)
							) {
								res.end(data);
							} else {
								res.end(JSON.stringify(data));
							}
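
							// The response has already been written above, so tell the caller not to send
							// another one.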
							responseCallback(null, {
								noWebhookResponse: true,
							});
							didSendResponse = true;
						}
					} else if (responseData === 'firstEntryBinary') {
						// Return the binary data of the first entry
						data = returnData.data!.main[0]![0];

						if (data === undefined) {
							responseCallback(new Error('No item was found to return'), {});
							didSendResponse = true;
							return undefined;
						}

						if (data.binary === undefined) {
							responseCallback(new Error('No binary data was found to return'), {});
							didSendResponse = true;
							return undefined;
						}

						const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
							workflowStartNode,
							webhookData.webhookDescription.responseBinaryPropertyName,
							executionMode,
							additionalKeys,
							undefined,
							'data',
						);
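						// The trailing 'data' argument is used as the fallback binary property name when the
						// webhook description does not set `responseBinaryPropertyName` explicitly.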

						if (responseBinaryPropertyName === undefined && !didSendResponse) {
							responseCallback(new Error("No 'responseBinaryPropertyName' is set"), {});
							didSendResponse = true;
						}

						const binaryData = (data.binary as IBinaryKeyData)[
							responseBinaryPropertyName as string
						];
						if (binaryData === undefined && !didSendResponse) {
							responseCallback(
								new Error(
									`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
								),
								{},
							);
							didSendResponse = true;
						}

						if (!didSendResponse) {
							// Send the webhook response manually
							res.setHeader('Content-Type', binaryData.mimeType);
							if (binaryData.id) {
								const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
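								// Pipe the stored binary straight into the response; `end: false` keeps `res`
								// open so this handler decides when the response is finalized (see the
								// process.nextTick(() => res.end()) call further down).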
								stream.pipe(res, { end: false });
								await finished(stream);
							} else {
								res.write(Buffer.from(binaryData.data, BINARY_ENCODING));
							}

							responseCallback(null, {
								noWebhookResponse: true,
							});
							process.nextTick(() => res.end());
						}
					} else if (responseData === 'noData') {
						// Return without data
						data = undefined;
					} else {
						// Return the JSON data of all the entries
						data = [];
						for (const entry of returnData.data!.main[0]!) {
							data.push(entry.json);
						}
					}

					if (!didSendResponse) {
						responseCallback(null, {
							data,
							responseCode,
						});
					}
				}
				didSendResponse = true;

				return data;
			})
			.catch((e) => {
				if (!didSendResponse) {
					responseCallback(
						new ApplicationError('There was a problem executing the workflow', {
							level: 'warning',
							cause: e,
						}),
						{},
					);
				}
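
				// Surface the failure to whatever awaits this promise chain as a 500-level error,
				// even if a response has already been delivered through the callback.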
				throw new InternalServerError(e.message);
			});
		}

		return executionId;
	} catch (e) {
		const error =
			e instanceof UnprocessableRequestError
				? e
				: new ApplicationError('There was a problem executing the workflow', {
						level: 'warning',
						cause: e,
					});
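		// If a response already went out, rethrowing is all that is left to do; otherwise the
		// error is delivered through the response callback.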
		if (didSendResponse) throw error;
		responseCallback(error, {});
		return;
	}
}