mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
Merge 91a827a4ec
into 8790a0df3d
This commit is contained in:
commit
b60955379c
|
@ -6,7 +6,7 @@ import type { ExecutionRepository } from '@/databases/repositories/execution.rep
|
|||
import { WaitingForms } from '@/webhooks/waiting-forms';
|
||||
|
||||
import type { IExecutionResponse } from '../../interfaces';
|
||||
import type { WaitingWebhookRequest } from '../webhook.types';
|
||||
import type { IWebhookResponsePromiseData, WaitingWebhookRequest } from '../webhook.types';
|
||||
|
||||
describe('WaitingForms', () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
|
@ -205,7 +205,7 @@ describe('WaitingForms', () => {
|
|||
// @ts-expect-error Protected method
|
||||
.spyOn(waitingForms, 'getWebhookExecutionData')
|
||||
// @ts-expect-error Protected method
|
||||
.mockResolvedValue(mock<IWebhookResponseCallbackData>());
|
||||
.mockResolvedValue(mock<IWebhookResponsePromiseData>());
|
||||
|
||||
const execution = mock<IExecutionResponse>({
|
||||
finished: false,
|
||||
|
|
|
@ -6,7 +6,7 @@ import { ConflictError } from '@/errors/response-errors/conflict.error';
|
|||
import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
||||
import type { IExecutionResponse } from '@/interfaces';
|
||||
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
||||
import type { IWebhookResponseCallbackData, WaitingWebhookRequest } from '@/webhooks/webhook.types';
|
||||
import type { IWebhookResponsePromiseData, WaitingWebhookRequest } from '@/webhooks/webhook.types';
|
||||
|
||||
describe('WaitingWebhooks', () => {
|
||||
const executionRepository = mock<ExecutionRepository>();
|
||||
|
@ -85,7 +85,7 @@ describe('WaitingWebhooks', () => {
|
|||
// @ts-expect-error Protected method
|
||||
.spyOn(waitingWebhooks, 'getWebhookExecutionData')
|
||||
// @ts-expect-error Protected method
|
||||
.mockResolvedValue(mock<IWebhookResponseCallbackData>());
|
||||
.mockResolvedValue(mock<IWebhookResponsePromiseData>());
|
||||
|
||||
const execution = mock<IExecutionResponse>({
|
||||
finished: false,
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import { mock, type MockProxy } from 'jest-mock-extended';
|
||||
import type { Workflow, INode, IDataObject } from 'n8n-workflow';
|
||||
import type { Workflow, INode, IN8nHttpFullResponse } from 'n8n-workflow';
|
||||
import { FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow';
|
||||
|
||||
import { autoDetectResponseMode, handleFormRedirectionCase } from '../webhook-helpers';
|
||||
import type { IWebhookResponseCallbackData } from '../webhook.types';
|
||||
|
||||
describe('autoDetectResponseMode', () => {
|
||||
let workflow: MockProxy<Workflow>;
|
||||
|
@ -58,40 +57,46 @@ describe('autoDetectResponseMode', () => {
|
|||
|
||||
describe('handleFormRedirectionCase', () => {
|
||||
test('should return data unchanged if start node is WAIT_NODE_TYPE with resume not equal to form', () => {
|
||||
const data: IWebhookResponseCallbackData = {
|
||||
responseCode: 302,
|
||||
const response: IN8nHttpFullResponse = {
|
||||
statusCode: 302,
|
||||
headers: { location: 'http://example.com' },
|
||||
body: {},
|
||||
};
|
||||
const workflowStartNode = mock<INode>({
|
||||
type: WAIT_NODE_TYPE,
|
||||
parameters: { resume: 'webhook' },
|
||||
});
|
||||
const result = handleFormRedirectionCase(data, workflowStartNode);
|
||||
expect(result).toEqual(data);
|
||||
const result = handleFormRedirectionCase(response, workflowStartNode);
|
||||
expect(result).toEqual(response);
|
||||
});
|
||||
|
||||
test('should modify data if start node type matches and responseCode is a redirect', () => {
|
||||
const data: IWebhookResponseCallbackData = {
|
||||
responseCode: 302,
|
||||
const response: IN8nHttpFullResponse = {
|
||||
statusCode: 302,
|
||||
headers: { location: 'http://example.com' },
|
||||
body: {},
|
||||
};
|
||||
const workflowStartNode = mock<INode>({
|
||||
type: FORM_NODE_TYPE,
|
||||
parameters: {},
|
||||
});
|
||||
const result = handleFormRedirectionCase(data, workflowStartNode);
|
||||
expect(result.responseCode).toBe(200);
|
||||
expect(result.data).toEqual({ redirectURL: 'http://example.com' });
|
||||
expect((result?.headers as IDataObject)?.location).toBeUndefined();
|
||||
const result = handleFormRedirectionCase(response, workflowStartNode);
|
||||
expect(result.statusCode).toBe(200);
|
||||
expect(result.body).toEqual({ redirectURL: 'http://example.com' });
|
||||
expect(result.headers.location).toBeUndefined();
|
||||
});
|
||||
|
||||
test('should not modify data if location header is missing', () => {
|
||||
const data: IWebhookResponseCallbackData = { responseCode: 302, headers: {} };
|
||||
const response: IN8nHttpFullResponse = {
|
||||
statusCode: 302,
|
||||
headers: {},
|
||||
body: {},
|
||||
};
|
||||
const workflowStartNode = mock<INode>({
|
||||
type: FORM_NODE_TYPE,
|
||||
parameters: {},
|
||||
});
|
||||
const result = handleFormRedirectionCase(data, workflowStartNode);
|
||||
expect(result).toEqual(data);
|
||||
const result = handleFormRedirectionCase(response, workflowStartNode);
|
||||
expect(result).toEqual(response);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,7 +7,7 @@ import { ResponseError } from '@/errors/response-errors/abstract/response.error'
|
|||
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';
|
||||
import type {
|
||||
IWebhookManager,
|
||||
IWebhookResponseCallbackData,
|
||||
IWebhookResponsePromiseData,
|
||||
WebhookOptionsRequest,
|
||||
WebhookRequest,
|
||||
} from '@/webhooks/webhook.types';
|
||||
|
@ -150,14 +150,16 @@ describe('WebhookRequestHandler', () => {
|
|||
|
||||
const res = mock<Response>();
|
||||
|
||||
const executeWebhookResponse: IWebhookResponseCallbackData = {
|
||||
responseCode: 200,
|
||||
data: {},
|
||||
headers: {
|
||||
'x-custom-header': 'test',
|
||||
const executeWebhookResponse: IWebhookResponsePromiseData = {
|
||||
response: {
|
||||
statusCode: 200,
|
||||
body: {},
|
||||
headers: {
|
||||
'x-custom-header': 'test',
|
||||
},
|
||||
},
|
||||
};
|
||||
webhookManager.executeWebhook.mockResolvedValueOnce(executeWebhookResponse);
|
||||
// webhookManager.executeWebhook.mockResolvedValueOnce(executeWebhookResponse);
|
||||
|
||||
await handler(req, res);
|
||||
|
||||
|
@ -166,7 +168,7 @@ describe('WebhookRequestHandler', () => {
|
|||
expect(res.header).toHaveBeenCalledWith({
|
||||
'x-custom-header': 'test',
|
||||
});
|
||||
expect(res.json).toHaveBeenCalledWith(executeWebhookResponse.data);
|
||||
expect(res.json).toHaveBeenCalledWith(executeWebhookResponse.response.body);
|
||||
});
|
||||
|
||||
it('should send an error response if webhook execution throws', async () => {
|
||||
|
@ -204,16 +206,20 @@ describe('WebhookRequestHandler', () => {
|
|||
|
||||
const res = mock<Response>();
|
||||
|
||||
const executeWebhookResponse: IWebhookResponseCallbackData = {
|
||||
responseCode: 200,
|
||||
const executeWebhookResponse: IWebhookResponsePromiseData = {
|
||||
response: {
|
||||
statusCode: 200,
|
||||
headers: {},
|
||||
body: {},
|
||||
},
|
||||
};
|
||||
webhookManager.executeWebhook.mockResolvedValueOnce(executeWebhookResponse);
|
||||
// webhookManager.executeWebhook.mockResolvedValueOnce(executeWebhookResponse);
|
||||
|
||||
await handler(req, res);
|
||||
|
||||
expect(webhookManager.executeWebhook).toHaveBeenCalledWith(req, res);
|
||||
expect(res.status).toHaveBeenCalledWith(200);
|
||||
expect(res.json).toHaveBeenCalledWith(executeWebhookResponse.data);
|
||||
expect(res.json).toHaveBeenCalledWith(executeWebhookResponse.response.body);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import type { Response } from 'express';
|
||||
import { Logger } from 'n8n-core';
|
||||
import { Workflow, CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow';
|
||||
import { Workflow, CHAT_TRIGGER_NODE_TYPE, createDeferredPromise } from 'n8n-workflow';
|
||||
import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow';
|
||||
|
||||
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||
|
@ -14,7 +14,7 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da
|
|||
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
|
||||
|
||||
import type {
|
||||
IWebhookResponseCallbackData,
|
||||
IWebhookResponsePromiseData,
|
||||
IWebhookManager,
|
||||
WebhookAccessControlOptions,
|
||||
WebhookRequest,
|
||||
|
@ -66,10 +66,7 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
/**
|
||||
* Checks if a webhook for the given method and path exists and executes the workflow.
|
||||
*/
|
||||
async executeWebhook(
|
||||
request: WebhookRequest,
|
||||
response: Response,
|
||||
): Promise<IWebhookResponseCallbackData> {
|
||||
async executeWebhook(request: WebhookRequest, response: Response): Promise<void> {
|
||||
const httpMethod = request.method;
|
||||
const path = request.params.path;
|
||||
|
||||
|
@ -126,29 +123,24 @@ export class LiveWebhooks implements IWebhookManager {
|
|||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
}
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
async (error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
const executionMode = 'webhook';
|
||||
const responsePromise = createDeferredPromise<IWebhookResponsePromiseData>();
|
||||
await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
request,
|
||||
response,
|
||||
responsePromise,
|
||||
);
|
||||
|
||||
// Save static data if it changed
|
||||
await this.workflowStaticDataService.saveStaticData(workflow);
|
||||
}
|
||||
|
||||
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import type express from 'express';
|
||||
import { InstanceSettings } from 'n8n-core';
|
||||
import { WebhookPathTakenError, Workflow } from 'n8n-workflow';
|
||||
import { createDeferredPromise, WebhookPathTakenError, Workflow } from 'n8n-workflow';
|
||||
import type {
|
||||
IWebhookData,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
|
@ -26,7 +26,7 @@ import type { WorkflowRequest } from '@/workflows/workflow.request';
|
|||
|
||||
import { WebhookService } from './webhook.service';
|
||||
import type {
|
||||
IWebhookResponseCallbackData,
|
||||
IWebhookResponsePromiseData,
|
||||
IWebhookManager,
|
||||
WebhookAccessControlOptions,
|
||||
WebhookRequest,
|
||||
|
@ -53,10 +53,7 @@ export class TestWebhooks implements IWebhookManager {
|
|||
* Return a promise that resolves when the test webhook is called.
|
||||
* Also inform the FE of the result and remove the test webhook.
|
||||
*/
|
||||
async executeWebhook(
|
||||
request: WebhookRequest,
|
||||
response: express.Response,
|
||||
): Promise<IWebhookResponseCallbackData> {
|
||||
async executeWebhook(request: WebhookRequest, response: express.Response): Promise<void> {
|
||||
const httpMethod = request.method;
|
||||
|
||||
let path = removeTrailingSlash(request.params.path);
|
||||
|
@ -113,59 +110,55 @@ export class TestWebhooks implements IWebhookManager {
|
|||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
}
|
||||
|
||||
return await new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
const executionMode = 'manual';
|
||||
const responsePromise = createDeferredPromise<IWebhookResponsePromiseData>();
|
||||
try {
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhook,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
responsePromise,
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) return;
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{ type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } },
|
||||
pushRef,
|
||||
undefined, // IRunExecutionData
|
||||
undefined, // executionId
|
||||
request,
|
||||
response,
|
||||
(error: Error | null, data: IWebhookResponseCallbackData) => {
|
||||
if (error !== null) reject(error);
|
||||
else resolve(data);
|
||||
},
|
||||
destinationNode,
|
||||
);
|
||||
|
||||
// The workflow did not run as the request was probably setup related
|
||||
// or a ping so do not resolve the promise and wait for the real webhook
|
||||
// request instead.
|
||||
if (executionId === undefined) return;
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (pushRef !== undefined) {
|
||||
this.push.send(
|
||||
{ type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } },
|
||||
pushRef,
|
||||
);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
} catch {}
|
||||
|
||||
this.clearTimeout(key);
|
||||
/**
|
||||
* Multi-main setup: In a manual webhook execution, the main process that
|
||||
* handles a webhook might not be the same as the main process that created
|
||||
* the webhook. If so, after the test webhook has been successfully executed,
|
||||
* the handler process commands the creator process to clear its test webhooks.
|
||||
*/
|
||||
if (this.instanceSettings.isMultiMain && pushRef && !this.push.hasPushRef(pushRef)) {
|
||||
void this.publisher.publishCommand({
|
||||
command: 'clear-test-webhooks',
|
||||
payload: { webhookKey: key, workflowEntity, pushRef },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await this.deactivateWebhooks(workflow);
|
||||
});
|
||||
this.clearTimeout(key);
|
||||
|
||||
await this.deactivateWebhooks(workflow);
|
||||
}
|
||||
|
||||
clearTimeout(key: string) {
|
||||
|
|
|
@ -8,7 +8,7 @@ import { NotFoundError } from '@/errors/response-errors/not-found.error';
|
|||
import type { IExecutionResponse } from '@/interfaces';
|
||||
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
||||
|
||||
import type { IWebhookResponseCallbackData, WaitingWebhookRequest } from './webhook.types';
|
||||
import type { WaitingWebhookRequest } from './webhook.types';
|
||||
|
||||
@Service()
|
||||
export class WaitingForms extends WaitingWebhooks {
|
||||
|
@ -61,10 +61,7 @@ export class WaitingForms extends WaitingWebhooks {
|
|||
}
|
||||
}
|
||||
|
||||
async executeWebhook(
|
||||
req: WaitingWebhookRequest,
|
||||
res: express.Response,
|
||||
): Promise<IWebhookResponseCallbackData> {
|
||||
async executeWebhook(req: WaitingWebhookRequest, res: express.Response): Promise<void> {
|
||||
const { path: executionId, suffix } = req.params;
|
||||
|
||||
this.logReceivedWebhook(req.method, executionId);
|
||||
|
@ -107,10 +104,7 @@ export class WaitingForms extends WaitingWebhooks {
|
|||
message: 'Your response has been recorded',
|
||||
formTitle: 'Form Submitted',
|
||||
});
|
||||
|
||||
return {
|
||||
noWebhookResponse: true,
|
||||
};
|
||||
return;
|
||||
} else {
|
||||
lastNodeExecuted = completionPage;
|
||||
}
|
||||
|
@ -122,7 +116,7 @@ export class WaitingForms extends WaitingWebhooks {
|
|||
*/
|
||||
if (execution.mode === 'manual') execution.data.isTestWebhook = true;
|
||||
|
||||
return await this.getWebhookExecutionData({
|
||||
await this.getWebhookExecutionData({
|
||||
execution,
|
||||
req,
|
||||
res,
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import { Service } from '@n8n/di';
|
||||
import type express from 'express';
|
||||
import { Logger } from 'n8n-core';
|
||||
import type { INodes, IWorkflowBase } from 'n8n-workflow';
|
||||
import {
|
||||
createDeferredPromise,
|
||||
FORM_NODE_TYPE,
|
||||
type INodes,
|
||||
type IWorkflowBase,
|
||||
SEND_AND_WAIT_OPERATION,
|
||||
WAIT_NODE_TYPE,
|
||||
Workflow,
|
||||
|
@ -20,7 +20,7 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da
|
|||
|
||||
import { WebhookService } from './webhook.service';
|
||||
import type {
|
||||
IWebhookResponseCallbackData,
|
||||
IWebhookResponsePromiseData,
|
||||
IWebhookManager,
|
||||
WaitingWebhookRequest,
|
||||
} from './webhook.types';
|
||||
|
@ -81,10 +81,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
});
|
||||
}
|
||||
|
||||
async executeWebhook(
|
||||
req: WaitingWebhookRequest,
|
||||
res: express.Response,
|
||||
): Promise<IWebhookResponseCallbackData> {
|
||||
async executeWebhook(req: WaitingWebhookRequest, res: express.Response): Promise<void> {
|
||||
const { path: executionId, suffix } = req.params;
|
||||
|
||||
this.logReceivedWebhook(req.method, executionId);
|
||||
|
@ -113,7 +110,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
const { nodes } = this.createWorkflow(workflowData);
|
||||
if (this.isSendAndWaitRequest(nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
return;
|
||||
} else {
|
||||
throw new ConflictError(`The execution "${executionId} has finished already.`);
|
||||
}
|
||||
|
@ -127,7 +124,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
*/
|
||||
if (execution.mode === 'manual') execution.data.isTestWebhook = true;
|
||||
|
||||
return await this.getWebhookExecutionData({
|
||||
await this.getWebhookExecutionData({
|
||||
execution,
|
||||
req,
|
||||
res,
|
||||
|
@ -151,7 +148,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
lastNodeExecuted: string;
|
||||
executionId: string;
|
||||
suffix?: string;
|
||||
}): Promise<IWebhookResponseCallbackData> {
|
||||
}): Promise<void> {
|
||||
// Set the node as disabled so that the data does not get executed again as it would result
|
||||
// in starting the wait all over again
|
||||
this.disableNode(execution, req.method);
|
||||
|
@ -188,7 +185,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
|
||||
if (this.isSendAndWaitRequest(workflow.nodes, suffix)) {
|
||||
res.render('send-and-wait-no-action-required', { isTestWebhook: false });
|
||||
return { noWebhookResponse: true };
|
||||
return;
|
||||
}
|
||||
|
||||
if (!execution.data.resultData.error && execution.status === 'waiting') {
|
||||
|
@ -203,7 +200,7 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
);
|
||||
|
||||
if (hasChildForms) {
|
||||
return { noWebhookResponse: true };
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -212,27 +209,20 @@ export class WaitingWebhooks implements IWebhookManager {
|
|||
|
||||
const runExecutionData = execution.data;
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
const executionMode = 'webhook';
|
||||
void WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
|
||||
(error: Error | null, data: object) => {
|
||||
if (error !== null) {
|
||||
return reject(error);
|
||||
}
|
||||
resolve(data);
|
||||
},
|
||||
);
|
||||
});
|
||||
const executionMode = 'webhook';
|
||||
const responsePromise = createDeferredPromise<IWebhookResponsePromiseData>();
|
||||
await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData,
|
||||
workflowData,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
runExecutionData.pushRef,
|
||||
runExecutionData,
|
||||
execution.id,
|
||||
req,
|
||||
res,
|
||||
responsePromise,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import type {
|
|||
import {
|
||||
ApplicationError,
|
||||
BINARY_ENCODING,
|
||||
createDeferredPromise,
|
||||
ExecutionCancelledError,
|
||||
FORM_NODE_TYPE,
|
||||
FORM_TRIGGER_NODE_TYPE,
|
||||
|
@ -60,7 +59,7 @@ import * as WorkflowHelpers from '@/workflow-helpers';
|
|||
import { WorkflowRunner } from '@/workflow-runner';
|
||||
|
||||
import { WebhookService } from './webhook.service';
|
||||
import type { IWebhookResponseCallbackData, WebhookRequest } from './webhook.types';
|
||||
import type { IWebhookResponsePromiseData, WebhookRequest } from './webhook.types';
|
||||
|
||||
/**
|
||||
* Returns all the webhooks which should be created for the given workflow
|
||||
|
@ -107,7 +106,7 @@ export function autoDetectResponseMode(
|
|||
workflowStartNode: INode,
|
||||
workflow: Workflow,
|
||||
method: string,
|
||||
) {
|
||||
): WebhookResponseMode | undefined {
|
||||
if (workflowStartNode.type === WAIT_NODE_TYPE && workflowStartNode.parameters.resume !== 'form') {
|
||||
return undefined;
|
||||
}
|
||||
|
@ -137,26 +136,25 @@ export function autoDetectResponseMode(
|
|||
* for formTrigger and form nodes redirection has to be handled by sending redirectURL in response body
|
||||
*/
|
||||
export const handleFormRedirectionCase = (
|
||||
data: IWebhookResponseCallbackData,
|
||||
response: IN8nHttpFullResponse,
|
||||
workflowStartNode: INode,
|
||||
) => {
|
||||
): IN8nHttpFullResponse => {
|
||||
if (workflowStartNode.type === WAIT_NODE_TYPE && workflowStartNode.parameters.resume !== 'form') {
|
||||
return data;
|
||||
return response;
|
||||
}
|
||||
|
||||
const { headers, statusCode } = response;
|
||||
if (
|
||||
[FORM_NODE_TYPE, FORM_TRIGGER_NODE_TYPE, WAIT_NODE_TYPE].includes(workflowStartNode.type) &&
|
||||
(data?.headers as IDataObject)?.location &&
|
||||
String(data?.responseCode).startsWith('3')
|
||||
headers.location &&
|
||||
String(statusCode).startsWith('3')
|
||||
) {
|
||||
data.responseCode = 200;
|
||||
data.data = {
|
||||
redirectURL: (data?.headers as IDataObject)?.location,
|
||||
};
|
||||
(data.headers as IDataObject).location = undefined;
|
||||
response.statusCode = 200;
|
||||
response.body = { redirectURL: headers.location };
|
||||
delete headers.location;
|
||||
}
|
||||
|
||||
return data;
|
||||
return response;
|
||||
};
|
||||
|
||||
const { formDataFileSizeMax } = Container.get(GlobalConfig).endpoints;
|
||||
|
@ -177,7 +175,7 @@ export async function executeWebhook(
|
|||
executionId: string | undefined,
|
||||
req: WebhookRequest,
|
||||
res: express.Response,
|
||||
responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void,
|
||||
responsePromise: IDeferredPromise<IWebhookResponsePromiseData>,
|
||||
destinationNode?: string,
|
||||
): Promise<string | undefined> {
|
||||
// Get the nodeType to know which responseMode is set
|
||||
|
@ -185,11 +183,6 @@ export async function executeWebhook(
|
|||
workflowStartNode.type,
|
||||
workflowStartNode.typeVersion,
|
||||
);
|
||||
if (nodeType === undefined) {
|
||||
const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known`;
|
||||
responseCallback(new ApplicationError(errorMessage), {});
|
||||
throw new InternalServerError(errorMessage);
|
||||
}
|
||||
|
||||
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
|
||||
$executionId: executionId,
|
||||
|
@ -209,24 +202,28 @@ export async function executeWebhook(
|
|||
additionalData.executionId = executionId;
|
||||
}
|
||||
|
||||
// Get the responseMode
|
||||
let responseMode;
|
||||
|
||||
//check if response mode should be set automatically, e.g. multipage form
|
||||
responseMode = autoDetectResponseMode(workflowStartNode, workflow, req.method);
|
||||
|
||||
if (!responseMode) {
|
||||
responseMode = workflow.expression.getSimpleParameterValue(
|
||||
const responseMode =
|
||||
autoDetectResponseMode(workflowStartNode, workflow, req.method) ??
|
||||
(workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseMode,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
'onReceived',
|
||||
) as WebhookResponseMode;
|
||||
) as WebhookResponseMode);
|
||||
|
||||
if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode)) {
|
||||
// If the mode is not known we error. Is probably best like that instead of using
|
||||
// the default that people know as early as possible (probably already testing phase)
|
||||
// that something does not resolve properly.
|
||||
const errorMessage = `The response mode '${responseMode}' is not valid!`;
|
||||
responsePromise.reject(new ApplicationError(errorMessage));
|
||||
return;
|
||||
}
|
||||
|
||||
const responseCode = workflow.expression.getSimpleParameterValue(
|
||||
const responseCodeParam = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseCode as string,
|
||||
executionMode,
|
||||
|
@ -235,7 +232,7 @@ export async function executeWebhook(
|
|||
200,
|
||||
) as number;
|
||||
|
||||
const responseData = workflow.expression.getComplexParameterValue(
|
||||
const responseDataParam = workflow.expression.getComplexParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseData,
|
||||
executionMode,
|
||||
|
@ -244,35 +241,24 @@ export async function executeWebhook(
|
|||
'firstEntryJson',
|
||||
);
|
||||
|
||||
if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode)) {
|
||||
// If the mode is not known we error. Is probably best like that instead of using
|
||||
// the default that people know as early as possible (probably already testing phase)
|
||||
// that something does not resolve properly.
|
||||
const errorMessage = `The response mode '${responseMode}' is not valid!`;
|
||||
responseCallback(new ApplicationError(errorMessage), {});
|
||||
throw new InternalServerError(errorMessage);
|
||||
}
|
||||
const nodeVersion = workflowStartNode.typeVersion;
|
||||
// binaryData option is removed in versions higher than 1
|
||||
const binaryDataParam =
|
||||
nodeVersion === 1
|
||||
? (workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
'={{$parameter["options"]["binaryData"]}}',
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
false,
|
||||
) as boolean)
|
||||
: undefined;
|
||||
|
||||
// Add the Response and Request so that this data can be accessed in the node
|
||||
additionalData.httpRequest = req;
|
||||
additionalData.httpResponse = res;
|
||||
|
||||
let binaryData;
|
||||
|
||||
const nodeVersion = workflowStartNode.typeVersion;
|
||||
if (nodeVersion === 1) {
|
||||
// binaryData option is removed in versions higher than 1
|
||||
binaryData = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
'={{$parameter["options"]["binaryData"]}}',
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
let didSendResponse = false;
|
||||
let runExecutionDataMerge = {};
|
||||
try {
|
||||
// Run the webhook function to see what should be returned and if
|
||||
|
@ -281,7 +267,7 @@ export async function executeWebhook(
|
|||
|
||||
// if `Webhook` or `Wait` node, and binaryData is enabled, skip pre-parse the request-body
|
||||
// always falsy for versions higher than 1
|
||||
if (!binaryData) {
|
||||
if (!binaryDataParam) {
|
||||
const { contentType } = req;
|
||||
if (contentType === 'multipart/form-data') {
|
||||
req.body = await parseFormData(req);
|
||||
|
@ -336,9 +322,6 @@ export async function executeWebhook(
|
|||
},
|
||||
});
|
||||
|
||||
responseCallback(new ApplicationError(errorMessage), {});
|
||||
didSendResponse = true;
|
||||
|
||||
// Add error to execution data that it can be logged and send to Editor-UI
|
||||
runExecutionDataMerge = {
|
||||
resultData: {
|
||||
|
@ -358,16 +341,20 @@ export async function executeWebhook(
|
|||
// which then so gets the chance to throw the error.
|
||||
workflowData: [[{ json: {} }]],
|
||||
};
|
||||
|
||||
responsePromise.reject(new ApplicationError(errorMessage));
|
||||
return;
|
||||
}
|
||||
|
||||
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
|
||||
$executionId: executionId,
|
||||
};
|
||||
|
||||
const responseHeadersParam = webhookData.webhookDescription.responseHeaders;
|
||||
if (webhookData.webhookDescription.responseHeaders !== undefined) {
|
||||
const responseHeaders = workflow.expression.getComplexParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseHeaders,
|
||||
responseHeadersParam,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
|
@ -381,90 +368,83 @@ export async function executeWebhook(
|
|||
| undefined;
|
||||
};
|
||||
|
||||
if (responseHeaders !== undefined && responseHeaders.entries !== undefined) {
|
||||
if (responseHeaders?.entries?.length) {
|
||||
for (const item of responseHeaders.entries) {
|
||||
res.setHeader(item.name, item.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (webhookResultData.noWebhookResponse === true && !didSendResponse) {
|
||||
if (webhookResultData.noWebhookResponse === true) {
|
||||
// The response got already send
|
||||
responseCallback(null, {
|
||||
noWebhookResponse: true,
|
||||
});
|
||||
didSendResponse = true;
|
||||
responsePromise.resolve({ noWebhookResponse: true });
|
||||
return;
|
||||
}
|
||||
|
||||
if (webhookResultData.workflowData === undefined) {
|
||||
// Workflow should not run
|
||||
if (webhookResultData.webhookResponse !== undefined) {
|
||||
// Data to respond with is given
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: webhookResultData.webhookResponse,
|
||||
responseCode,
|
||||
});
|
||||
didSendResponse = true;
|
||||
}
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
statusCode: responseCodeParam,
|
||||
body: webhookResultData.webhookResponse,
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
// Send default response
|
||||
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Webhook call received',
|
||||
},
|
||||
responseCode,
|
||||
});
|
||||
didSendResponse = true;
|
||||
}
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
statusCode: responseCodeParam,
|
||||
body: { message: 'Webhook call received' },
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Now that we know that the workflow should run we can return the default response
|
||||
// directly if responseMode it set to "onReceived" and a response should be sent
|
||||
if (responseMode === 'onReceived' && !didSendResponse) {
|
||||
// Return response directly and do not wait for the workflow to finish
|
||||
if (responseData === 'noData') {
|
||||
// Return without data
|
||||
responseCallback(null, {
|
||||
responseCode,
|
||||
});
|
||||
} else if (responseData) {
|
||||
// Return the data specified in the response data option
|
||||
responseCallback(null, {
|
||||
data: responseData as IDataObject,
|
||||
responseCode,
|
||||
});
|
||||
} else if (webhookResultData.webhookResponse !== undefined) {
|
||||
// Data to respond with is given
|
||||
responseCallback(null, {
|
||||
data: webhookResultData.webhookResponse,
|
||||
responseCode,
|
||||
});
|
||||
} else {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Workflow was started',
|
||||
},
|
||||
responseCode,
|
||||
});
|
||||
}
|
||||
if (responseMode === 'onReceived') {
|
||||
void responsePromise.promise.then(async (resolveData) => {
|
||||
if (resolveData.noWebhookResponse) {
|
||||
// TODO: send 204
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
const { statusCode, headers, body } = resolveData.response;
|
||||
for (const [key, value] of Object.entries(headers)) {
|
||||
res.setHeader(key, value);
|
||||
}
|
||||
// TODO handle binary responses
|
||||
res.status(statusCode).json(body);
|
||||
});
|
||||
|
||||
didSendResponse = true;
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
statusCode: responseCodeParam,
|
||||
headers: {},
|
||||
body: responseDataParam ??
|
||||
webhookResultData.webhookResponse ?? { message: 'Workflow was started' },
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize the data of the webhook node
|
||||
const nodeExecutionStack: IExecuteData[] = [];
|
||||
nodeExecutionStack.push({
|
||||
node: workflowStartNode,
|
||||
data: {
|
||||
main: webhookResultData.workflowData,
|
||||
const nodeExecutionStack: IExecuteData[] = [
|
||||
{
|
||||
node: workflowStartNode,
|
||||
data: {
|
||||
main: webhookResultData.workflowData,
|
||||
},
|
||||
source: null,
|
||||
},
|
||||
source: null,
|
||||
});
|
||||
];
|
||||
|
||||
runExecutionData =
|
||||
runExecutionData ||
|
||||
|
@ -517,43 +497,34 @@ export async function executeWebhook(
|
|||
runData.pushRef = runExecutionData.pushRef;
|
||||
}
|
||||
|
||||
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
|
||||
if (responseMode === 'responseNode') {
|
||||
responsePromise = createDeferredPromise<IN8nHttpFullResponse>();
|
||||
responsePromise.promise
|
||||
.then(async (response: IN8nHttpFullResponse) => {
|
||||
if (didSendResponse) {
|
||||
.then(async (resolveData) => {
|
||||
if (resolveData.noWebhookResponse) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { response } = resolveData;
|
||||
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
|
||||
if (binaryData?.id) {
|
||||
res.header(response.headers);
|
||||
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
|
||||
stream.pipe(res, { end: false });
|
||||
await finished(stream);
|
||||
responseCallback(null, { noWebhookResponse: true });
|
||||
responsePromise.resolve({ noWebhookResponse: true });
|
||||
} else if (Buffer.isBuffer(response.body)) {
|
||||
res.header(response.headers);
|
||||
res.end(response.body);
|
||||
responseCallback(null, { noWebhookResponse: true });
|
||||
res.write(response.body);
|
||||
responsePromise.resolve({ noWebhookResponse: true });
|
||||
} else {
|
||||
// TODO: This probably needs some more changes depending on the options on the
|
||||
// Webhook Response node
|
||||
|
||||
let data: IWebhookResponseCallbackData = {
|
||||
data: response.body as IDataObject,
|
||||
headers: response.headers,
|
||||
responseCode: response.statusCode,
|
||||
};
|
||||
|
||||
data = handleFormRedirectionCase(data, workflowStartNode);
|
||||
|
||||
responseCallback(null, data);
|
||||
let data: IWebhookResponsePromiseData = { response };
|
||||
data.response = handleFormRedirectionCase(data.response, workflowStartNode);
|
||||
responsePromise.resolve(data);
|
||||
}
|
||||
|
||||
process.nextTick(() => res.end());
|
||||
didSendResponse = true;
|
||||
})
|
||||
.catch(async (error) => {
|
||||
Container.get(ErrorReporter).error(error);
|
||||
|
@ -561,7 +532,7 @@ export async function executeWebhook(
|
|||
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
|
||||
{ executionId, workflowId: workflow.id },
|
||||
);
|
||||
responseCallback(error, {});
|
||||
responsePromise.reject(error);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -578,7 +549,7 @@ export async function executeWebhook(
|
|||
executionId = await Container.get(WorkflowRunner).run(
|
||||
runData,
|
||||
true,
|
||||
!didSendResponse,
|
||||
true,
|
||||
executionId,
|
||||
responsePromise,
|
||||
);
|
||||
|
@ -602,221 +573,200 @@ export async function executeWebhook(
|
|||
});
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
executePromise
|
||||
// eslint-disable-next-line complexity
|
||||
.then(async (data) => {
|
||||
if (data === undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Workflow executed successfully but no data was returned',
|
||||
},
|
||||
responseCode,
|
||||
});
|
||||
didSendResponse = true;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
executePromise
|
||||
// eslint-disable-next-line complexity
|
||||
.then(async (data) => {
|
||||
if (data === undefined) {
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
body: {
|
||||
message: 'Workflow executed successfully but no data was returned',
|
||||
},
|
||||
statusCode: responseCodeParam,
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (usePinData) {
|
||||
data.data.resultData.pinData = pinData;
|
||||
}
|
||||
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
if (data.data.resultData.error || returnData?.error !== undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Error in workflow',
|
||||
},
|
||||
responseCode: 500,
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
}
|
||||
|
||||
// in `responseNode` mode `responseCallback` is called by `responsePromise`
|
||||
if (responseMode === 'responseNode' && responsePromise) {
|
||||
await Promise.allSettled([responsePromise.promise]);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (returnData === undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message:
|
||||
'Workflow executed successfully but the last node did not return any data',
|
||||
},
|
||||
responseCode,
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
}
|
||||
|
||||
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
|
||||
$executionId: executionId,
|
||||
};
|
||||
|
||||
if (!didSendResponse) {
|
||||
let data: IDataObject | IDataObject[] | undefined;
|
||||
|
||||
if (responseData === 'firstEntryJson') {
|
||||
// Return the JSON data of the first entry
|
||||
|
||||
if (returnData.data!.main[0]![0] === undefined) {
|
||||
responseCallback(new ApplicationError('No item to return got found'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
data = returnData.data!.main[0]![0].json;
|
||||
|
||||
const responsePropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responsePropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responsePropertyName !== undefined) {
|
||||
data = get(data, responsePropertyName as string) as IDataObject;
|
||||
}
|
||||
|
||||
const responseContentType = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseContentType,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responseContentType !== undefined) {
|
||||
// Send the webhook response manually to be able to set the content-type
|
||||
res.setHeader('Content-Type', responseContentType as string);
|
||||
|
||||
// Returning an object, boolean, number, ... causes problems so make sure to stringify if needed
|
||||
if (
|
||||
data !== null &&
|
||||
data !== undefined &&
|
||||
['Buffer', 'String'].includes(data.constructor.name)
|
||||
) {
|
||||
res.end(data);
|
||||
} else {
|
||||
res.end(JSON.stringify(data));
|
||||
}
|
||||
|
||||
responseCallback(null, {
|
||||
noWebhookResponse: true,
|
||||
});
|
||||
didSendResponse = true;
|
||||
}
|
||||
} else if (responseData === 'firstEntryBinary') {
|
||||
// Return the binary data of the first entry
|
||||
data = returnData.data!.main[0]![0];
|
||||
|
||||
if (data === undefined) {
|
||||
responseCallback(new ApplicationError('No item was found to return'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (data.binary === undefined) {
|
||||
responseCallback(new ApplicationError('No binary data was found to return'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseBinaryPropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
if (responseBinaryPropertyName === undefined && !didSendResponse) {
|
||||
responseCallback(
|
||||
new ApplicationError("No 'responseBinaryPropertyName' is set"),
|
||||
{},
|
||||
);
|
||||
didSendResponse = true;
|
||||
}
|
||||
|
||||
const binaryData = (data.binary as IBinaryKeyData)[
|
||||
responseBinaryPropertyName as string
|
||||
];
|
||||
if (binaryData === undefined && !didSendResponse) {
|
||||
responseCallback(
|
||||
new ApplicationError(
|
||||
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
|
||||
),
|
||||
{},
|
||||
);
|
||||
didSendResponse = true;
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
// Send the webhook response manually
|
||||
res.setHeader('Content-Type', binaryData.mimeType);
|
||||
if (binaryData.id) {
|
||||
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
|
||||
stream.pipe(res, { end: false });
|
||||
await finished(stream);
|
||||
} else {
|
||||
res.write(Buffer.from(binaryData.data, BINARY_ENCODING));
|
||||
}
|
||||
|
||||
responseCallback(null, {
|
||||
noWebhookResponse: true,
|
||||
});
|
||||
process.nextTick(() => res.end());
|
||||
}
|
||||
} else if (responseData === 'noData') {
|
||||
// Return without data
|
||||
data = undefined;
|
||||
} else {
|
||||
// Return the JSON data of all the entries
|
||||
data = [];
|
||||
for (const entry of returnData.data!.main[0]!) {
|
||||
data.push(entry.json);
|
||||
}
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data,
|
||||
responseCode,
|
||||
});
|
||||
}
|
||||
}
|
||||
didSendResponse = true;
|
||||
if (usePinData) {
|
||||
data.data.resultData.pinData = pinData;
|
||||
}
|
||||
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
if (data.data.resultData.error || returnData?.error !== undefined) {
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
body: {
|
||||
message: 'Error in workflow',
|
||||
},
|
||||
statusCode: 500,
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
return data;
|
||||
})
|
||||
.catch((e) => {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(
|
||||
new ApplicationError('There was a problem executing the workflow', {
|
||||
level: 'warning',
|
||||
cause: e,
|
||||
}),
|
||||
{},
|
||||
);
|
||||
}
|
||||
|
||||
// in `responseNode` mode `responseCallback` is called by `responsePromise`
|
||||
if (responseMode === 'responseNode' && responsePromise) {
|
||||
await Promise.allSettled([responsePromise.promise]);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (returnData === undefined) {
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
body: {
|
||||
message: 'Workflow executed successfully but the last node did not return any data',
|
||||
},
|
||||
statusCode: responseCodeParam,
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
return data;
|
||||
}
|
||||
|
||||
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
|
||||
$executionId: executionId,
|
||||
};
|
||||
|
||||
let responseData: IDataObject | IDataObject[] | undefined;
|
||||
|
||||
if (responseDataParam === 'firstEntryJson') {
|
||||
// Return the JSON data of the first entry
|
||||
|
||||
if (returnData.data!.main[0]![0] === undefined) {
|
||||
responsePromise.reject(new ApplicationError('No item to return got found'));
|
||||
return;
|
||||
}
|
||||
|
||||
const internalServerError = new InternalServerError(e.message, e);
|
||||
if (e instanceof ExecutionCancelledError) internalServerError.level = 'warning';
|
||||
throw internalServerError;
|
||||
responseData = returnData.data!.main[0]![0].json;
|
||||
|
||||
const responsePropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responsePropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responsePropertyName !== undefined) {
|
||||
responseData = get(responseData, responsePropertyName as string) as IDataObject;
|
||||
}
|
||||
|
||||
const responseContentType = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseContentType,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responseContentType !== undefined) {
|
||||
// Send the webhook response manually to be able to set the content-type
|
||||
res.setHeader('Content-Type', responseContentType as string);
|
||||
|
||||
// Returning an object, boolean, number, ... causes problems so make sure to stringify if needed
|
||||
if (
|
||||
responseData !== null &&
|
||||
responseData !== undefined &&
|
||||
['Buffer', 'String'].includes(responseData.constructor.name)
|
||||
) {
|
||||
res.end(responseData);
|
||||
} else {
|
||||
res.end(JSON.stringify(responseData));
|
||||
}
|
||||
|
||||
responsePromise.resolve({ noWebhookResponse: true });
|
||||
return;
|
||||
}
|
||||
} else if (responseDataParam === 'firstEntryBinary') {
|
||||
// Return the binary data of the first entry
|
||||
responseData = returnData.data!.main[0]![0];
|
||||
|
||||
if (responseData === undefined) {
|
||||
responsePromise.reject(new ApplicationError('No item was found to return'));
|
||||
return;
|
||||
}
|
||||
|
||||
if (responseData.binary === undefined) {
|
||||
responsePromise.reject(new ApplicationError('No binary data was found to return'));
|
||||
return;
|
||||
}
|
||||
|
||||
const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseBinaryPropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
if (responseBinaryPropertyName === undefined) {
|
||||
responsePromise.reject(new ApplicationError("No 'responseBinaryPropertyName' is set"));
|
||||
return;
|
||||
}
|
||||
|
||||
const binaryData = (responseData.binary as IBinaryKeyData)[
|
||||
responseBinaryPropertyName as string
|
||||
];
|
||||
if (binaryData === undefined) {
|
||||
responsePromise.reject(
|
||||
new ApplicationError(
|
||||
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send the webhook response manually
|
||||
res.setHeader('Content-Type', binaryData.mimeType);
|
||||
if (binaryData.id) {
|
||||
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
|
||||
stream.pipe(res, { end: false });
|
||||
await finished(stream);
|
||||
} else {
|
||||
res.write(Buffer.from(binaryData.data, BINARY_ENCODING));
|
||||
}
|
||||
|
||||
responsePromise.resolve({ noWebhookResponse: true });
|
||||
process.nextTick(() => res.end());
|
||||
} else if (responseDataParam === 'noData') {
|
||||
// Return without data
|
||||
responseData = undefined;
|
||||
} else {
|
||||
// Return the JSON data of all the entries
|
||||
responseData = [];
|
||||
for (const entry of returnData.data!.main[0]!) {
|
||||
responseData.push(entry.json);
|
||||
}
|
||||
}
|
||||
|
||||
responsePromise.resolve({
|
||||
response: {
|
||||
body: responseData,
|
||||
statusCode: responseCodeParam,
|
||||
headers: {},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return data;
|
||||
})
|
||||
.catch((e) => {
|
||||
responsePromise.reject(
|
||||
new ApplicationError('There was a problem executing the workflow', {
|
||||
level: 'warning',
|
||||
cause: e,
|
||||
}),
|
||||
);
|
||||
|
||||
const internalServerError = new InternalServerError(e.message, e);
|
||||
if (e instanceof ExecutionCancelledError) internalServerError.level = 'warning';
|
||||
throw internalServerError;
|
||||
});
|
||||
return executionId;
|
||||
} catch (e) {
|
||||
const error =
|
||||
|
@ -826,8 +776,7 @@ export async function executeWebhook(
|
|||
level: 'warning',
|
||||
cause: e,
|
||||
});
|
||||
if (didSendResponse) throw error;
|
||||
responseCallback(error, {});
|
||||
responsePromise.reject(error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,18 +42,7 @@ class WebhookRequestHandler {
|
|||
}
|
||||
|
||||
try {
|
||||
const response = await this.webhookManager.executeWebhook(req, res);
|
||||
|
||||
// Don't respond, if already responded
|
||||
if (response.noWebhookResponse !== true) {
|
||||
ResponseHelper.sendSuccessResponse(
|
||||
res,
|
||||
response.data,
|
||||
true,
|
||||
response.responseCode,
|
||||
response.headers,
|
||||
);
|
||||
}
|
||||
await this.webhookManager.executeWebhook(req, res);
|
||||
} catch (e) {
|
||||
const error = ensureError(e);
|
||||
Container.get(Logger).debug(
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import type { Request, Response } from 'express';
|
||||
import type { IDataObject, IHttpRequestMethods } from 'n8n-workflow';
|
||||
import type { IHttpRequestMethods, IN8nHttpFullResponse } from 'n8n-workflow';
|
||||
|
||||
export type WebhookOptionsRequest = Request & { method: 'OPTIONS' };
|
||||
|
||||
|
@ -26,12 +26,12 @@ export interface IWebhookManager {
|
|||
httpMethod: IHttpRequestMethods,
|
||||
) => Promise<WebhookAccessControlOptions | undefined>;
|
||||
|
||||
executeWebhook(req: WebhookRequest, res: Response): Promise<IWebhookResponseCallbackData>;
|
||||
executeWebhook(req: WebhookRequest, res: Response): Promise<void>;
|
||||
}
|
||||
|
||||
export interface IWebhookResponseCallbackData {
|
||||
data?: IDataObject | IDataObject[];
|
||||
headers?: object;
|
||||
noWebhookResponse?: boolean;
|
||||
responseCode?: number;
|
||||
}
|
||||
export type IWebhookResponsePromiseData =
|
||||
| { noWebhookResponse: true }
|
||||
| {
|
||||
noWebhookResponse?: false;
|
||||
response: IN8nHttpFullResponse;
|
||||
};
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import { GlobalConfig } from '@n8n/config';
|
||||
import { Container } from '@n8n/di';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import { agent as testAgent } from 'supertest';
|
||||
import type SuperAgentTest from 'supertest/lib/agent';
|
||||
|
||||
|
@ -10,7 +9,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks';
|
|||
import { WaitingForms } from '@/webhooks/waiting-forms';
|
||||
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
|
||||
import { WebhookServer } from '@/webhooks/webhook-server';
|
||||
import type { IWebhookResponseCallbackData } from '@/webhooks/webhook.types';
|
||||
import { mockInstance } from '@test/mocking';
|
||||
|
||||
let agent: SuperAgentTest;
|
||||
|
@ -60,9 +58,6 @@ describe('WebhookServer', () => {
|
|||
it('should handle regular requests', async () => {
|
||||
const pathPrefix = Container.get(GlobalConfig).endpoints[key];
|
||||
manager.getWebhookMethods.mockResolvedValueOnce(['GET']);
|
||||
manager.executeWebhook.mockResolvedValueOnce(
|
||||
mockResponse({ test: true }, { key: 'value ' }),
|
||||
);
|
||||
|
||||
const response = await agent
|
||||
.get(`/${pathPrefix}/abcd`)
|
||||
|
@ -75,13 +70,5 @@ describe('WebhookServer', () => {
|
|||
});
|
||||
});
|
||||
}
|
||||
|
||||
const mockResponse = (data = {}, headers = {}, status = 200) => {
|
||||
const response = mock<IWebhookResponseCallbackData>();
|
||||
response.responseCode = status;
|
||||
response.data = data;
|
||||
response.headers = headers;
|
||||
return response;
|
||||
};
|
||||
});
|
||||
});
|
||||
|
|
|
@ -23,6 +23,12 @@ import type { Readable } from 'stream';
|
|||
|
||||
import { formatPrivateKey, generatePairedItemData } from '../../utils/utilities';
|
||||
|
||||
type Options = {
|
||||
responseHeaders: { entries: Array<{ name: string; value: string }> };
|
||||
responseCode: number;
|
||||
responseKey: string;
|
||||
};
|
||||
|
||||
export class RespondToWebhook implements INodeType {
|
||||
description: INodeTypeDescription = {
|
||||
displayName: 'Respond to Webhook',
|
||||
|
@ -323,19 +329,16 @@ export class RespondToWebhook implements INodeType {
|
|||
}
|
||||
|
||||
const respondWith = this.getNodeParameter('respondWith', 0) as string;
|
||||
const options = this.getNodeParameter('options', 0, {});
|
||||
const options = this.getNodeParameter('options', 0, {}) as Options;
|
||||
|
||||
const headers = {} as IDataObject;
|
||||
if (options.responseHeaders) {
|
||||
for (const header of (options.responseHeaders as IDataObject).entries as IDataObject[]) {
|
||||
if (typeof header.name !== 'string') {
|
||||
header.name = header.name?.toString();
|
||||
}
|
||||
headers[header.name?.toLowerCase() as string] = header.value?.toString();
|
||||
const headers: IN8nHttpFullResponse['headers'] = {};
|
||||
if (options.responseHeaders?.entries?.length) {
|
||||
for (const header of options.responseHeaders.entries) {
|
||||
headers[String(header.name).toLowerCase()] = String(header.value);
|
||||
}
|
||||
}
|
||||
|
||||
let statusCode = (options.responseCode as number) || 200;
|
||||
let statusCode = options.responseCode ?? 200;
|
||||
let responseBody: IN8nHttpResponse | Readable;
|
||||
if (respondWith === 'json') {
|
||||
const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string;
|
||||
|
@ -381,11 +384,11 @@ export class RespondToWebhook implements INodeType {
|
|||
} else if (respondWith === 'allIncomingItems') {
|
||||
const respondItems = items.map((item) => item.json);
|
||||
responseBody = options.responseKey
|
||||
? set({}, options.responseKey as string, respondItems)
|
||||
? set({}, options.responseKey, respondItems)
|
||||
: respondItems;
|
||||
} else if (respondWith === 'firstIncomingItem') {
|
||||
responseBody = options.responseKey
|
||||
? set({}, options.responseKey as string, items[0].json)
|
||||
? set({}, options.responseKey, items[0].json)
|
||||
: items[0].json;
|
||||
} else if (respondWith === 'text') {
|
||||
responseBody = this.getNodeParameter('responseBody', 0) as string;
|
||||
|
@ -418,7 +421,7 @@ export class RespondToWebhook implements INodeType {
|
|||
responseBody = { binaryData };
|
||||
} else {
|
||||
responseBody = Buffer.from(binaryData.data, BINARY_ENCODING);
|
||||
headers['content-length'] = (responseBody as Buffer).length;
|
||||
headers['content-length'] = String((responseBody as Buffer).length);
|
||||
}
|
||||
|
||||
if (!headers['content-type']) {
|
||||
|
@ -426,7 +429,7 @@ export class RespondToWebhook implements INodeType {
|
|||
}
|
||||
} else if (respondWith === 'redirect') {
|
||||
headers.location = this.getNodeParameter('redirectURL', 0) as string;
|
||||
statusCode = (options.responseCode as number) ?? 307;
|
||||
statusCode = options.responseCode ?? 307;
|
||||
} else if (respondWith !== 'noData') {
|
||||
throw new NodeOperationError(
|
||||
this.getNode(),
|
||||
|
|
|
@ -518,10 +518,12 @@ export interface PaginationOptions {
|
|||
|
||||
export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null;
|
||||
|
||||
type ResponseHeaders = Record<string, string>;
|
||||
|
||||
export interface IN8nHttpFullResponse {
|
||||
body: IN8nHttpResponse | Readable;
|
||||
__bodyResolved?: boolean;
|
||||
headers: IDataObject;
|
||||
headers: ResponseHeaders;
|
||||
statusCode: number;
|
||||
statusMessage?: string;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue