diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index bb6e64f56f..d2dbfe5a97 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -22,10 +22,11 @@ import { NodeTypes, Server, TestWebhooks, + WaitTracker, } from '../src'; import { IDataObject } from 'n8n-workflow'; -import { +import { getLogger, } from '../src/Logger'; @@ -284,6 +285,8 @@ export class Start extends Command { activeWorkflowRunner = ActiveWorkflowRunner.getInstance(); await activeWorkflowRunner.init(); + const waitTracker = WaitTracker(); + const editorUrl = GenericHelpers.getBaseUrl(); this.log(`\nEditor is now accessible via:\n${editorUrl}`); diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index b5ce727ceb..6e43857d34 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -37,7 +37,7 @@ import { WorkflowExecuteAdditionalData, } from '../src'; -import { +import { getLogger, } from '../src/Logger'; @@ -150,6 +150,7 @@ export class Worker extends Command { const additionalData = await WorkflowExecuteAdditionalData.getBase(undefined, executionTimeoutTimestamp); additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(currentExecutionDb.mode, job.data.executionId, currentExecutionDb.workflowData, { retryOf: currentExecutionDb.retryOf as string }); + additionalData.executionId = jobData.executionId; let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index 476cbbcfa7..0a5273799b 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -489,6 +489,12 @@ const config = convict({ env: 'N8N_ENDPOINT_WEBHOOK', doc: 'Path for webhook endpoint', }, + webhookWaiting: { + format: String, + default: 'webhook-waiting', + env: 'N8N_ENDPOINT_WEBHOOK_WAIT', + doc: 'Path for waiting-webhook endpoint', + }, webhookTest: { format: String, default: 'webhook-test', diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 8a1094b685..3741ccd19e 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -35,31 +35,43 @@ export class ActiveExecutions { * @returns {string} * @memberof ActiveExecutions */ - async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): Promise { + async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess, executionId?: string): Promise { - const fullExecutionData: IExecutionDb = { - data: executionData.executionData!, - mode: executionData.executionMode, - finished: false, - startedAt: new Date(), - workflowData: executionData.workflowData, - }; + if (executionId === undefined) { + // Is a new execution so save in DB - if (executionData.retryOf !== undefined) { - fullExecutionData.retryOf = executionData.retryOf.toString(); + const fullExecutionData: IExecutionDb = { + data: executionData.executionData!, + mode: executionData.executionMode, + finished: false, + startedAt: new Date(), + workflowData: executionData.workflowData, + }; + + if (executionData.retryOf !== undefined) { + fullExecutionData.retryOf = executionData.retryOf.toString(); + } + + if (executionData.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(executionData.workflowData.id.toString()) === true) { + fullExecutionData.workflowId = executionData.workflowData.id.toString(); + } + + const execution = ResponseHelper.flattenExecutionData(fullExecutionData); + + const executionResult = await Db.collections.Execution!.save(execution as IExecutionFlattedDb); + executionId = typeof executionResult.id === "object" ? executionResult.id!.toString() : executionResult.id + ""; + } else { + // Is an existing execution we want to finish so update in DB + + const execution = { + id: executionId, + waitTill: null, + }; + + // @ts-ignore + await Db.collections.Execution!.update(executionId, execution); } - if (executionData.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(executionData.workflowData.id.toString()) === true) { - fullExecutionData.workflowId = executionData.workflowData.id.toString(); - } - - const execution = ResponseHelper.flattenExecutionData(fullExecutionData); - - // Save the Execution in DB - const executionResult = await Db.collections.Execution!.save(execution as IExecutionFlattedDb); - - const executionId = typeof executionResult.id === "object" ? executionResult.id!.toString() : executionResult.id + ""; - this.activeExecutions[executionId] = { executionData, process, diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index ee48be8dd7..211af6fa1a 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -209,7 +209,7 @@ export class ActiveWorkflowRunner { return new Promise((resolve, reject) => { const executionMode = 'webhook'; //@ts-ignore - WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => { + WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, undefined, undefined, req, res, (error: Error | null, data: object) => { if (error !== null) { return reject(error); } @@ -282,7 +282,7 @@ export class ActiveWorkflowRunner { * @memberof ActiveWorkflowRunner */ async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode, activation: WorkflowActivateMode): Promise { - const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData); + const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); let path = '' as string | undefined; for (const webhookData of webhooks) { @@ -368,7 +368,7 @@ export class ActiveWorkflowRunner { const additionalData = await WorkflowExecuteAdditionalData.getBase(); - const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData); + const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); for (const webhookData of webhooks) { await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, 'update', false); diff --git a/packages/cli/src/CredentialsHelper.ts b/packages/cli/src/CredentialsHelper.ts index f3427831df..fab915f73e 100644 --- a/packages/cli/src/CredentialsHelper.ts +++ b/packages/cli/src/CredentialsHelper.ts @@ -143,7 +143,7 @@ export class CredentialsHelper extends ICredentialsHelper { if (expressionResolveValues) { try { const workflow = new Workflow({ nodes: Object.values(expressionResolveValues.workflow.nodes), connections: expressionResolveValues.workflow.connectionsBySourceNode, active: false, nodeTypes: expressionResolveValues.workflow.nodeTypes }); - decryptedData = workflow.expression.getParameterValue(decryptedData as INodeParameters, expressionResolveValues.runExecutionData, expressionResolveValues.runIndex, expressionResolveValues.itemIndex, expressionResolveValues.node.name, expressionResolveValues.connectionInputData, mode, false, decryptedData) as ICredentialDataDecryptedObject; + decryptedData = workflow.expression.getParameterValue(decryptedData as INodeParameters, expressionResolveValues.runExecutionData, expressionResolveValues.runIndex, expressionResolveValues.itemIndex, expressionResolveValues.node.name, expressionResolveValues.connectionInputData, mode, {}, false, decryptedData) as ICredentialDataDecryptedObject; } catch (e) { e.message += ' [Error resolving credentials]'; throw e; @@ -160,7 +160,7 @@ export class CredentialsHelper extends ICredentialsHelper { const workflow = new Workflow({ nodes: [node!], connections: {}, active: false, nodeTypes: mockNodeTypes }); // Resolve expressions if any are set - decryptedData = workflow.expression.getComplexParameterValue(node!, decryptedData as INodeParameters, mode, undefined, decryptedData) as ICredentialDataDecryptedObject; + decryptedData = workflow.expression.getComplexParameterValue(node!, decryptedData as INodeParameters, mode, {}, undefined, decryptedData) as ICredentialDataDecryptedObject; } // Load and apply the credentials overwrites if any exist diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index e55f77e8e6..5ee6a48b7e 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -150,6 +150,7 @@ export interface IExecutionBase { // Data in regular format with references export interface IExecutionDb extends IExecutionBase { data: IRunExecutionData; + waitTill?: Date; workflowData?: IWorkflowBase; } @@ -163,6 +164,7 @@ export interface IExecutionResponse extends IExecutionBase { data: IRunExecutionData; retryOf?: string; retrySuccessId?: string; + waitTill?: Date; workflowData: IWorkflowBase; } @@ -176,6 +178,7 @@ export interface IExecutionFlatted extends IExecutionBase { export interface IExecutionFlattedDb extends IExecutionBase { id: number | string; data: string; + waitTill?: Date | null; workflowData: IWorkflowBase; } @@ -204,6 +207,7 @@ export interface IExecutionsSummary { mode: WorkflowExecuteMode; retryOf?: string; retrySuccessId?: string; + waitTill?: Date; startedAt: Date; stoppedAt?: Date; workflowId: string; diff --git a/packages/cli/src/ResponseHelper.ts b/packages/cli/src/ResponseHelper.ts index 24a9d37b53..bb447a91ba 100644 --- a/packages/cli/src/ResponseHelper.ts +++ b/packages/cli/src/ResponseHelper.ts @@ -163,6 +163,7 @@ export function flattenExecutionData(fullExecutionData: IExecutionDb): IExecutio const returnData: IExecutionFlatted = Object.assign({}, { data: stringify(fullExecutionData.data), mode: fullExecutionData.mode, + waitTill: fullExecutionData.waitTill, startedAt: fullExecutionData.startedAt, stoppedAt: fullExecutionData.stoppedAt, finished: fullExecutionData.finished ? fullExecutionData.finished : false, @@ -200,6 +201,7 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb): workflowData: fullExecutionData.workflowData as IWorkflowDb, data: parse(fullExecutionData.data), mode: fullExecutionData.mode, + waitTill: fullExecutionData.waitTill ? fullExecutionData.waitTill : undefined, startedAt: fullExecutionData.startedAt, stoppedAt: fullExecutionData.stoppedAt, finished: fullExecutionData.finished ? fullExecutionData.finished : false, diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 36ac50bbaa..f180c4b469 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -64,6 +64,9 @@ import { Push, ResponseHelper, TestWebhooks, + WaitingWebhooks, + WaitTracker, + WaitTrackerClass, WebhookHelpers, WebhookServer, WorkflowExecuteAdditionalData, @@ -96,6 +99,7 @@ import { import { FindManyOptions, FindOneOptions, + IsNull, LessThanOrEqual, Not, } from 'typeorm'; @@ -124,9 +128,11 @@ class App { activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner; testWebhooks: TestWebhooks.TestWebhooks; endpointWebhook: string; + endpointWebhookWaiting: string; endpointWebhookTest: string; endpointPresetCredentials: string; externalHooks: IExternalHooksClass; + waitTracker: WaitTrackerClass; defaultWorkflowName: string; saveDataErrorExecution: string; saveDataSuccessExecution: string; @@ -150,6 +156,7 @@ class App { this.app = express(); this.endpointWebhook = config.get('endpoints.webhook') as string; + this.endpointWebhookWaiting = config.get('endpoints.webhookWaiting') as string; this.endpointWebhookTest = config.get('endpoints.webhookTest') as string; this.defaultWorkflowName = config.get('workflows.defaultName') as string; @@ -168,6 +175,7 @@ class App { this.push = Push.getInstance(); this.activeExecutionsInstance = ActiveExecutions.getInstance(); + this.waitTracker = WaitTracker(); this.protocol = config.get('protocol'); this.sslKey = config.get('ssl_key'); @@ -620,7 +628,6 @@ class App { return { name: `${nameToReturn} ${maxSuffix + 1}` }; })); - // Returns a specific workflow this.app.get(`/${this.restEndpoint}/workflows/:id`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { const workflow = await Db.collections.Workflow!.findOne(req.params.id, { relations: ['tags'] }); @@ -1621,6 +1628,9 @@ class App { executingWorkflowIds.push(...this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[]); const countFilter = JSON.parse(JSON.stringify(filter)); + if (countFilter.waitTill !== undefined) { + countFilter.waitTill = Not(IsNull()); + } countFilter.id = Not(In(executingWorkflowIds)); const resultsQuery = await Db.collections.Execution! @@ -1631,6 +1641,7 @@ class App { 'execution.mode', 'execution.retryOf', 'execution.retrySuccessId', + 'execution.waitTill', 'execution.startedAt', 'execution.stoppedAt', 'execution.workflowData', @@ -1639,7 +1650,14 @@ class App { .take(limit); Object.keys(filter).forEach((filterField) => { - resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]}); + if (filterField === 'waitTill') { + resultsQuery.andWhere(`execution.${filterField} is not null`); + } else if(filterField === 'finished' && filter[filterField] === false) { + resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]}); + resultsQuery.andWhere(`execution.waitTill is null`); + } else { + resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]}); + } }); if (req.query.lastId) { resultsQuery.andWhere(`execution.id < :lastId`, {lastId: req.query.lastId}); @@ -1667,6 +1685,7 @@ class App { mode: result.mode, retryOf: result.retryOf ? result.retryOf.toString() : undefined, retrySuccessId: result.retrySuccessId ? result.retrySuccessId.toString() : undefined, + waitTill: result.waitTill as Date | undefined, startedAt: result.startedAt, stoppedAt: result.stoppedAt, workflowId: result.workflowData!.id ? result.workflowData!.id!.toString() : '', @@ -1893,15 +1912,22 @@ class App { // Manual executions should still be stoppable, so // try notifying the `activeExecutions` to stop it. const result = await this.activeExecutionsInstance.stopExecution(req.params.id); - if (result !== undefined) { - const returnData: IExecutionsStopData = { + + if (result === undefined) { + // If active execution could not be found check if it is a waiting one + try { + return await this.waitTracker.stopExecution(req.params.id); + } catch (error) { + // Ignore, if it errors as then it is probably a currently running + // execution + } + } else { + return { mode: result.mode, startedAt: new Date(result.startedAt), - stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, + stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, finished: result.finished, - }; - - return returnData; + } as IExecutionsStopData; } const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']); @@ -1932,17 +1958,19 @@ class App { // Stopt he execution and wait till it is done and we got the data const result = await this.activeExecutionsInstance.stopExecution(executionId); + let returnData: IExecutionsStopData; if (result === undefined) { - throw new Error(`The execution id "${executionId}" could not be found.`); + // If active execution could not be found check if it is a waiting one + returnData = await this.waitTracker.stopExecution(executionId); + } else { + returnData = { + mode: result.mode, + startedAt: new Date(result.startedAt), + stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, + finished: result.finished, + }; } - const returnData: IExecutionsStopData = { - mode: result.mode, - startedAt: new Date(result.startedAt), - stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, - finished: result.finished, - }; - return returnData; } })); @@ -1988,6 +2016,76 @@ class App { WebhookServer.registerProductionWebhooks.apply(this); } + // ---------------------------------------- + // Waiting Webhooks + // ---------------------------------------- + + const waitingWebhooks = new WaitingWebhooks(); + + // HEAD webhook-waiting requests + this.app.head(`/${this.endpointWebhookWaiting}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook-waiting/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhookWaiting.length + 2); + + let response; + try { + response = await waitingWebhooks.executeWebhook('HEAD', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); + + // GET webhook-waiting requests + this.app.get(`/${this.endpointWebhookWaiting}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook-waiting/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhookWaiting.length + 2); + + let response; + try { + response = await waitingWebhooks.executeWebhook('GET', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); + + // POST webhook-waiting requests + this.app.post(`/${this.endpointWebhookWaiting}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook-waiting/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhookWaiting.length + 2); + + let response; + try { + response = await waitingWebhooks.executeWebhook('POST', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); + + // HEAD webhook requests (test for UI) this.app.head(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => { // Cut away the "/webhook-test/" to get the registred part of the url diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index a8aa17720f..96e6f299a5 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -105,7 +105,7 @@ export class TestWebhooks { return new Promise(async (resolve, reject) => { try { const executionMode = 'manual'; - const executionId = await WebhookHelpers.executeWebhook(workflow, webhookData!, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, request, response, (error: Error | null, data: IResponseCallbackData) => { + const executionId = await WebhookHelpers.executeWebhook(workflow, webhookData!, this.testWebhookData[webhookKey].workflowData, workflowStartNode, executionMode, this.testWebhookData[webhookKey].sessionId, undefined, undefined, request, response, (error: Error | null, data: IResponseCallbackData) => { if (error !== null) { return reject(error); } @@ -163,10 +163,9 @@ export class TestWebhooks { * @memberof TestWebhooks */ async needsWebhookData(workflowData: IWorkflowDb, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, sessionId?: string, destinationNode?: string): Promise { - const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, destinationNode); - - if (webhooks.length === 0) { - // No Webhooks found + const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, destinationNode, true); + if (!webhooks.find(webhook => webhook.webhookDescription.restartWebhook !== true)) { + // No webhooks found to start a workflow return false; } diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts new file mode 100644 index 0000000000..81ee39e418 --- /dev/null +++ b/packages/cli/src/WaitTracker.ts @@ -0,0 +1,181 @@ +import { + ActiveExecutions, + DatabaseType, + Db, + GenericHelpers, + IExecutionFlattedDb, + IExecutionsStopData, + IWorkflowExecutionDataProcess, + ResponseHelper, + WorkflowCredentials, + WorkflowRunner, +} from '.'; + +import { + IRun, + LoggerProxy as Logger, + WorkflowOperationError, +} from 'n8n-workflow'; + +import { + FindManyOptions, + LessThanOrEqual, + ObjectLiteral, +} from 'typeorm'; + +import { DateUtils } from 'typeorm/util/DateUtils'; + + +export class WaitTrackerClass { + activeExecutionsInstance: ActiveExecutions.ActiveExecutions; + + private waitingExecutions: { + [key: string]: { + executionId: string, + timer: NodeJS.Timeout, + }; + } = {}; + + mainTimer: NodeJS.Timeout; + + + constructor() { + this.activeExecutionsInstance = ActiveExecutions.getInstance(); + + // Poll every 60 seconds a list of upcoming executions + this.mainTimer = setInterval(() => { + this.getwaitingExecutions(); + }, 60000); + + this.getwaitingExecutions(); + } + + + async getwaitingExecutions() { + Logger.debug('Wait tracker querying database for waiting executions'); + // Find all the executions which should be triggered in the next 70 seconds + const findQuery: FindManyOptions = { + select: ['id', 'waitTill'], + where: { + waitTill: LessThanOrEqual(new Date(Date.now() + 70000)), + }, + order: { + waitTill: 'ASC', + }, + }; + const dbType = await GenericHelpers.getConfigValue('database.type') as DatabaseType; + if (dbType === 'sqlite') { + // This is needed because of issue in TypeORM <> SQLite: + // https://github.com/typeorm/typeorm/issues/2286 + (findQuery.where! as ObjectLiteral).waitTill = LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(new Date(Date.now() + 70000))); + } + + const executions = await Db.collections.Execution!.find(findQuery); + + if (executions.length === 0) { + return; + } + + const executionIds = executions.map(execution => execution.id.toString()).join(', '); + Logger.debug(`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`); + + // Add timers for each waiting execution that they get started at the correct time + for (const execution of executions) { + const executionId = execution.id.toString(); + if (this.waitingExecutions[executionId] === undefined) { + const triggerTime = execution.waitTill!.getTime() - new Date().getTime(); + this.waitingExecutions[executionId] = { + executionId, + timer: setTimeout(() => { + this.startExecution(executionId); + }, triggerTime), + }; + } + } + } + + + async stopExecution(executionId: string): Promise { + if (this.waitingExecutions[executionId] !== undefined) { + // The waiting execution was already sheduled to execute. + // So stop timer and remove. + clearTimeout(this.waitingExecutions[executionId].timer); + delete this.waitingExecutions[executionId]; + } + + // Also check in database + const execution = await Db.collections.Execution!.findOne(executionId); + + if (execution === undefined || !execution.waitTill) { + throw new Error(`The execution ID "${executionId}" could not be found.`); + } + + const fullExecutionData = ResponseHelper.unflattenExecutionData(execution); + + // Set in execution in DB as failed and remove waitTill time + const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); + + fullExecutionData.data.resultData.error = { + ...error, + message: error.message, + stack: error.stack, + }; + + fullExecutionData.stoppedAt = new Date(); + fullExecutionData.waitTill = undefined; + + await Db.collections.Execution!.update(executionId, ResponseHelper.flattenExecutionData(fullExecutionData)); + + return { + mode: fullExecutionData.mode, + startedAt: new Date(fullExecutionData.startedAt), + stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined, + finished: fullExecutionData.finished, + }; + } + + + startExecution(executionId: string) { + Logger.debug(`Wait tracker resuming execution ${executionId}`, {executionId}); + delete this.waitingExecutions[executionId]; + + (async () => { + // Get the data to execute + const fullExecutionDataFlatted = await Db.collections.Execution!.findOne(executionId); + + if (fullExecutionDataFlatted === undefined) { + throw new Error(`The execution with the id "${executionId}" does not exist.`); + } + + const fullExecutionData = ResponseHelper.unflattenExecutionData(fullExecutionDataFlatted); + + if (fullExecutionData.finished === true) { + throw new Error('The execution did succeed and can so not be started again.'); + } + + const data: IWorkflowExecutionDataProcess = { + executionMode: fullExecutionData.mode, + executionData: fullExecutionData.data, + workflowData: fullExecutionData.workflowData, + }; + + // Start the execution again + const workflowRunner = new WorkflowRunner(); + await workflowRunner.run(data, false, false, executionId); + })().catch((error) => { + Logger.error(`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`, { executionId }); + }); + + } +} + + +let waitTrackerInstance: WaitTrackerClass | undefined; + +export function WaitTracker(): WaitTrackerClass { + if (waitTrackerInstance === undefined) { + waitTrackerInstance = new WaitTrackerClass(); + } + + return waitTrackerInstance; +} diff --git a/packages/cli/src/WaitingWebhooks.ts b/packages/cli/src/WaitingWebhooks.ts new file mode 100644 index 0000000000..f0b84d3804 --- /dev/null +++ b/packages/cli/src/WaitingWebhooks.ts @@ -0,0 +1,117 @@ +import { + Db, + IExecutionResponse, + IResponseCallbackData, + IWorkflowDb, + NodeTypes, + ResponseHelper, + WebhookHelpers, + WorkflowCredentials, + WorkflowExecuteAdditionalData, +} from '.'; + +import { + INode, + IRunExecutionData, + NodeHelpers, + WebhookHttpMethod, + Workflow, +} from 'n8n-workflow'; + +import * as express from 'express'; +import { + LoggerProxy as Logger, +} from 'n8n-workflow'; + +export class WaitingWebhooks { + + async executeWebhook(httpMethod: WebhookHttpMethod, fullPath: string, req: express.Request, res: express.Response): Promise { + Logger.debug(`Received waiting-webhoook "${httpMethod}" for path "${fullPath}"`); + + // Reset request parameters + req.params = {}; + + // Remove trailing slash + if (fullPath.endsWith('/')) { + fullPath = fullPath.slice(0, -1); + } + + const pathParts = fullPath.split('/'); + + const executionId = pathParts.shift(); + const path = pathParts.join('/'); + + const execution = await Db.collections.Execution?.findOne(executionId); + + if (execution === undefined) { + throw new ResponseHelper.ResponseError(`The execution "${executionId} does not exist.`, 404, 404); + } + + const fullExecutionData = ResponseHelper.unflattenExecutionData(execution); + + if (fullExecutionData.finished === true || fullExecutionData.data.resultData.error) { + throw new ResponseHelper.ResponseError(`The execution "${executionId} has finished already.`, 409, 409); + } + + return this.startExecution(httpMethod, path, fullExecutionData, req, res); + } + + + async startExecution(httpMethod: WebhookHttpMethod, path: string, fullExecutionData: IExecutionResponse, req: express.Request, res: express.Response): Promise { + const executionId = fullExecutionData.id; + + if (fullExecutionData.finished === true) { + throw new Error('The execution did succeed and can so not be started again.'); + } + + const lastNodeExecuted = fullExecutionData!.data.resultData.lastNodeExecuted as string; + + // Set the node as disabled so that the data does not get executed again as it would result + // in starting the wait all over again + fullExecutionData!.data.executionData!.nodeExecutionStack[0].node.disabled = true; + + // Remove waitTill information else the execution would stop + fullExecutionData!.data.waitTill = undefined; + + // Remove the data of the node execution again else it will display the node as executed twice + fullExecutionData!.data.resultData.runData[lastNodeExecuted].pop(); + + const workflowData = fullExecutionData.workflowData; + + const nodeTypes = NodeTypes(); + const workflow = new Workflow({ id: workflowData.id!.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings }); + + const additionalData = await WorkflowExecuteAdditionalData.getBase(); + + const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(lastNodeExecuted) as INode, additionalData).filter((webhook) => { + return (webhook.httpMethod === httpMethod && webhook.path === path && webhook.webhookDescription.restartWebhook === true); + })[0]; + + if (webhookData === undefined) { + // If no data got found it means that the execution can not be started via a webhook. + // Return 404 because we do not want to give any data if the execution exists or not. + const errorMessage = `The execution "${executionId}" with webhook suffix path "${path}" is not known.`; + throw new ResponseHelper.ResponseError(errorMessage, 404, 404); + } + + const workflowStartNode = workflow.getNode(lastNodeExecuted); + + if (workflowStartNode === null) { + throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404); + } + + const runExecutionData = fullExecutionData.data as IRunExecutionData; + + return new Promise((resolve, reject) => { + const executionMode = 'webhook'; + WebhookHelpers.executeWebhook(workflow, webhookData, workflowData as IWorkflowDb, workflowStartNode, executionMode, undefined, runExecutionData, fullExecutionData.id, req, res, (error: Error | null, data: object) => { + if (error !== null) { + return reject(error); + } + resolve(data); + }); + }); + + } + +} diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index a88572d3ac..6005f7740d 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -3,7 +3,6 @@ import { get } from 'lodash'; import { ActiveExecutions, - ExternalHooks, GenericHelpers, IExecutionDb, IResponseCallbackData, @@ -29,6 +28,7 @@ import { IRunExecutionData, IWebhookData, IWebhookResponseData, + IWorkflowDataProxyAdditionalKeys, IWorkflowExecuteAdditionalData, LoggerProxy as Logger, NodeHelpers, @@ -47,7 +47,7 @@ const activeExecutions = ActiveExecutions.getInstance(); * @param {Workflow} workflow * @returns {IWebhookData[]} */ -export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, destinationNode?: string): IWebhookData[] { +export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, destinationNode?: string, ignoreRestartWehbooks = false): IWebhookData[] { // Check all the nodes in the workflow if they have webhooks const returnData: IWebhookData[] = []; @@ -65,7 +65,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo // and no other ones continue; } - returnData.push.apply(returnData, NodeHelpers.getNodeWebhooks(workflow, node, additionalData)); + returnData.push.apply(returnData, NodeHelpers.getNodeWebhooks(workflow, node, additionalData, ignoreRestartWehbooks)); } return returnData; @@ -106,7 +106,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { * @param {((error: Error | null, data: IResponseCallbackData) => void)} responseCallback * @returns {(Promise)} */ - export async function executeWebhook(workflow: Workflow, webhookData: IWebhookData, workflowData: IWorkflowDb, workflowStartNode: INode, executionMode: WorkflowExecuteMode, sessionId: string | undefined, req: express.Request, res: express.Response, responseCallback: (error: Error | null, data: IResponseCallbackData) => void): Promise { +export async function executeWebhook(workflow: Workflow, webhookData: IWebhookData, workflowData: IWorkflowDb, workflowStartNode: INode, executionMode: WorkflowExecuteMode, sessionId: string | undefined, runExecutionData: IRunExecutionData | undefined, executionId: string | undefined, req: express.Request, res: express.Response, responseCallback: (error: Error | null, data: IResponseCallbackData) => void): Promise { // Get the nodeType to know which responseMode is set const nodeType = workflow.nodeTypes.getByName(workflowStartNode.type); if (nodeType === undefined) { @@ -115,9 +115,13 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { throw new ResponseHelper.ResponseError(errorMessage, 500, 500); } + const additionalKeys: IWorkflowDataProxyAdditionalKeys = { + $executionId: executionId, + }; + // Get the responseMode - const responseMode = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseMode'], executionMode, 'onReceived'); - const responseCode = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseCode'], executionMode, 200) as number; + const responseMode = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseMode'], executionMode, additionalKeys, 'onReceived'); + const responseCode = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseCode'], executionMode, additionalKeys, 200) as number; if (!['onReceived', 'lastNode'].includes(responseMode as string)) { // If the mode is not known we error. Is probably best like that instead of using @@ -174,8 +178,12 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { // Save static data if it changed await WorkflowHelpers.saveStaticData(workflow); + const additionalKeys: IWorkflowDataProxyAdditionalKeys = { + $executionId: executionId, + }; + if (webhookData.webhookDescription['responseHeaders'] !== undefined) { - const responseHeaders = workflow.expression.getComplexParameterValue(workflowStartNode, webhookData.webhookDescription['responseHeaders'], executionMode, undefined) as { + const responseHeaders = workflow.expression.getComplexParameterValue(workflowStartNode, webhookData.webhookDescription['responseHeaders'], executionMode, additionalKeys, undefined) as { entries?: Array<{ name: string; value: string; @@ -256,7 +264,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { } ); - const runExecutionData: IRunExecutionData = { + runExecutionData = runExecutionData || { startData: { }, resultData: { @@ -267,7 +275,13 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { nodeExecutionStack, waitingExecution: {}, }, - }; + } as IRunExecutionData; + + if (executionId !== undefined) { + // Set the data the webhook node did return on the waiting node if executionId + // already exists as it means that we are restarting an existing execution. + runExecutionData.executionData!.nodeExecutionStack[0].data.main = webhookResultData.workflowData; + } if (Object.keys(runExecutionDataMerge).length !== 0) { // If data to merge got defined add it to the execution data @@ -283,7 +297,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { // Start now to run the workflow const workflowRunner = new WorkflowRunner(); - const executionId = await workflowRunner.run(runData, true, !didSendResponse); + executionId = await workflowRunner.run(runData, true, !didSendResponse, executionId); Logger.verbose(`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`, { executionId }); @@ -330,7 +344,11 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { return data; } - const responseData = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseData'], executionMode, 'firstEntryJson'); + const additionalKeys: IWorkflowDataProxyAdditionalKeys = { + $executionId: executionId, + }; + + const responseData = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseData'], executionMode, additionalKeys, 'firstEntryJson'); if (didSendResponse === false) { let data: IDataObject | IDataObject[]; @@ -345,13 +363,13 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { data = returnData.data!.main[0]![0].json; - const responsePropertyName = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responsePropertyName'], executionMode, undefined); + const responsePropertyName = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responsePropertyName'], executionMode, additionalKeys, undefined); if (responsePropertyName !== undefined) { data = get(data, responsePropertyName as string) as IDataObject; } - const responseContentType = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseContentType'], executionMode, undefined); + const responseContentType = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseContentType'], executionMode, additionalKeys, undefined); if (responseContentType !== undefined) { // Send the webhook response manually to be able to set the content-type @@ -384,7 +402,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { didSendResponse = true; } - const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseBinaryPropertyName'], executionMode, 'data'); + const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseBinaryPropertyName'], executionMode, additionalKeys, 'data'); if (responseBinaryPropertyName === undefined && didSendResponse === false) { responseCallback(new Error('No "responseBinaryPropertyName" is set.'), {}); diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/WebhookServer.ts index 245e3a2504..83c28ab2d0 100644 --- a/packages/cli/src/WebhookServer.ts +++ b/packages/cli/src/WebhookServer.ts @@ -26,6 +26,11 @@ import * as config from '../config'; import * as parseUrl from 'parseurl'; export function registerProductionWebhooks() { + + // ---------------------------------------- + // Regular Webhooks + // ---------------------------------------- + // HEAD webhook requests this.app.head(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { // Cut away the "/webhook/" to get the registred part of the url diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 8a55fcdb08..cc25661354 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -256,7 +256,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx if (execution === undefined) { // Something went badly wrong if this happens. // This check is here mostly to make typescript happy. - return undefined; + return; } const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution); @@ -267,11 +267,9 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx return; } - if (fullExecutionData.data === undefined) { fullExecutionData.data = { - startData: { - }, + startData: {}, resultData: { runData: {}, }, @@ -351,7 +349,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean; } - if (isManualMode && saveManualExecutions === false) { + if (isManualMode && saveManualExecutions === false && !fullRunData.waitTill) { // Data is always saved, so we remove from database await Db.collections.Execution!.delete(this.executionId); return; @@ -369,12 +367,14 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' || workflowDidSucceed === false && saveDataErrorExecution === 'none' ) { - if (!isManualMode) { - executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + if (!fullRunData.waitTill) { + if (!isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + } + // Data is always saved, so we remove from database + await Db.collections.Execution!.delete(this.executionId); + return; } - // Data is always saved, so we remove from database - await Db.collections.Execution!.delete(this.executionId); - return; } const fullExecutionData: IExecutionDb = { @@ -384,6 +384,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { startedAt: fullRunData.startedAt, stoppedAt: fullRunData.stoppedAt, workflowData: this.workflowData, + waitTill: fullRunData.waitTill, }; if (this.retryOf !== undefined) { @@ -469,6 +470,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { startedAt: fullRunData.startedAt, stoppedAt: fullRunData.stoppedAt, workflowData: this.workflowData, + waitTill: fullRunData.data.waitTill, }; if (this.retryOf !== undefined) { @@ -731,6 +733,7 @@ export async function getBase(currentNodeParameters?: INodeParameters, execution const timezone = config.get('generic.timezone') as string; const webhookBaseUrl = urlBaseWebhook + config.get('endpoints.webhook') as string; + const webhookWaitingBaseUrl = urlBaseWebhook + config.get('endpoints.webhookWaiting') as string; const webhookTestBaseUrl = urlBaseWebhook + config.get('endpoints.webhookTest') as string; const encryptionKey = await UserSettings.getEncryptionKey(); @@ -745,6 +748,7 @@ export async function getBase(currentNodeParameters?: INodeParameters, execution restApiUrl: urlBaseWebhook + config.get('endpoints.rest') as string, timezone, webhookBaseUrl, + webhookWaitingBaseUrl, webhookTestBaseUrl, currentNodeParameters, executionTimeoutTimestamp, diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 564d01a975..9a8c66430f 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -123,19 +123,18 @@ export class WorkflowRunner { * @returns {Promise} * @memberof WorkflowRunner */ - async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise { + async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, executionId?: string): Promise { const executionsProcess = config.get('executions.process') as string; const executionsMode = config.get('executions.mode') as string; - let executionId: string; if (executionsMode === 'queue' && data.executionMode !== 'manual') { // Do not run "manual" executions in bull because sending events to the // frontend would not be possible - executionId = await this.runBull(data, loadStaticData, realtime); + executionId = await this.runBull(data, loadStaticData, realtime, executionId); } else if (executionsProcess === 'main') { - executionId = await this.runMainProcess(data, loadStaticData); + executionId = await this.runMainProcess(data, loadStaticData, executionId); } else { - executionId = await this.runSubprocess(data, loadStaticData); + executionId = await this.runSubprocess(data, loadStaticData, executionId); } const externalHooks = ExternalHooks(); @@ -162,7 +161,7 @@ export class WorkflowRunner { * @returns {Promise} * @memberof WorkflowRunner */ - async runMainProcess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { + async runMainProcess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string): Promise { if (loadStaticData === true && data.workflowData.id) { data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(data.workflowData.id as string); } @@ -186,7 +185,10 @@ export class WorkflowRunner { const additionalData = await WorkflowExecuteAdditionalData.getBase(undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000); // Register the active execution - const executionId = await this.activeExecutions.add(data, undefined); + const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId) as string; + additionalData.executionId = executionId; + + Logger.verbose(`Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`, {executionId}); let workflowExecution: PCancelable; try { @@ -240,12 +242,12 @@ export class WorkflowRunner { return executionId; } - async runBull(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise { + async runBull(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, restartExecutionId?: string): Promise { // TODO: If "loadStaticData" is set to true it has to load data new on worker // Register the active execution - const executionId = await this.activeExecutions.add(data, undefined); + const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId); const jobData: IBullJobData = { executionId, @@ -412,7 +414,7 @@ export class WorkflowRunner { * @returns {Promise} * @memberof WorkflowRunner */ - async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise { + async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string): Promise { let startedAt = new Date(); const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); @@ -421,7 +423,7 @@ export class WorkflowRunner { } // Register the active execution - const executionId = await this.activeExecutions.add(data, subprocess); + const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId); // Supply all nodeTypes and credentialTypes const nodeTypeData = WorkflowHelpers.getAllNodeTypeData() as ITransferNodeTypes; diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 715b8ada44..6b3262322b 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -150,6 +150,7 @@ export class WorkflowRunnerProcess { this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings }); const additionalData = await WorkflowExecuteAdditionalData.getBase(undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000); additionalData.hooks = this.getProcessForwardHooks(); + additionalData.executionId = inputData.executionId; additionalData.sendMessageToUI = async (source: string, message: any) => { // tslint:disable-line:no-any if (workflowRunner.data!.executionMode !== 'manual') { diff --git a/packages/cli/src/databases/entities/ExecutionEntity.ts b/packages/cli/src/databases/entities/ExecutionEntity.ts index b788f5e153..ba6b60807f 100644 --- a/packages/cli/src/databases/entities/ExecutionEntity.ts +++ b/packages/cli/src/databases/entities/ExecutionEntity.ts @@ -53,4 +53,8 @@ export class ExecutionEntity implements IExecutionFlattedDb { @Index() @Column({ nullable: true }) workflowId: string; + + @Index() + @Column({ type: resolveDataType('datetime') as ColumnOptions['type'], nullable: true }) + waitTill: Date; } diff --git a/packages/cli/src/databases/mysqldb/migrations/1626183952959-AddWaitColumn.ts b/packages/cli/src/databases/mysqldb/migrations/1626183952959-AddWaitColumn.ts new file mode 100644 index 0000000000..ee2aa560e1 --- /dev/null +++ b/packages/cli/src/databases/mysqldb/migrations/1626183952959-AddWaitColumn.ts @@ -0,0 +1,22 @@ +import {MigrationInterface, QueryRunner} from "typeorm"; +import * as config from '../../../../config'; + +export class AddWaitColumnId1626183952959 implements MigrationInterface { + name = 'AddWaitColumnId1626183952959'; + + async up(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` ADD `waitTill` DATETIME NULL'); + await queryRunner.query('CREATE INDEX `IDX_' + tablePrefix + 'ca4a71b47f28ac6ea88293a8e2` ON `' + tablePrefix + 'execution_entity` (`waitTill`)'); + } + + async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query( + 'DROP INDEX `IDX_' + tablePrefix + 'ca4a71b47f28ac6ea88293a8e2` ON `' + tablePrefix + 'execution_entity`' + ); + await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` DROP COLUMN `waitTill`'); + } +} diff --git a/packages/cli/src/databases/mysqldb/migrations/index.ts b/packages/cli/src/databases/mysqldb/migrations/index.ts index 1054d68cfc..b48bc58aff 100644 --- a/packages/cli/src/databases/mysqldb/migrations/index.ts +++ b/packages/cli/src/databases/mysqldb/migrations/index.ts @@ -8,6 +8,7 @@ import { ChangeCredentialDataSize1620729500000 } from './1620729500000-ChangeCre import { CreateTagEntity1617268711084 } from './1617268711084-CreateTagEntity'; import { UniqueWorkflowNames1620826335440 } from './1620826335440-UniqueWorkflowNames'; import { CertifyCorrectCollation1623936588000 } from './1623936588000-CertifyCorrectCollation'; +import { AddWaitColumnId1626183952959 } from './1626183952959-AddWaitColumn'; export const mysqlMigrations = [ InitialMigration1588157391238, @@ -20,4 +21,5 @@ export const mysqlMigrations = [ CreateTagEntity1617268711084, UniqueWorkflowNames1620826335440, CertifyCorrectCollation1623936588000, + AddWaitColumnId1626183952959, ]; diff --git a/packages/cli/src/databases/postgresdb/migrations/1626176912946-AddwaitTill.ts b/packages/cli/src/databases/postgresdb/migrations/1626176912946-AddwaitTill.ts new file mode 100644 index 0000000000..3bef043a83 --- /dev/null +++ b/packages/cli/src/databases/postgresdb/migrations/1626176912946-AddwaitTill.ts @@ -0,0 +1,31 @@ +import {MigrationInterface, QueryRunner} from "typeorm"; +import * as config from '../../../../config'; + +export class AddwaitTill1626176912946 implements MigrationInterface { + name = 'AddwaitTill1626176912946'; + + async up(queryRunner: QueryRunner): Promise { + let tablePrefix = config.get('database.tablePrefix'); + const tablePrefixPure = tablePrefix; + const schema = config.get('database.postgresdb.schema'); + if (schema) { + tablePrefix = schema + '.' + tablePrefix; + } + + await queryRunner.query(`ALTER TABLE ${tablePrefix}execution_entity ADD "waitTill" TIMESTAMP`); + await queryRunner.query(`CREATE INDEX IF NOT EXISTS IDX_${tablePrefixPure}ca4a71b47f28ac6ea88293a8e2 ON ${tablePrefix}execution_entity ("waitTill")`); + } + + async down(queryRunner: QueryRunner): Promise { + let tablePrefix = config.get('database.tablePrefix'); + const tablePrefixPure = tablePrefix; + const schema = config.get('database.postgresdb.schema'); + if (schema) { + tablePrefix = schema + '.' + tablePrefix; + } + + await queryRunner.query(`DROP INDEX IDX_${tablePrefixPure}ca4a71b47f28ac6ea88293a8e2`); + await queryRunner.query(`ALTER TABLE ${tablePrefix}webhook_entity DROP COLUMN "waitTill"`); + } + +} diff --git a/packages/cli/src/databases/postgresdb/migrations/index.ts b/packages/cli/src/databases/postgresdb/migrations/index.ts index 0f6cc669c9..83983dd039 100644 --- a/packages/cli/src/databases/postgresdb/migrations/index.ts +++ b/packages/cli/src/databases/postgresdb/migrations/index.ts @@ -5,6 +5,7 @@ import { AddWebhookId1611144599516 } from './1611144599516-AddWebhookId'; import { MakeStoppedAtNullable1607431743768 } from './1607431743768-MakeStoppedAtNullable'; import { CreateTagEntity1617270242566 } from './1617270242566-CreateTagEntity'; import { UniqueWorkflowNames1620824779533 } from './1620824779533-UniqueWorkflowNames'; +import { AddwaitTill1626176912946 } from './1626176912946-AddwaitTill'; export const postgresMigrations = [ InitialMigration1587669153312, @@ -14,4 +15,5 @@ export const postgresMigrations = [ MakeStoppedAtNullable1607431743768, CreateTagEntity1617270242566, UniqueWorkflowNames1620824779533, + AddwaitTill1626176912946, ]; diff --git a/packages/cli/src/databases/sqlite/migrations/1621707690587-AddWaitColumn.ts b/packages/cli/src/databases/sqlite/migrations/1621707690587-AddWaitColumn.ts new file mode 100644 index 0000000000..17ed2c2a9a --- /dev/null +++ b/packages/cli/src/databases/sqlite/migrations/1621707690587-AddWaitColumn.ts @@ -0,0 +1,31 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import * as config from '../../../../config'; + +export class AddWaitColumn1621707690587 implements MigrationInterface { + name = 'AddWaitColumn1621707690587'; + + async up(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query(`CREATE TABLE IF NOT EXISTS "${tablePrefix}temporary_execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "data" text NOT NULL, "finished" boolean NOT NULL, "mode" varchar NOT NULL, "retryOf" varchar, "retrySuccessId" varchar, "startedAt" datetime NOT NULL, "stoppedAt" datetime, "workflowData" text NOT NULL, "workflowId" varchar, "waitTill" DATETIME)`, undefined); + await queryRunner.query(`INSERT INTO "${tablePrefix}temporary_execution_entity"("id", "data", "finished", "mode", "retryOf", "retrySuccessId", "startedAt", "stoppedAt", "workflowData", "workflowId") SELECT "id", "data", "finished", "mode", "retryOf", "retrySuccessId", "startedAt", "stoppedAt", "workflowData", "workflowId" FROM "${tablePrefix}execution_entity"`); + await queryRunner.query(`DROP TABLE "${tablePrefix}execution_entity"`); + await queryRunner.query(`ALTER TABLE "${tablePrefix}temporary_execution_entity" RENAME TO "${tablePrefix}execution_entity"`); + await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt")`); + await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2" ON "${tablePrefix}execution_entity" ("waitTill")`); + await queryRunner.query(`VACUUM;`); + } + + async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query(`CREATE TABLE IF NOT EXISTS "${tablePrefix}temporary_execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "data" text NOT NULL, "finished" boolean NOT NULL, "mode" varchar NOT NULL, "retryOf" varchar, "retrySuccessId" varchar, "startedAt" datetime NOT NULL, "stoppedAt" datetime, "workflowData" text NOT NULL, "workflowId" varchar)`, undefined); + await queryRunner.query(`INSERT INTO "${tablePrefix}temporary_execution_entity"("id", "data", "finished", "mode", "retryOf", "retrySuccessId", "startedAt", "stoppedAt", "workflowData", "workflowId") SELECT "id", "data", "finished", "mode", "retryOf", "retrySuccessId", "startedAt", "stoppedAt", "workflowData", "workflowId" FROM "${tablePrefix}execution_entity"`); + await queryRunner.query(`DROP TABLE "${tablePrefix}execution_entity"`); + await queryRunner.query(`ALTER TABLE "${tablePrefix}temporary_execution_entity" RENAME TO "${tablePrefix}execution_entity"`); + await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt")`); + await queryRunner.query(`VACUUM;`); + + } + +} diff --git a/packages/cli/src/databases/sqlite/migrations/index.ts b/packages/cli/src/databases/sqlite/migrations/index.ts index 14b5184954..64038d9e30 100644 --- a/packages/cli/src/databases/sqlite/migrations/index.ts +++ b/packages/cli/src/databases/sqlite/migrations/index.ts @@ -5,6 +5,7 @@ import { AddWebhookId1611071044839 } from './1611071044839-AddWebhookId'; import { MakeStoppedAtNullable1607431743769 } from './1607431743769-MakeStoppedAtNullable'; import { CreateTagEntity1617213344594 } from './1617213344594-CreateTagEntity'; import { UniqueWorkflowNames1620821879465 } from './1620821879465-UniqueWorkflowNames'; +import { AddWaitColumn1621707690587 } from './1621707690587-AddWaitColumn'; export const sqliteMigrations = [ InitialMigration1588102412422, @@ -14,4 +15,5 @@ export const sqliteMigrations = [ MakeStoppedAtNullable1607431743769, CreateTagEntity1617213344594, UniqueWorkflowNames1620821879465, + AddWaitColumn1621707690587, ]; diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index cc7e942fe5..296348cf65 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -5,6 +5,8 @@ export * from './ExternalHooks'; export * from './Interfaces'; export * from './LoadNodesAndCredentials'; export * from './NodeTypes'; +export * from './WaitTracker'; +export * from './WaitingWebhooks'; export * from './WorkflowCredentials'; export * from './WorkflowRunner'; diff --git a/packages/core/src/Constants.ts b/packages/core/src/Constants.ts index a11b6d9402..4e72f53f9c 100644 --- a/packages/core/src/Constants.ts +++ b/packages/core/src/Constants.ts @@ -5,4 +5,6 @@ export const EXTENSIONS_SUBDIRECTORY = 'custom'; export const USER_FOLDER_ENV_OVERWRITE = 'N8N_USER_FOLDER'; export const USER_SETTINGS_FILE_NAME = 'config'; export const USER_SETTINGS_SUBFOLDER = '.n8n'; +export const PLACEHOLDER_EMPTY_EXECUTION_ID = '__UNKOWN__'; export const TUNNEL_SUBDOMAIN_ENV = 'N8N_TUNNEL_SUBDOMAIN'; +export const WAIT_TIME_UNLIMITED = '3000-01-01T00:00:00.000Z'; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index cb94d6c2d2..582f5ac1ca 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -4,6 +4,7 @@ import { ILoadOptionsFunctions, IResponseError, IWorkflowSettings, + PLACEHOLDER_EMPTY_EXECUTION_ID, } from './'; import { @@ -28,6 +29,7 @@ import { IWebhookData, IWebhookDescription, IWebhookFunctions, + IWorkflowDataProxyAdditionalKeys, IWorkflowDataProxyData, IWorkflowExecuteAdditionalData, IWorkflowMetadata, @@ -322,6 +324,23 @@ export function returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExe +/** + * Returns the additional keys for Expressions and Function-Nodes + * + * @export + * @param {IWorkflowExecuteAdditionalData} additionalData + * @returns {(IWorkflowDataProxyAdditionalKeys)} + */ +export function getAdditionalKeys(additionalData: IWorkflowExecuteAdditionalData): IWorkflowDataProxyAdditionalKeys { + const executionId = additionalData.executionId || PLACEHOLDER_EMPTY_EXECUTION_ID; + return { + $executionId: executionId, + $resumeWebhookUrl: `${additionalData.webhookWaitingBaseUrl}/${executionId}`, + }; +} + + + /** * Returns the requested decrypted credentials if the node has access to them. * @@ -420,7 +439,7 @@ export function getNode(node: INode): INode { * @param {*} [fallbackValue] * @returns {(NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object)} */ -export function getNodeParameter(workflow: Workflow, runExecutionData: IRunExecutionData | null, runIndex: number, connectionInputData: INodeExecutionData[], node: INode, parameterName: string, itemIndex: number, mode: WorkflowExecuteMode, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object { //tslint:disable-line:no-any +export function getNodeParameter(workflow: Workflow, runExecutionData: IRunExecutionData | null, runIndex: number, connectionInputData: INodeExecutionData[], node: INode, parameterName: string, itemIndex: number, mode: WorkflowExecuteMode, additionalKeys: IWorkflowDataProxyAdditionalKeys, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object { //tslint:disable-line:no-any const nodeType = workflow.nodeTypes.getByName(node.type); if (nodeType === undefined) { throw new Error(`Node type "${node.type}" is not known so can not return paramter value!`); @@ -434,7 +453,7 @@ export function getNodeParameter(workflow: Workflow, runExecutionData: IRunExecu let returnData; try { - returnData = workflow.expression.getParameterValue(value, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, mode); + returnData = workflow.expression.getParameterValue(value, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, mode, additionalKeys); } catch (e) { e.message += ` [Error in parameter: "${parameterName}"]`; throw e; @@ -469,7 +488,7 @@ export function continueOnFail(node: INode): boolean { * @param {boolean} [isTest] * @returns {(string | undefined)} */ -export function getNodeWebhookUrl(name: string, workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, isTest?: boolean): string | undefined { +export function getNodeWebhookUrl(name: string, workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, additionalKeys: IWorkflowDataProxyAdditionalKeys, isTest?: boolean): string | undefined { let baseUrl = additionalData.webhookBaseUrl; if (isTest === true) { baseUrl = additionalData.webhookTestBaseUrl; @@ -480,12 +499,12 @@ export function getNodeWebhookUrl(name: string, workflow: Workflow, node: INode, return undefined; } - const path = workflow.expression.getSimpleParameterValue(node, webhookDescription['path'], mode); + const path = workflow.expression.getSimpleParameterValue(node, webhookDescription['path'], mode, additionalKeys); if (path === undefined) { return undefined; } - const isFullPath: boolean = workflow.expression.getSimpleParameterValue(node, webhookDescription['isFullPath'], mode, false) as boolean; + const isFullPath: boolean = workflow.expression.getSimpleParameterValue(node, webhookDescription['isFullPath'], mode, additionalKeys, false) as boolean; return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath); } @@ -588,7 +607,7 @@ export function getExecutePollFunctions(workflow: Workflow, node: INode, additio const runIndex = 0; const connectionInputData: INodeExecutionData[] = []; - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getRestApiUrl: (): string => { return additionalData.restApiUrl; @@ -654,7 +673,7 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi const runIndex = 0; const connectionInputData: INodeExecutionData[] = []; - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getRestApiUrl: (): string => { return additionalData.restApiUrl; @@ -706,7 +725,7 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx return continueOnFail(node); }, evaluateExpression: (expression: string, itemIndex: number) => { - return workflow.expression.resolveSimpleParameterValue('=' + expression, {}, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, mode); + return workflow.expression.resolveSimpleParameterValue('=' + expression, {}, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, mode, getAdditionalKeys(additionalData)); }, async executeWorkflow(workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[]): Promise { // tslint:disable-line:no-any return additionalData.executeWorkflow(workflowInfo, additionalData, inputData); @@ -717,6 +736,9 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx async getCredentials(type: string, itemIndex?: number): Promise { return await getCredentials(workflow, node, type, additionalData, mode, runExecutionData, runIndex, connectionInputData, itemIndex); }, + getExecutionId: (): string => { + return additionalData.executionId!; + }, getInputData: (inputIndex = 0, inputName = 'main') => { if (!inputData.hasOwnProperty(inputName)) { @@ -729,17 +751,15 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx throw new Error(`Could not get input index "${inputIndex}" of input "${inputName}"!`); } - if (inputData[inputName][inputIndex] === null) { // return []; throw new Error(`Value "${inputIndex}" of input "${inputName}" did not get set!`); } - // TODO: Maybe do clone of data only here so it only clones the data that is really needed return inputData[inputName][inputIndex] as INodeExecutionData[]; }, getNodeParameter: (parameterName: string, itemIndex: number, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getMode: (): WorkflowExecuteMode => { return mode; @@ -757,14 +777,17 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx return getWorkflowMetadata(workflow); }, getWorkflowDataProxy: (itemIndex: number): IWorkflowDataProxyData => { - const dataProxy = new WorkflowDataProxy(workflow, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, {}, mode); + const dataProxy = new WorkflowDataProxy(workflow, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, {}, mode, getAdditionalKeys(additionalData)); return dataProxy.getDataProxy(); }, getWorkflowStaticData(type: string): IDataObject { return workflow.getStaticData(type, node); }, prepareOutputData: NodeHelpers.prepareOutputData, - sendMessageToUI(message: any): void { // tslint:disable-line:no-any + async putExecutionToWait(waitTill: Date): Promise { + runExecutionData.waitTill = waitTill; + }, + sendMessageToUI(message : any): void { // tslint:disable-line:no-any if (mode !== 'manual') { return; } @@ -819,7 +842,7 @@ export function getExecuteSingleFunctions(workflow: Workflow, runExecutionData: }, evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => { evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex; - return workflow.expression.resolveSimpleParameterValue('=' + expression, {}, runExecutionData, runIndex, evaluateItemIndex, node.name, connectionInputData, mode); + return workflow.expression.resolveSimpleParameterValue('=' + expression, {}, runExecutionData, runIndex, evaluateItemIndex, node.name, connectionInputData, mode, getAdditionalKeys(additionalData)); }, getContext(type: string): IContextObject { return NodeHelpers.getContext(runExecutionData, type, node); @@ -865,13 +888,13 @@ export function getExecuteSingleFunctions(workflow: Workflow, runExecutionData: return getTimezone(workflow, additionalData); }, getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getWorkflow: () => { return getWorkflowMetadata(workflow); }, getWorkflowDataProxy: (): IWorkflowDataProxyData => { - const dataProxy = new WorkflowDataProxy(workflow, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, {}, mode); + const dataProxy = new WorkflowDataProxy(workflow, runExecutionData, runIndex, itemIndex, node.name, connectionInputData, {}, mode, getAdditionalKeys(additionalData)); return dataProxy.getDataProxy(); }, getWorkflowStaticData(type: string): IDataObject { @@ -928,7 +951,7 @@ export function getLoadOptionsFunctions(workflow: Workflow, node: INode, path: s const runIndex = 0; const connectionInputData: INodeExecutionData[] = []; - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, 'internal' as WorkflowExecuteMode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, 'internal' as WorkflowExecuteMode, getAdditionalKeys(additionalData), fallbackValue); }, getTimezone: (): string => { return getTimezone(workflow, additionalData); @@ -983,10 +1006,10 @@ export function getExecuteHookFunctions(workflow: Workflow, node: INode, additio const runIndex = 0; const connectionInputData: INodeExecutionData[] = []; - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getNodeWebhookUrl: (name: string): string | undefined => { - return getNodeWebhookUrl(name, workflow, node, additionalData, mode, isTest); + return getNodeWebhookUrl(name, workflow, node, additionalData, mode, getAdditionalKeys(additionalData), isTest); }, getTimezone: (): string => { return getTimezone(workflow, additionalData); @@ -1063,7 +1086,7 @@ export function getExecuteWebhookFunctions(workflow: Workflow, node: INode, addi const runIndex = 0; const connectionInputData: INodeExecutionData[] = []; - return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, fallbackValue); + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, mode, getAdditionalKeys(additionalData), fallbackValue); }, getParamsData(): object { if (additionalData.httpRequest === undefined) { @@ -1090,7 +1113,7 @@ export function getExecuteWebhookFunctions(workflow: Workflow, node: INode, addi return additionalData.httpResponse; }, getNodeWebhookUrl: (name: string): string | undefined => { - return getNodeWebhookUrl(name, workflow, node, additionalData, mode); + return getNodeWebhookUrl(name, workflow, node, additionalData, mode, getAdditionalKeys(additionalData)); }, getTimezone: (): string => { return getTimezone(workflow, additionalData); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index b893b85cb0..a3e78b022b 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -31,7 +31,6 @@ export class WorkflowExecute { private additionalData: IWorkflowExecuteAdditionalData; private mode: WorkflowExecuteMode; - constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, runExecutionData?: IRunExecutionData) { this.additionalData = additionalData; this.mode = mode; @@ -512,6 +511,13 @@ export class WorkflowExecute { this.runExecutionData.startData = {}; } + if (this.runExecutionData.waitTill) { + const lastNodeExecuted = this.runExecutionData.resultData.lastNodeExecuted as string; + this.runExecutionData.executionData!.nodeExecutionStack[0].node.disabled = true; + this.runExecutionData.waitTill = undefined; + this.runExecutionData.resultData.runData[lastNodeExecuted].pop(); + } + let currentExecutionTry = ''; let lastExecutionTry = ''; @@ -693,7 +699,7 @@ export class WorkflowExecute { } } - if (nodeSuccessData === null) { + if (nodeSuccessData === null && !this.runExecutionData.waitTill!!) { // If null gets returned it means that the node did succeed // but did not have any data. So the branch should end // (meaning the nodes afterwards should not be processed) @@ -767,6 +773,15 @@ export class WorkflowExecute { continue; } + if (this.runExecutionData.waitTill!!) { + await this.executeHook('nodeExecuteAfter', [executionNode.name, taskData, this.runExecutionData]); + + // Add the node back to the stack that the workflow can start to execute again from that node + this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); + + break; + } + // Add the nodes to which the current node has an output connection to that they can // be executed next if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) { @@ -849,6 +864,9 @@ export class WorkflowExecute { message: executionError.message, stack: executionError.stack, } as ExecutionError; + } else if (this.runExecutionData.waitTill!!) { + Logger.verbose(`Workflow execution will wait until ${this.runExecutionData.waitTill}`, { workflowId: workflow.id }); + fullRunData.waitTill = this.runExecutionData.waitTill; } else { Logger.verbose(`Workflow execution finished successfully`, { workflowId: workflow.id }); fullRunData.finished = true; diff --git a/packages/core/test/Helpers.ts b/packages/core/test/Helpers.ts index b5662a24e7..ce59151abc 100644 --- a/packages/core/test/Helpers.ts +++ b/packages/core/test/Helpers.ts @@ -758,6 +758,7 @@ export function WorkflowExecuteAdditionalData(waitPromise: IDeferredPromise