diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index 9a06868f3a..28a02b9b39 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -12,7 +12,7 @@ import * as PCancelable from 'p-cancelable'; import { Command, flags } from '@oclif/command'; import { UserSettings, WorkflowExecute } from 'n8n-core'; -import { INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; +import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; import { FindOneOptions } from 'typeorm'; @@ -25,11 +25,13 @@ import { GenericHelpers, IBullJobData, IBullJobResponse, + IBullWebhookResponse, IExecutionFlattedDb, InternalHooksManager, LoadNodesAndCredentials, NodeTypes, ResponseHelper, + WebhookHelpers, WorkflowExecuteAdditionalData, } from '../src'; @@ -172,6 +174,16 @@ export class Worker extends Command { currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string }, ); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + await job.progress({ + executionId: job.data.executionId as string, + response: WebhookHelpers.encodeWebhookResponse(response), + } as IBullWebhookResponse); + }, + ]; + additionalData.executionId = jobData.executionId; let workflowExecute: WorkflowExecute; diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index dac67322c6..cd02ebe679 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -5,9 +5,12 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { IRun } from 'n8n-workflow'; - -import { createDeferredPromise } from 'n8n-core'; +import { + createDeferredPromise, + IDeferredPromise, + IExecuteResponsePromiseData, + IRun, +} from 'n8n-workflow'; import { ChildProcess } from 'child_process'; // eslint-disable-next-line import/no-extraneous-dependencies @@ -116,6 +119,28 @@ export class ActiveExecutions { this.activeExecutions[executionId].workflowExecution = workflowExecution; } + attachResponsePromise( + executionId: string, + responsePromise: IDeferredPromise, + ): void { + if (this.activeExecutions[executionId] === undefined) { + throw new Error( + `No active execution with id "${executionId}" got found to attach to workflowExecution to!`, + ); + } + + this.activeExecutions[executionId].responsePromise = responsePromise; + } + + resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { + if (this.activeExecutions[executionId] === undefined) { + return; + } + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + this.activeExecutions[executionId].responsePromise?.resolve(response); + } + /** * Remove an active execution * @@ -193,6 +218,7 @@ export class ActiveExecutions { this.activeExecutions[executionId].postExecutePromises.push(waitPromise); + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access return waitPromise.promise(); } diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 181671c4ff..dd8ac09c3c 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -12,7 +12,9 @@ import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { + IDeferredPromise, IExecuteData, + IExecuteResponsePromiseData, IGetExecutePollFunctions, IGetExecuteTriggerFunctions, INode, @@ -40,8 +42,6 @@ import { NodeTypes, ResponseHelper, WebhookHelpers, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - WorkflowCredentials, WorkflowExecuteAdditionalData, WorkflowHelpers, WorkflowRunner, @@ -550,6 +550,7 @@ export class ActiveWorkflowRunner { data: INodeExecutionData[][], additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, + responsePromise?: IDeferredPromise, ) { const nodeExecutionStack: IExecuteData[] = [ { @@ -580,7 +581,7 @@ export class ActiveWorkflowRunner { }; const workflowRunner = new WorkflowRunner(); - return workflowRunner.run(runData, true); + return workflowRunner.run(runData, true, undefined, undefined, responsePromise); } /** @@ -641,13 +642,16 @@ export class ActiveWorkflowRunner { mode, activation, ); - returnFunctions.emit = (data: INodeExecutionData[][]): void => { + returnFunctions.emit = ( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + ): void => { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions Logger.debug(`Received trigger for workflow "${workflow.name}"`); WorkflowHelpers.saveStaticData(workflow); // eslint-disable-next-line id-denylist - this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => - console.error(err), + this.runWorkflow(workflowData, node, data, additionalData, mode, responsePromise).catch( + (error) => console.error(error), ); }; return returnFunctions; diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index d5c11a8f3f..556aa74492 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -7,19 +7,19 @@ import { ICredentialsEncrypted, ICredentialType, IDataObject, + IDeferredPromise, + IExecuteResponsePromiseData, IRun, IRunData, IRunExecutionData, ITaskData, ITelemetrySettings, IWorkflowBase as IWorkflowBaseWorkflow, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - IWorkflowCredentials, Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; -import { IDeferredPromise, WorkflowExecute } from 'n8n-core'; +import { WorkflowExecute } from 'n8n-core'; // eslint-disable-next-line import/no-extraneous-dependencies import * as PCancelable from 'p-cancelable'; @@ -47,6 +47,11 @@ export interface IBullJobResponse { success: boolean; } +export interface IBullWebhookResponse { + executionId: string; + response: IExecuteResponsePromiseData; +} + export interface ICustomRequest extends Request { parsedUrl: Url | undefined; } @@ -237,6 +242,7 @@ export interface IExecutingWorkflowData { process?: ChildProcess; startedAt: Date; postExecutePromises: Array>; + responsePromise?: IDeferredPromise; workflowExecution?: PCancelable; } @@ -490,6 +496,7 @@ export interface IPushDataConsoleMessage { export interface IResponseCallbackData { data?: IDataObject | IDataObject[]; + headers?: object; noWebhookResponse?: boolean; responseCode?: number; } diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 9143c59ee4..5d215a2bd6 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,12 +1,21 @@ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ import * as Bull from 'bull'; import * as config from '../config'; // eslint-disable-next-line import/no-cycle -import { IBullJobData } from './Interfaces'; +import { IBullJobData, IBullWebhookResponse } from './Interfaces'; +// eslint-disable-next-line import/no-cycle +import * as ActiveExecutions from './ActiveExecutions'; +// eslint-disable-next-line import/no-cycle +import * as WebhookHelpers from './WebhookHelpers'; export class Queue { + private activeExecutions: ActiveExecutions.ActiveExecutions; + private jobQueue: Bull.Queue; constructor() { + this.activeExecutions = ActiveExecutions.getInstance(); + const prefix = config.get('queue.bull.prefix') as string; const redisOptions = config.get('queue.bull.redis') as object; // Disabling ready check is necessary as it allows worker to @@ -16,6 +25,14 @@ export class Queue { // More here: https://github.com/OptimalBits/bull/issues/890 // @ts-ignore this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); + + this.jobQueue.on('global:progress', (jobId, progress: IBullWebhookResponse) => { + this.activeExecutions.resolveResponsePromise( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + progress.executionId, + WebhookHelpers.decodeWebhookResponse(progress.response), + ); + }); } async add(jobData: IBullJobData, jobOptions: object): Promise { diff --git a/packages/cli/src/ResponseHelper.ts b/packages/cli/src/ResponseHelper.ts index f6deb551be..e8430c695d 100644 --- a/packages/cli/src/ResponseHelper.ts +++ b/packages/cli/src/ResponseHelper.ts @@ -72,11 +72,16 @@ export function sendSuccessResponse( data: any, raw?: boolean, responseCode?: number, + responseHeader?: object, ) { if (responseCode !== undefined) { res.status(responseCode); } + if (responseHeader) { + res.header(responseHeader); + } + if (raw === true) { if (typeof data === 'string') { res.send(data); diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index a3e0f2d1f5..655ef52e78 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -2669,7 +2669,13 @@ class App { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -2720,7 +2726,13 @@ class App { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -2746,7 +2758,13 @@ class App { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index ef0c47ac21..203bf20b08 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable no-param-reassign */ /* eslint-disable @typescript-eslint/prefer-optional-chain */ /* eslint-disable @typescript-eslint/no-shadow */ @@ -18,9 +19,13 @@ import { get } from 'lodash'; import { BINARY_ENCODING, NodeExecuteFunctions } from 'n8n-core'; import { + createDeferredPromise, IBinaryKeyData, IDataObject, + IDeferredPromise, IExecuteData, + IExecuteResponsePromiseData, + IN8nHttpFullResponse, INode, IRunExecutionData, IWebhookData, @@ -34,20 +39,20 @@ import { } from 'n8n-workflow'; // eslint-disable-next-line import/no-cycle import { - ActiveExecutions, GenericHelpers, IExecutionDb, IResponseCallbackData, IWorkflowDb, IWorkflowExecutionDataProcess, ResponseHelper, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - WorkflowCredentials, WorkflowExecuteAdditionalData, WorkflowHelpers, WorkflowRunner, } from '.'; +// eslint-disable-next-line import/no-cycle +import * as ActiveExecutions from './ActiveExecutions'; + const activeExecutions = ActiveExecutions.getInstance(); /** @@ -91,6 +96,35 @@ export function getWorkflowWebhooks( return returnData; } +export function decodeWebhookResponse( + response: IExecuteResponsePromiseData, +): IExecuteResponsePromiseData { + if ( + typeof response === 'object' && + typeof response.body === 'object' && + (response.body as IDataObject)['__@N8nEncodedBuffer@__'] + ) { + response.body = Buffer.from( + (response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string, + BINARY_ENCODING, + ); + } + + return response; +} + +export function encodeWebhookResponse( + response: IExecuteResponsePromiseData, +): IExecuteResponsePromiseData { + if (typeof response === 'object' && Buffer.isBuffer(response.body)) { + response.body = { + '__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING), + }; + } + + return response; +} + /** * Returns all the webhooks which should be created for the give workflow * @@ -169,7 +203,7 @@ export async function executeWebhook( 200, ) as number; - if (!['onReceived', 'lastNode'].includes(responseMode as string)) { + if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode as string)) { // If the mode is not known we error. Is probably best like that instead of using // the default that people know as early as possible (probably already testing phase) // that something does not resolve properly. @@ -356,9 +390,52 @@ export async function executeWebhook( workflowData, }; + let responsePromise: IDeferredPromise | undefined; + if (responseMode === 'responseNode') { + responsePromise = await createDeferredPromise(); + responsePromise + .promise() + .then((response: IN8nHttpFullResponse) => { + if (didSendResponse) { + return; + } + + if (Buffer.isBuffer(response.body)) { + res.header(response.headers); + res.end(response.body); + + responseCallback(null, { + noWebhookResponse: true, + }); + } else { + // TODO: This probably needs some more changes depending on the options on the + // Webhook Response node + responseCallback(null, { + data: response.body as IDataObject, + headers: response.headers, + responseCode: response.statusCode, + }); + } + + didSendResponse = true; + }) + .catch(async (error) => { + Logger.error( + `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, + { executionId, workflowId: workflow.id }, + ); + }); + } + // Start now to run the workflow const workflowRunner = new WorkflowRunner(); - executionId = await workflowRunner.run(runData, true, !didSendResponse, executionId); + executionId = await workflowRunner.run( + runData, + true, + !didSendResponse, + executionId, + responsePromise, + ); Logger.verbose( `Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`, @@ -398,6 +475,20 @@ export async function executeWebhook( return data; } + if (responseMode === 'responseNode') { + if (!didSendResponse) { + // Return an error if no Webhook-Response node did send any data + responseCallback(null, { + data: { + message: 'Workflow executed sucessfully.', + }, + responseCode, + }); + didSendResponse = true; + } + return undefined; + } + if (returnData === undefined) { if (!didSendResponse) { responseCallback(null, { diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/WebhookServer.ts index 4cf3afc7b4..c63526bfcc 100644 --- a/packages/cli/src/WebhookServer.ts +++ b/packages/cli/src/WebhookServer.ts @@ -64,7 +64,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -115,7 +121,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -141,7 +153,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -173,7 +191,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -199,7 +223,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); @@ -225,7 +255,13 @@ export function registerProductionWebhooks() { return; } - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + ResponseHelper.sendSuccessResponse( + res, + response.data, + true, + response.responseCode, + response.headers, + ); }, ); } diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 8984384aaa..fd18ff3d04 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -15,6 +15,8 @@ import { IProcessMessage, WorkflowExecute } from 'n8n-core'; import { ExecutionError, + IDeferredPromise, + IExecuteResponsePromiseData, IRun, LoggerProxy as Logger, Workflow, @@ -41,9 +43,7 @@ import { IBullJobResponse, ICredentialsOverwrite, ICredentialsTypeData, - IExecutionDb, IExecutionFlattedDb, - IExecutionResponse, IProcessMessageDataHook, ITransferNodeTypes, IWorkflowExecutionDataProcess, @@ -51,6 +51,7 @@ import { NodeTypes, Push, ResponseHelper, + WebhookHelpers, WorkflowExecuteAdditionalData, WorkflowHelpers, } from '.'; @@ -146,6 +147,7 @@ export class WorkflowRunner { loadStaticData?: boolean, realtime?: boolean, executionId?: string, + responsePromise?: IDeferredPromise, ): Promise { const executionsProcess = config.get('executions.process') as string; const executionsMode = config.get('executions.mode') as string; @@ -153,11 +155,17 @@ export class WorkflowRunner { if (executionsMode === 'queue' && data.executionMode !== 'manual') { // Do not run "manual" executions in bull because sending events to the // frontend would not be possible - executionId = await this.runBull(data, loadStaticData, realtime, executionId); + executionId = await this.runBull( + data, + loadStaticData, + realtime, + executionId, + responsePromise, + ); } else if (executionsProcess === 'main') { - executionId = await this.runMainProcess(data, loadStaticData, executionId); + executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); } else { - executionId = await this.runSubprocess(data, loadStaticData, executionId); + executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); } const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); @@ -200,6 +208,7 @@ export class WorkflowRunner { data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string, + responsePromise?: IDeferredPromise, ): Promise { if (loadStaticData === true && data.workflowData.id) { data.workflowData.staticData = await WorkflowHelpers.getStaticDataById( @@ -256,6 +265,15 @@ export class WorkflowRunner { executionId, true, ); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + if (responsePromise) { + responsePromise.resolve(response); + } + }, + ]; + additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({ sessionId: data.sessionId, }); @@ -341,11 +359,15 @@ export class WorkflowRunner { loadStaticData?: boolean, realtime?: boolean, restartExecutionId?: string, + responsePromise?: IDeferredPromise, ): Promise { // TODO: If "loadStaticData" is set to true it has to load data new on worker // Register the active execution const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId); + if (responsePromise) { + this.activeExecutions.attachResponsePromise(executionId, responsePromise); + } const jobData: IBullJobData = { executionId, @@ -545,6 +567,7 @@ export class WorkflowRunner { data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string, + responsePromise?: IDeferredPromise, ): Promise { let startedAt = new Date(); const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); @@ -653,6 +676,10 @@ export class WorkflowRunner { } else if (message.type === 'end') { clearTimeout(executionTimeout); this.activeExecutions.remove(executionId, message.data.runData); + } else if (message.type === 'sendResponse') { + if (responsePromise) { + responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response)); + } } else if (message.type === 'sendMessageToUI') { // eslint-disable-next-line @typescript-eslint/no-unsafe-call WorkflowExecuteAdditionalData.sendMessageToUI.bind({ sessionId: data.sessionId })( diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index d7039d69af..e8b8274c9f 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -10,6 +10,7 @@ import { IProcessMessage, UserSettings, WorkflowExecute } from 'n8n-core'; import { ExecutionError, IDataObject, + IExecuteResponsePromiseData, IExecuteWorkflowInfo, ILogger, INodeExecutionData, @@ -33,6 +34,7 @@ import { IWorkflowExecuteProcess, IWorkflowExecutionDataProcessWithExecution, NodeTypes, + WebhookHelpers, WorkflowExecuteAdditionalData, WorkflowHelpers, } from '.'; @@ -200,6 +202,15 @@ export class WorkflowRunnerProcess { workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, ); additionalData.hooks = this.getProcessForwardHooks(); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + await sendToParentProcess('sendResponse', { + response: WebhookHelpers.encodeWebhookResponse(response), + }); + }, + ]; + additionalData.executionId = inputData.executionId; // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index d6dc5da8ad..29b0c140e7 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -22,6 +22,7 @@ import { ICredentialsExpressionResolveValues, IDataObject, IExecuteFunctions, + IExecuteResponsePromiseData, IExecuteSingleFunctions, IExecuteWorkflowInfo, IHttpRequestOptions, @@ -1635,6 +1636,9 @@ export function getExecuteFunctions( Logger.warn(`There was a problem sending messsage to UI: ${error.message}`); } }, + async sendResponse(response: IExecuteResponsePromiseData): Promise { + await additionalData.hooks?.executeHookFunctions('sendResponse', [response]); + }, helpers: { httpRequest, prepareBinaryData, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 15d1cce2b8..b0c6167aa9 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -12,7 +12,6 @@ export * from './ActiveWorkflows'; export * from './ActiveWebhooks'; export * from './Constants'; export * from './Credentials'; -export * from './DeferredPromise'; export * from './Interfaces'; export * from './LoadNodeParameterOptions'; export * from './NodeExecuteFunctions'; diff --git a/packages/core/test/Helpers.ts b/packages/core/test/Helpers.ts index 387ac67a85..eb59201828 100644 --- a/packages/core/test/Helpers.ts +++ b/packages/core/test/Helpers.ts @@ -4,6 +4,7 @@ import { ICredentialDataDecryptedObject, ICredentialsHelper, IDataObject, + IDeferredPromise, IExecuteWorkflowInfo, INodeCredentialsDetails, INodeExecutionData, @@ -20,7 +21,7 @@ import { WorkflowHooks, } from 'n8n-workflow'; -import { Credentials, IDeferredPromise, IExecuteFunctions } from '../src'; +import { Credentials, IExecuteFunctions } from '../src'; export class CredentialsHelper extends ICredentialsHelper { getDecrypted( diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index 364fb23d5a..b1ac658dc6 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -1,6 +1,14 @@ -import { IConnections, ILogger, INode, IRun, LoggerProxy, Workflow } from 'n8n-workflow'; +import { + createDeferredPromise, + IConnections, + ILogger, + INode, + IRun, + LoggerProxy, + Workflow, +} from 'n8n-workflow'; -import { createDeferredPromise, WorkflowExecute } from '../src'; +import { WorkflowExecute } from '../src'; import * as Helpers from './Helpers'; diff --git a/packages/nodes-base/nodes/RespondToWebhook.node.ts b/packages/nodes-base/nodes/RespondToWebhook.node.ts new file mode 100644 index 0000000000..3525168158 --- /dev/null +++ b/packages/nodes-base/nodes/RespondToWebhook.node.ts @@ -0,0 +1,278 @@ +import { + BINARY_ENCODING, +} from 'n8n-core'; + +import { + IDataObject, + IExecuteFunctions, + IN8nHttpFullResponse, + IN8nHttpResponse, + INodeExecutionData, + INodeType, + INodeTypeDescription, + NodeOperationError, +} from 'n8n-workflow'; + +export class RespondToWebhook implements INodeType { + description: INodeTypeDescription = { + displayName: 'Respond to Webhook', + icon: 'file:webhook.svg', + name: 'respondToWebhook', + group: ['transform'], + version: 1, + description: 'Returns data for Webhook', + defaults: { + name: 'Respond to Webhook', + color: '#885577', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + ], + properties: [ + { + displayName: 'Respond With', + name: 'respondWith', + type: 'options', + options: [ + { + name: 'First Incoming Item', + value: 'firstIncomingItem', + }, + { + name: 'Text', + value: 'text', + }, + { + name: 'JSON', + value: 'json', + }, + { + name: 'Binary', + value: 'binary', + }, + { + name: 'No Data', + value: 'noData', + }, + ], + default: 'firstIncomingItem', + description: 'The data that should be returned', + }, + { + displayName: 'When using expressions, note that this node will only run for the first item in the input data.', + name: 'webhookNotice', + type: 'notice', + displayOptions: { + show: { + respondWith: [ + 'json', + 'text', + ], + }, + }, + default: '', + }, + { + displayName: 'Response Body', + name: 'responseBody', + type: 'json', + displayOptions: { + show: { + respondWith: [ + 'json', + ], + }, + }, + default: '', + placeholder: '{ "key": "value" }', + description: 'The HTTP Response JSON data', + }, + { + displayName: 'Response Body', + name: 'responseBody', + type: 'string', + displayOptions: { + show: { + respondWith: [ + 'text', + ], + }, + }, + default: '', + placeholder: 'e.g. Workflow started', + description: 'The HTTP Response text data', + }, + { + displayName: 'Response Data Source', + name: 'responseDataSource', + type: 'options', + displayOptions: { + show: { + respondWith: [ + 'binary', + ], + }, + }, + options: [ + { + name: 'Choose Automatically From Input', + value: 'automatically', + description: 'Use if input data will contain a single piece of binary data', + }, + { + name: 'Specify Myself', + value: 'set', + description: 'Enter the name of the input field the binary data will be in', + }, + ], + default: 'automatically', + }, + { + displayName: 'Input Field Name', + name: 'inputFieldName', + type: 'string', + required: true, + default: 'data', + displayOptions: { + show: { + respondWith: [ + 'binary', + ], + responseDataSource: [ + 'set', + ], + }, + }, + description: 'The name of the node input field with the binary data', + }, + + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Response Code', + name: 'responseCode', + type: 'number', + typeOptions: { + minValue: 100, + maxValue: 599, + }, + default: 200, + description: 'The HTTP Response code to return. Defaults to 200.', + }, + { + displayName: 'Response Headers', + name: 'responseHeaders', + placeholder: 'Add Response Header', + description: 'Add headers to the webhook response', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'entries', + displayName: 'Entries', + values: [ + { + displayName: 'Name', + name: 'name', + type: 'string', + default: '', + description: 'Name of the header', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + description: 'Value of the header', + }, + ], + }, + ], + }, + ], + }, + ], + }; + + execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + + const respondWith = this.getNodeParameter('respondWith', 0) as string; + const options = this.getNodeParameter('options', 0, {}) as IDataObject; + + const headers = {} as IDataObject; + if (options.responseHeaders) { + for (const header of (options.responseHeaders as IDataObject).entries as IDataObject[]) { + if (typeof header.name !== 'string') { + header.name = header.name?.toString(); + } + headers[header.name?.toLowerCase() as string] = header.value?.toString(); + } + } + + let responseBody: IN8nHttpResponse; + if (respondWith === 'json') { + const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string; + if (responseBodyParameter) { + responseBody = JSON.parse(responseBodyParameter); + } + } else if (respondWith === 'firstIncomingItem') { + responseBody = items[0].json; + } else if (respondWith === 'text') { + responseBody = this.getNodeParameter('responseBody', 0) as string; + } else if (respondWith === 'binary') { + const item = this.getInputData()[0]; + + if (item.binary === undefined) { + throw new NodeOperationError(this.getNode(), 'No binary data exists on the first item!'); + } + + let responseBinaryPropertyName: string; + + const responseDataSource = this.getNodeParameter('responseDataSource', 0) as string; + + if (responseDataSource === 'set') { + responseBinaryPropertyName = this.getNodeParameter('inputFieldName', 0) as string; + } else { + const binaryKeys = Object.keys(item.binary); + if (binaryKeys.length === 0) { + throw new NodeOperationError(this.getNode(), 'No binary data exists on the first item!'); + } + responseBinaryPropertyName = binaryKeys[0]; + } + + const binaryData = item.binary[responseBinaryPropertyName]; + + if (binaryData === undefined) { + throw new NodeOperationError(this.getNode(), `No binary data property "${responseBinaryPropertyName}" does not exists on item!`); + } + + if (headers['content-type']) { + headers['content-type'] = binaryData.mimeType; + } + responseBody = Buffer.from(binaryData.data, BINARY_ENCODING); + } else if (respondWith !== 'noData') { + throw new NodeOperationError(this.getNode(), `The Response Data option "${respondWith}" is not supported!`); + } + + const response: IN8nHttpFullResponse = { + body: responseBody, + headers, + statusCode: options.responseCode as number || 200, + }; + + this.sendResponse(response); + + return this.prepareOutputData(items); + } + +} diff --git a/packages/nodes-base/nodes/Wait.node.ts b/packages/nodes-base/nodes/Wait.node.ts index 2a5f064a62..e8c3609b1b 100644 --- a/packages/nodes-base/nodes/Wait.node.ts +++ b/packages/nodes-base/nodes/Wait.node.ts @@ -304,6 +304,11 @@ export class Wait implements INodeType { value: 'lastNode', description: 'Returns data of the last executed node', }, + { + name: 'Response Node finishes', + value: 'responseNode', + description: 'Returns data the response node did set', + }, ], default: 'onReceived', description: 'When and how to respond to the webhook', diff --git a/packages/nodes-base/nodes/Webhook.node.ts b/packages/nodes-base/nodes/Webhook.node.ts index e04fe2813f..1e53c1529c 100644 --- a/packages/nodes-base/nodes/Webhook.node.ts +++ b/packages/nodes-base/nodes/Webhook.node.ts @@ -9,7 +9,6 @@ import { INodeType, INodeTypeDescription, IWebhookResponseData, - NodeApiError, NodeOperationError, } from 'n8n-workflow'; @@ -143,10 +142,54 @@ export class Webhook implements INodeType { required: true, description: 'The path to listen to.', }, + { + displayName: 'Respond', + name: 'responseMode', + type: 'options', + options: [ + { + name: 'Immediately', + value: 'onReceived', + description: 'As soon as this node executes', + }, + { + name: 'When last node finishes', + value: 'lastNode', + description: 'Returns data of the last-executed node', + }, + { + name: 'Using \'Respond to Webhook\' node', + value: 'responseNode', + description: 'Response defined in that node', + }, + ], + default: 'onReceived', + description: 'When and how to respond to the webhook.', + }, + { + displayName: 'Insert a \'Respond to Webhook\' node to control when and how you respond. More details', + name: 'webhookNotice', + type: 'notice', + displayOptions: { + show: { + responseMode: [ + 'responseNode', + ], + }, + }, + default: '', + }, { displayName: 'Response Code', name: 'responseCode', type: 'number', + displayOptions: { + hide: { + responseMode: [ + 'responseNode', + ], + }, + }, typeOptions: { minValue: 100, maxValue: 599, @@ -154,25 +197,6 @@ export class Webhook implements INodeType { default: 200, description: 'The HTTP Response code to return', }, - { - displayName: 'Respond When', - name: 'responseMode', - type: 'options', - options: [ - { - name: 'Webhook received', - value: 'onReceived', - description: 'Returns directly with defined Response Code', - }, - { - name: 'Last node finishes', - value: 'lastNode', - description: 'Returns data of the last executed node', - }, - ], - default: 'onReceived', - description: 'When and how to respond to the webhook.', - }, { displayName: 'Response Data', name: 'responseData', diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 969fc9971e..9d107f7f14 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -552,6 +552,7 @@ "dist/nodes/Reddit/Reddit.node.js", "dist/nodes/Redis/Redis.node.js", "dist/nodes/RenameKeys.node.js", + "dist/nodes/RespondToWebhook.node.js", "dist/nodes/Rocketchat/Rocketchat.node.js", "dist/nodes/RssFeedRead.node.js", "dist/nodes/Rundeck/Rundeck.node.js", diff --git a/packages/core/src/DeferredPromise.ts b/packages/workflow/src/DeferredPromise.ts similarity index 100% rename from packages/core/src/DeferredPromise.ts rename to packages/workflow/src/DeferredPromise.ts diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index b700d069ff..7d7dfb6dab 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -6,6 +6,7 @@ import * as express from 'express'; import * as FormData from 'form-data'; import { URLSearchParams } from 'url'; +import { IDeferredPromise } from './DeferredPromise'; import { Workflow } from './Workflow'; import { WorkflowHooks } from './WorkflowHooks'; import { WorkflowOperationError } from './WorkflowErrors'; @@ -208,6 +209,9 @@ export interface IDataObject { [key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[]; } +// export type IExecuteResponsePromiseData = IDataObject; +export type IExecuteResponsePromiseData = IDataObject | IN8nHttpFullResponse; + export interface INodeTypeNameVersion { name: string; version: number; @@ -324,13 +328,13 @@ export interface IHttpRequestOptions { json?: boolean; } -export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[]; +export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null; export interface IN8nHttpFullResponse { body: IN8nHttpResponse; headers: IDataObject; statusCode: number; - statusMessage: string; + statusMessage?: string; } export interface IExecuteFunctions { @@ -371,7 +375,8 @@ export interface IExecuteFunctions { outputIndex?: number, ): Promise; putExecutionToWait(waitTill: Date): Promise; - sendMessageToUI(message: any): void; + sendMessageToUI(message: any): void; // tslint:disable-line:no-any + sendResponse(response: IExecuteResponsePromiseData): void; // tslint:disable-line:no-any helpers: { httpRequest( requestOptions: IHttpRequestOptions, @@ -492,7 +497,10 @@ export interface IPollFunctions { } export interface ITriggerFunctions { - emit(data: INodeExecutionData[][]): void; + emit( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + ): void; getCredentials(type: string): Promise; getMode(): WorkflowExecuteMode; getActivationMode(): WorkflowActivateMode; @@ -975,6 +983,7 @@ export interface IWorkflowExecuteHooks { nodeExecuteBefore?: Array<(nodeName: string) => Promise>; workflowExecuteAfter?: Array<(data: IRun, newStaticData: IDataObject) => Promise>; workflowExecuteBefore?: Array<(workflow: Workflow, data: IRunExecutionData) => Promise>; + sendResponse?: Array<(response: IExecuteResponsePromiseData) => Promise>; } export interface IWorkflowExecuteAdditionalData { diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 860eaea0dd..256b5aa2bc 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -16,6 +16,8 @@ import { Expression, IConnections, + IDeferredPromise, + IExecuteResponsePromiseData, IGetExecuteTriggerFunctions, INode, INodeExecuteFunctions, @@ -946,10 +948,23 @@ export class Workflow { // Add the manual trigger response which resolves when the first time data got emitted triggerResponse!.manualTriggerResponse = new Promise((resolve) => { - // eslint-disable-next-line @typescript-eslint/no-shadow - triggerFunctions.emit = ((resolve) => (data: INodeExecutionData[][]) => { - resolve(data); - })(resolve); + triggerFunctions.emit = ( + (resolveEmit) => + ( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + ) => { + additionalData.hooks!.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + if (responsePromise) { + responsePromise.resolve(response); + } + }, + ]; + + resolveEmit(data); + } + )(resolve); }); return triggerResponse; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index e73f572cd0..9913f7a3ef 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -3,6 +3,7 @@ import * as LoggerProxy from './LoggerProxy'; import * as NodeHelpers from './NodeHelpers'; import * as ObservableObject from './ObservableObject'; +export * from './DeferredPromise'; export * from './Interfaces'; export * from './Expression'; export * from './NodeErrors';