2021-08-29 11:58:11 -07:00
/* eslint-disable prefer-spread */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable no-param-reassign */
/* eslint-disable no-console */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/no-floating-promises */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { ActiveWorkflows , NodeExecuteFunctions } from 'n8n-core' ;
2019-06-23 03:35:23 -07:00
import {
2019-08-08 11:38:25 -07:00
IExecuteData ,
2019-12-31 12:19:37 -08:00
IGetExecutePollFunctions ,
2019-08-08 11:38:25 -07:00
IGetExecuteTriggerFunctions ,
INode ,
INodeExecutionData ,
IRunExecutionData ,
2019-06-23 03:35:23 -07:00
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow ,
2020-10-22 06:46:03 -07:00
NodeHelpers ,
2019-06-23 03:35:23 -07:00
WebhookHttpMethod ,
Workflow ,
2021-03-23 11:08:47 -07:00
WorkflowActivateMode ,
2019-06-23 03:35:23 -07:00
WorkflowExecuteMode ,
2021-08-29 11:58:11 -07:00
LoggerProxy as Logger ,
2019-06-23 03:35:23 -07:00
} from 'n8n-workflow' ;
import * as express from 'express' ;
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line import/no-cycle
2021-05-12 16:01:12 -07:00
import {
2021-08-29 11:58:11 -07:00
Db ,
IActivationError ,
IResponseCallbackData ,
IWebhookDb ,
IWorkflowDb ,
IWorkflowExecutionDataProcess ,
NodeTypes ,
ResponseHelper ,
WebhookHelpers ,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
WorkflowCredentials ,
WorkflowExecuteAdditionalData ,
WorkflowHelpers ,
WorkflowRunner ,
} from '.' ;
2021-09-23 23:42:41 -07:00
import config = require ( '../config' ) ;
2019-06-23 03:35:23 -07:00
2021-07-23 11:56:18 -07:00
const WEBHOOK_PROD_UNREGISTERED_HINT = ` The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list) ` ;
2019-06-23 03:35:23 -07:00
export class ActiveWorkflowRunner {
private activeWorkflows : ActiveWorkflows | null = null ;
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
private activationErrors : {
[ key : string ] : IActivationError ;
} = { } ;
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
2019-06-23 03:35:23 -07:00
async init() {
// Get the active workflows from database
2020-05-27 16:32:49 -07:00
// NOTE
// Here I guess we can have a flag on the workflow table like hasTrigger
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
2021-08-29 11:58:11 -07:00
const workflowsData : IWorkflowDb [ ] = ( await Db . collections . Workflow ! . find ( {
active : true ,
} ) ) as IWorkflowDb [ ] ;
2019-06-23 03:35:23 -07:00
2021-09-23 23:42:41 -07:00
if ( ! config . get ( 'endpoints.skipWebhoooksDeregistrationOnShutdown' ) ) {
// Do not clean up database when skip registration is done.
// This flag is set when n8n is running in scaled mode.
// Impact is minimal, but for a short while, n8n will stop accepting requests.
// Also, users had issues when running multiple "main process"
// instances if many of them start at the same time
// This is not officially supported but there is no reason
// it should not work.
// Clear up active workflow table
await Db . collections . Webhook ? . clear ( ) ;
}
2020-08-30 02:06:46 -07:00
2019-06-23 03:35:23 -07:00
this . activeWorkflows = new ActiveWorkflows ( ) ;
if ( workflowsData . length !== 0 ) {
2021-05-01 20:43:01 -07:00
console . info ( ' ================================' ) ;
console . info ( ' Start Active Workflows:' ) ;
console . info ( ' ================================' ) ;
2019-06-23 03:35:23 -07:00
for ( const workflowData of workflowsData ) {
2020-08-30 02:06:46 -07:00
console . log ( ` - ${ workflowData . name } ` ) ;
2021-08-29 11:58:11 -07:00
Logger . debug ( ` Initializing active workflow " ${ workflowData . name } " (startup) ` , {
workflowName : workflowData.name ,
workflowId : workflowData.id ,
} ) ;
2020-08-30 02:06:46 -07:00
try {
2021-03-23 11:08:47 -07:00
await this . add ( workflowData . id . toString ( ) , 'init' , workflowData ) ;
2021-08-29 11:58:11 -07:00
Logger . verbose ( ` Successfully started workflow " ${ workflowData . name } " ` , {
workflowName : workflowData.name ,
workflowId : workflowData.id ,
} ) ;
2020-08-30 02:06:46 -07:00
console . log ( ` => Started ` ) ;
} catch ( error ) {
console . log ( ` => ERROR: Workflow could not be activated: ` ) ;
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
2020-08-30 02:06:46 -07:00
console . log ( ` ${ error . message } ` ) ;
2021-08-29 11:58:11 -07:00
Logger . error ( ` Unable to initialize workflow " ${ workflowData . name } " (startup) ` , {
workflowName : workflowData.name ,
workflowId : workflowData.id ,
} ) ;
2019-06-23 03:35:23 -07:00
}
}
2021-05-01 20:43:01 -07:00
Logger . verbose ( 'Finished initializing active workflows (startup)' ) ;
2019-06-23 03:35:23 -07:00
}
}
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
2021-02-09 14:32:40 -08:00
async initWebhooks() {
this . activeWorkflows = new ActiveWorkflows ( ) ;
}
2019-06-23 03:35:23 -07:00
/ * *
* Removes all the currently active workflows
*
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async removeAll ( ) : Promise < void > {
2020-08-30 02:06:46 -07:00
const activeWorkflowId : string [ ] = [ ] ;
2021-05-01 20:43:01 -07:00
Logger . verbose ( 'Call to remove all active workflows received (removeAll)' ) ;
2020-08-30 02:06:46 -07:00
if ( this . activeWorkflows !== null ) {
// TODO: This should be renamed!
activeWorkflowId . push . apply ( activeWorkflowId , this . activeWorkflows . allActiveWorkflows ( ) ) ;
2019-06-23 03:35:23 -07:00
}
2020-08-30 02:06:46 -07:00
const activeWorkflows = await this . getActiveWorkflows ( ) ;
2021-08-29 11:58:11 -07:00
activeWorkflowId . push . apply (
activeWorkflowId ,
activeWorkflows . map ( ( workflow ) = > workflow . id ) ,
) ;
2019-06-23 03:35:23 -07:00
const removePromises = [ ] ;
2020-08-30 02:06:46 -07:00
for ( const workflowId of activeWorkflowId ) {
2019-06-23 03:35:23 -07:00
removePromises . push ( this . remove ( workflowId ) ) ;
}
await Promise . all ( removePromises ) ;
}
/ * *
* Checks if a webhook for the given method and path exists and executes the workflow .
*
* @param { WebhookHttpMethod } httpMethod
* @param { string } path
* @param { express . Request } req
* @param { express . Response } res
* @returns { Promise < object > }
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
async executeWebhook (
httpMethod : WebhookHttpMethod ,
path : string ,
req : express.Request ,
res : express.Response ,
) : Promise < IResponseCallbackData > {
2021-05-01 20:43:01 -07:00
Logger . debug ( ` Received webhoook " ${ httpMethod } " for path " ${ path } " ` ) ;
2019-06-23 03:35:23 -07:00
if ( this . activeWorkflows === null ) {
2021-08-29 11:58:11 -07:00
throw new ResponseHelper . ResponseError (
'The "activeWorkflows" instance did not get initialized yet.' ,
404 ,
404 ,
) ;
2019-06-23 03:35:23 -07:00
}
2021-01-28 06:44:10 -08:00
// Reset request parameters
req . params = { } ;
2021-02-09 00:14:40 -08:00
// Remove trailing slash
if ( path . endsWith ( '/' ) ) {
path = path . slice ( 0 , - 1 ) ;
}
2021-08-29 11:58:11 -07:00
let webhook = ( await Db . collections . Webhook ? . findOne ( {
webhookPath : path ,
method : httpMethod ,
} ) ) as IWebhookDb ;
2021-01-23 11:00:32 -08:00
let webhookId : string | undefined ;
2019-06-23 03:35:23 -07:00
2021-01-23 11:00:32 -08:00
// check if path is dynamic
2020-05-27 16:32:49 -07:00
if ( webhook === undefined ) {
2021-01-23 11:00:32 -08:00
// check if a dynamic webhook path exists
const pathElements = path . split ( '/' ) ;
webhookId = pathElements . shift ( ) ;
2021-08-29 11:58:11 -07:00
const dynamicWebhooks = await Db . collections . Webhook ? . find ( {
webhookId ,
method : httpMethod ,
pathLength : pathElements.length ,
} ) ;
2021-01-28 06:44:10 -08:00
if ( dynamicWebhooks === undefined || dynamicWebhooks . length === 0 ) {
2021-01-23 11:00:32 -08:00
// The requested webhook is not registered
2021-08-29 11:58:11 -07:00
throw new ResponseHelper . ResponseError (
` The requested webhook " ${ httpMethod } ${ path } " is not registered. ` ,
404 ,
404 ,
WEBHOOK_PROD_UNREGISTERED_HINT ,
) ;
2021-01-23 11:00:32 -08:00
}
2021-01-28 06:44:10 -08:00
2021-02-09 00:14:40 -08:00
let maxMatches = 0 ;
const pathElementsSet = new Set ( pathElements ) ;
// check if static elements match in path
// if more results have been returned choose the one with the most static-route matches
2021-08-29 11:58:11 -07:00
dynamicWebhooks . forEach ( ( dynamicWebhook ) = > {
const staticElements = dynamicWebhook . webhookPath
. split ( '/' )
. filter ( ( ele ) = > ! ele . startsWith ( ':' ) ) ;
const allStaticExist = staticElements . every ( ( staticEle ) = > pathElementsSet . has ( staticEle ) ) ;
2021-02-09 00:14:40 -08:00
if ( allStaticExist && staticElements . length > maxMatches ) {
maxMatches = staticElements . length ;
webhook = dynamicWebhook ;
}
// handle routes with no static elements
else if ( staticElements . length === 0 && ! webhook ) {
webhook = dynamicWebhook ;
2021-01-23 11:00:32 -08:00
}
2021-02-09 00:14:40 -08:00
} ) ;
if ( webhook === undefined ) {
2021-08-29 11:58:11 -07:00
throw new ResponseHelper . ResponseError (
` The requested webhook " ${ httpMethod } ${ path } " is not registered. ` ,
404 ,
404 ,
WEBHOOK_PROD_UNREGISTERED_HINT ,
) ;
2021-01-23 11:00:32 -08:00
}
2021-08-29 11:58:11 -07:00
// @ts-ignore
// eslint-disable-next-line no-param-reassign
path = webhook . webhookPath ;
2021-01-23 11:00:32 -08:00
// extracting params from path
2021-08-29 11:58:11 -07:00
// @ts-ignore
webhook . webhookPath . split ( '/' ) . forEach ( ( ele , index ) = > {
2021-01-23 11:00:32 -08:00
if ( ele . startsWith ( ':' ) ) {
// write params to req.params
req . params [ ele . slice ( 1 ) ] = pathElements [ index ] ;
}
} ) ;
2019-06-23 03:35:23 -07:00
}
2020-05-27 16:32:49 -07:00
const workflowData = await Db . collections . Workflow ! . findOne ( webhook . workflowId ) ;
2020-01-22 15:06:43 -08:00
if ( workflowData === undefined ) {
2021-08-29 11:58:11 -07:00
throw new ResponseHelper . ResponseError (
` Could not find workflow with id " ${ webhook . workflowId } " ` ,
404 ,
404 ,
) ;
2020-01-22 15:06:43 -08:00
}
const nodeTypes = NodeTypes ( ) ;
2021-08-29 11:58:11 -07:00
const workflow = new Workflow ( {
id : webhook.workflowId.toString ( ) ,
name : workflowData.name ,
nodes : workflowData.nodes ,
connections : workflowData.connections ,
active : workflowData.active ,
nodeTypes ,
staticData : workflowData.staticData ,
settings : workflowData.settings ,
} ) ;
2020-05-27 16:32:49 -07:00
2021-08-20 09:57:30 -07:00
const additionalData = await WorkflowExecuteAdditionalData . getBase ( ) ;
2020-05-27 16:32:49 -07:00
2021-08-29 11:58:11 -07:00
const webhookData = NodeHelpers . getNodeWebhooks (
workflow ,
workflow . getNode ( webhook . node ) as INode ,
additionalData ,
) . filter ( ( webhook ) = > {
return webhook . httpMethod === httpMethod && webhook . path === path ;
2020-05-27 16:32:49 -07:00
} ) [ 0 ] ;
2020-01-22 15:06:43 -08:00
2019-06-23 03:35:23 -07:00
// Get the node which has the webhook defined to know where to start from and to
// get additional data
2020-01-22 15:06:43 -08:00
const workflowStartNode = workflow . getNode ( webhookData . node ) ;
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
if ( workflowStartNode === null ) {
2019-08-28 08:16:09 -07:00
throw new ResponseHelper . ResponseError ( 'Could not find node to process webhook.' , 404 , 404 ) ;
2019-06-23 03:35:23 -07:00
}
return new Promise ( ( resolve , reject ) = > {
2020-01-22 15:06:43 -08:00
const executionMode = 'webhook' ;
2021-08-29 11:58:11 -07:00
// @ts-ignore
WebhookHelpers . executeWebhook (
workflow ,
webhookData ,
workflowData ,
workflowStartNode ,
executionMode ,
undefined ,
undefined ,
undefined ,
req ,
res ,
// eslint-disable-next-line consistent-return
( error : Error | null , data : object ) = > {
if ( error !== null ) {
return reject ( error ) ;
}
resolve ( data ) ;
} ,
) ;
2019-06-23 03:35:23 -07:00
} ) ;
}
2020-07-24 07:43:23 -07:00
/ * *
* Gets all request methods associated with a single webhook
*
* @param { string } path webhook path
* @returns { Promise < string [ ] > }
* @memberof ActiveWorkflowRunner
* /
2021-05-12 16:01:12 -07:00
async getWebhookMethods ( path : string ) : Promise < string [ ] > {
2021-08-29 11:58:11 -07:00
const webhooks = ( await Db . collections . Webhook ? . find ( { webhookPath : path } ) ) as IWebhookDb [ ] ;
2020-07-24 07:24:18 -07:00
// Gather all request methods in string array
2021-08-29 11:58:11 -07:00
const webhookMethods : string [ ] = webhooks . map ( ( webhook ) = > webhook . method ) ;
2020-07-24 07:24:18 -07:00
return webhookMethods ;
}
2019-06-23 03:35:23 -07:00
/ * *
* Returns the ids of the currently active workflows
*
* @returns { string [ ] }
* @memberof ActiveWorkflowRunner
* /
2020-10-21 01:26:15 -07:00
async getActiveWorkflows ( ) : Promise < IWorkflowDb [ ] > {
2021-08-29 11:58:11 -07:00
const activeWorkflows = ( await Db . collections . Workflow ? . find ( {
where : { active : true } ,
select : [ 'id' ] ,
} ) ) as IWorkflowDb [ ] ;
return activeWorkflows . filter (
( workflow ) = > this . activationErrors [ workflow . id . toString ( ) ] === undefined ,
) ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Returns if the workflow is active
*
* @param { string } id The id of the workflow to check
* @returns { boolean }
* @memberof ActiveWorkflowRunner
* /
2020-05-27 16:32:49 -07:00
async isActive ( id : string ) : Promise < boolean > {
2021-08-29 11:58:11 -07:00
const workflow = ( await Db . collections . Workflow ? . findOne ( { id : Number ( id ) } ) ) as IWorkflowDb ;
return workflow ? . active ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Return error if there was a problem activating the workflow
*
* @param { string } id The id of the workflow to return the error of
* @returns { ( IActivationError | undefined ) }
* @memberof ActiveWorkflowRunner
* /
getActivationError ( id : string ) : IActivationError | undefined {
if ( this . activationErrors [ id ] === undefined ) {
return undefined ;
}
return this . activationErrors [ id ] ;
}
/ * *
* Adds all the webhooks of the workflow
*
* @param { Workflow } workflow
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
async addWorkflowWebhooks (
workflow : Workflow ,
additionalData : IWorkflowExecuteAdditionalDataWorkflow ,
mode : WorkflowExecuteMode ,
activation : WorkflowActivateMode ,
) : Promise < void > {
2021-08-21 05:11:32 -07:00
const webhooks = WebhookHelpers . getWorkflowWebhooks ( workflow , additionalData , undefined , true ) ;
2020-06-10 06:39:15 -07:00
let path = '' as string | undefined ;
2019-06-23 03:35:23 -07:00
for ( const webhookData of webhooks ) {
2020-05-27 16:32:49 -07:00
const node = workflow . getNode ( webhookData . node ) as INode ;
node . name = webhookData . node ;
2021-04-11 06:45:17 -07:00
path = webhookData . path ;
2020-06-10 06:39:15 -07:00
2020-05-27 16:32:49 -07:00
const webhook = {
workflowId : webhookData.workflowId ,
2021-04-11 06:45:17 -07:00
webhookPath : path ,
2020-05-27 16:32:49 -07:00
node : node.name ,
method : webhookData.httpMethod ,
} as IWebhookDb ;
2021-01-23 11:00:32 -08:00
if ( webhook . webhookPath . startsWith ( '/' ) ) {
webhook . webhookPath = webhook . webhookPath . slice ( 1 ) ;
}
2021-02-09 00:14:40 -08:00
if ( webhook . webhookPath . endsWith ( '/' ) ) {
webhook . webhookPath = webhook . webhookPath . slice ( 0 , - 1 ) ;
}
2021-01-23 11:00:32 -08:00
if ( ( path . startsWith ( ':' ) || path . includes ( '/:' ) ) && node . webhookId ) {
webhook . webhookId = node . webhookId ;
webhook . pathLength = webhook . webhookPath . split ( '/' ) . length ;
}
2020-05-27 16:32:49 -07:00
try {
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line no-await-in-loop
2020-05-27 16:32:49 -07:00
await Db . collections . Webhook ? . insert ( webhook ) ;
2021-08-29 11:58:11 -07:00
const webhookExists = await workflow . runWebhookMethod (
'checkExists' ,
webhookData ,
NodeExecuteFunctions ,
mode ,
activation ,
false ,
) ;
2020-10-21 08:50:23 -07:00
if ( webhookExists !== true ) {
2020-05-27 16:32:49 -07:00
// If webhook does not exist yet create it
2021-08-29 11:58:11 -07:00
await workflow . runWebhookMethod (
'create' ,
webhookData ,
NodeExecuteFunctions ,
mode ,
activation ,
false ,
) ;
2020-05-27 16:32:49 -07:00
}
} catch ( error ) {
2021-09-23 23:42:41 -07:00
if (
activation === 'init' &&
config . get ( 'endpoints.skipWebhoooksDeregistrationOnShutdown' ) &&
error . name === 'QueryFailedError'
) {
// When skipWebhoooksDeregistrationOnShutdown is enabled,
// n8n does not remove the registered webhooks on exit.
// This means that further initializations will always fail
// when inserting to database. This is why we ignore this error
// as it's expected to happen.
// eslint-disable-next-line no-continue
continue ;
}
2020-10-21 08:50:23 -07:00
try {
await this . removeWorkflowWebhooks ( workflow . id as string ) ;
} catch ( error ) {
2021-08-29 11:58:11 -07:00
console . error (
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
` Could not remove webhooks of workflow " ${ workflow . id } " because of error: " ${ error . message } " ` ,
) ;
2020-10-21 08:50:23 -07:00
}
2020-05-30 16:03:58 -07:00
let errorMessage = '' ;
2020-05-27 16:32:49 -07:00
2020-05-30 16:03:58 -07:00
// if it's a workflow from the the insert
2021-01-23 11:00:32 -08:00
// TODO check if there is standard error code for duplicate key violation that works
2020-05-30 16:03:58 -07:00
// with all databases
2021-01-23 11:35:38 -08:00
if ( error . name === 'QueryFailedError' ) {
2021-01-28 06:44:10 -08:00
errorMessage = ` The webhook path [ ${ webhook . webhookPath } ] and method [ ${ webhook . method } ] already exist. ` ;
2020-05-30 16:03:58 -07:00
} else if ( error . detail ) {
// it's a error runnig the webhook methods (checkExists, create)
errorMessage = error . detail ;
} else {
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2020-06-10 06:39:15 -07:00
errorMessage = error . message ;
2020-05-30 16:03:58 -07:00
}
2021-06-12 08:06:56 -07:00
throw error ;
2020-05-27 16:32:49 -07:00
}
2019-06-23 03:35:23 -07:00
}
2020-05-27 16:32:49 -07:00
// Save static data!
await WorkflowHelpers . saveStaticData ( workflow ) ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Remove all the webhooks of the workflow
*
* @param { string } workflowId
* @returns
* @memberof ActiveWorkflowRunner
* /
2020-01-22 15:06:43 -08:00
async removeWorkflowWebhooks ( workflowId : string ) : Promise < void > {
const workflowData = await Db . collections . Workflow ! . findOne ( workflowId ) ;
if ( workflowData === undefined ) {
throw new Error ( ` Could not find workflow with id " ${ workflowId } " ` ) ;
}
const nodeTypes = NodeTypes ( ) ;
2021-08-29 11:58:11 -07:00
const workflow = new Workflow ( {
id : workflowId ,
name : workflowData.name ,
nodes : workflowData.nodes ,
connections : workflowData.connections ,
active : workflowData.active ,
nodeTypes ,
staticData : workflowData.staticData ,
settings : workflowData.settings ,
} ) ;
2020-01-22 15:06:43 -08:00
2020-05-27 16:32:49 -07:00
const mode = 'internal' ;
2020-01-22 15:06:43 -08:00
2021-08-20 09:57:30 -07:00
const additionalData = await WorkflowExecuteAdditionalData . getBase ( ) ;
2020-05-27 16:32:49 -07:00
2021-08-21 05:11:32 -07:00
const webhooks = WebhookHelpers . getWorkflowWebhooks ( workflow , additionalData , undefined , true ) ;
2020-05-27 16:32:49 -07:00
for ( const webhookData of webhooks ) {
2021-08-29 11:58:11 -07:00
await workflow . runWebhookMethod (
'delete' ,
webhookData ,
NodeExecuteFunctions ,
mode ,
'update' ,
false ,
) ;
2020-05-27 16:32:49 -07:00
}
2020-09-16 14:55:34 -07:00
await WorkflowHelpers . saveStaticData ( workflow ) ;
2020-05-27 16:32:49 -07:00
const webhook = {
workflowId : workflowData.id ,
} as IWebhookDb ;
await Db . collections . Webhook ? . delete ( webhook ) ;
2019-06-23 03:35:23 -07:00
}
2019-12-31 12:19:37 -08:00
/ * *
* Runs the given workflow
*
* @param { IWorkflowDb } workflowData
* @param { INode } node
* @param { INodeExecutionData [ ] [ ] } data
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async runWorkflow (
workflowData : IWorkflowDb ,
node : INode ,
data : INodeExecutionData [ ] [ ] ,
additionalData : IWorkflowExecuteAdditionalDataWorkflow ,
mode : WorkflowExecuteMode ,
) {
2019-12-31 12:19:37 -08:00
const nodeExecutionStack : IExecuteData [ ] = [
{
node ,
data : {
main : data ,
2020-10-22 06:46:03 -07:00
} ,
} ,
2019-12-31 12:19:37 -08:00
] ;
const executionData : IRunExecutionData = {
startData : { } ,
resultData : {
runData : { } ,
} ,
executionData : {
contextData : { } ,
nodeExecutionStack ,
waitingExecution : { } ,
} ,
} ;
// Start the workflow
const runData : IWorkflowExecutionDataProcess = {
executionMode : mode ,
executionData ,
workflowData ,
} ;
const workflowRunner = new WorkflowRunner ( ) ;
return workflowRunner . run ( runData , true ) ;
}
/ * *
* Return poll function which gets the global functions from n8n - core
* and overwrites the __emit to be able to start it in subprocess
*
* @param { IWorkflowDb } workflowData
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { IGetExecutePollFunctions }
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
getExecutePollFunctions (
workflowData : IWorkflowDb ,
additionalData : IWorkflowExecuteAdditionalDataWorkflow ,
mode : WorkflowExecuteMode ,
activation : WorkflowActivateMode ,
) : IGetExecutePollFunctions {
return ( workflow : Workflow , node : INode ) = > {
const returnFunctions = NodeExecuteFunctions . getExecutePollFunctions (
workflow ,
node ,
additionalData ,
mode ,
activation ,
) ;
// eslint-disable-next-line no-underscore-dangle
2019-12-31 12:19:37 -08:00
returnFunctions . __emit = ( data : INodeExecutionData [ ] [ ] ) : void = > {
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
2021-05-01 20:43:01 -07:00
Logger . debug ( ` Received event to trigger execution for workflow " ${ workflow . name } " ` ) ;
2019-12-31 12:19:37 -08:00
this . runWorkflow ( workflowData , node , data , additionalData , mode ) ;
} ;
return returnFunctions ;
2021-08-29 11:58:11 -07:00
} ;
2019-12-31 12:19:37 -08:00
}
2019-08-08 11:38:25 -07:00
/ * *
* Return trigger function which gets the global functions from n8n - core
* and overwrites the emit to be able to start it in subprocess
*
* @param { IWorkflowDb } workflowData
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { IGetExecuteTriggerFunctions }
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
getExecuteTriggerFunctions (
workflowData : IWorkflowDb ,
additionalData : IWorkflowExecuteAdditionalDataWorkflow ,
mode : WorkflowExecuteMode ,
activation : WorkflowActivateMode ,
) : IGetExecuteTriggerFunctions {
return ( workflow : Workflow , node : INode ) = > {
const returnFunctions = NodeExecuteFunctions . getExecuteTriggerFunctions (
workflow ,
node ,
additionalData ,
mode ,
activation ,
) ;
2019-08-08 11:38:25 -07:00
returnFunctions . emit = ( data : INodeExecutionData [ ] [ ] ) : void = > {
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
2021-05-01 20:43:01 -07:00
Logger . debug ( ` Received trigger for workflow " ${ workflow . name } " ` ) ;
2020-05-08 15:55:47 -07:00
WorkflowHelpers . saveStaticData ( workflow ) ;
2021-08-29 11:58:11 -07:00
// eslint-disable-next-line id-denylist
this . runWorkflow ( workflowData , node , data , additionalData , mode ) . catch ( ( err ) = >
console . error ( err ) ,
) ;
2019-08-08 11:38:25 -07:00
} ;
return returnFunctions ;
2021-08-29 11:58:11 -07:00
} ;
2019-08-08 11:38:25 -07:00
}
2019-06-23 03:35:23 -07:00
/ * *
* Makes a workflow active
*
* @param { string } workflowId The id of the workflow to activate
* @param { IWorkflowDb } [ workflowData ] If workflowData is given it saves the DB query
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
2021-08-29 11:58:11 -07:00
async add (
workflowId : string ,
activation : WorkflowActivateMode ,
workflowData? : IWorkflowDb ,
) : Promise < void > {
2019-06-23 03:35:23 -07:00
if ( this . activeWorkflows === null ) {
throw new Error ( ` The "activeWorkflows" instance did not get initialized yet. ` ) ;
}
let workflowInstance : Workflow ;
try {
if ( workflowData === undefined ) {
2021-08-29 11:58:11 -07:00
workflowData = ( await Db . collections . Workflow ! . findOne ( workflowId ) ) as IWorkflowDb ;
2019-06-23 03:35:23 -07:00
}
if ( ! workflowData ) {
throw new Error ( ` Could not find workflow with id " ${ workflowId } ". ` ) ;
}
const nodeTypes = NodeTypes ( ) ;
2021-08-29 11:58:11 -07:00
workflowInstance = new Workflow ( {
id : workflowId ,
name : workflowData.name ,
nodes : workflowData.nodes ,
connections : workflowData.connections ,
active : workflowData.active ,
nodeTypes ,
staticData : workflowData.staticData ,
settings : workflowData.settings ,
} ) ;
2019-06-23 03:35:23 -07:00
2021-08-29 11:58:11 -07:00
const canBeActivated = workflowInstance . checkIfWorkflowCanBeActivated ( [
'n8n-nodes-base.start' ,
] ) ;
if ( ! canBeActivated ) {
2021-05-01 20:43:01 -07:00
Logger . error ( ` Unable to activate workflow " ${ workflowData . name } " ` ) ;
2021-08-29 11:58:11 -07:00
throw new Error (
` The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated. ` ,
) ;
2019-06-23 03:35:23 -07:00
}
const mode = 'trigger' ;
2021-08-20 09:57:30 -07:00
const additionalData = await WorkflowExecuteAdditionalData . getBase ( ) ;
2021-08-29 11:58:11 -07:00
const getTriggerFunctions = this . getExecuteTriggerFunctions (
workflowData ,
additionalData ,
mode ,
activation ,
) ;
const getPollFunctions = this . getExecutePollFunctions (
workflowData ,
additionalData ,
mode ,
activation ,
) ;
2019-06-23 03:35:23 -07:00
// Add the workflows which have webhooks defined
2021-03-23 11:08:47 -07:00
await this . addWorkflowWebhooks ( workflowInstance , additionalData , mode , activation ) ;
2020-05-27 16:32:49 -07:00
2021-08-29 11:58:11 -07:00
if (
workflowInstance . getTriggerNodes ( ) . length !== 0 ||
workflowInstance . getPollNodes ( ) . length !== 0
) {
await this . activeWorkflows . add (
workflowId ,
workflowInstance ,
additionalData ,
mode ,
activation ,
getTriggerFunctions ,
getPollFunctions ,
) ;
Logger . verbose ( ` Successfully activated workflow " ${ workflowData . name } " ` , {
workflowId ,
workflowName : workflowData.name ,
} ) ;
2020-05-27 16:32:49 -07:00
}
2019-06-23 03:35:23 -07:00
if ( this . activationErrors [ workflowId ] !== undefined ) {
2020-01-22 15:06:43 -08:00
// If there were activation errors delete them
2019-06-23 03:35:23 -07:00
delete this . activationErrors [ workflowId ] ;
}
} catch ( error ) {
// There was a problem activating the workflow
// Save the error
this . activationErrors [ workflowId ] = {
time : new Date ( ) . getTime ( ) ,
error : {
message : error.message ,
} ,
} ;
throw error ;
}
2019-08-08 11:38:25 -07:00
// If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted.
2019-06-23 03:35:23 -07:00
await WorkflowHelpers . saveStaticData ( workflowInstance ! ) ;
}
/ * *
* Makes a workflow inactive
*
* @param { string } workflowId The id of the workflow to deactivate
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async remove ( workflowId : string ) : Promise < void > {
if ( this . activeWorkflows !== null ) {
// Remove all the webhooks of the workflow
2020-09-23 00:42:39 -07:00
try {
await this . removeWorkflowWebhooks ( workflowId ) ;
} catch ( error ) {
2021-08-29 11:58:11 -07:00
console . error (
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
` Could not remove webhooks of workflow " ${ workflowId } " because of error: " ${ error . message } " ` ,
) ;
2020-09-23 00:42:39 -07:00
}
2019-06-23 03:35:23 -07:00
if ( this . activationErrors [ workflowId ] !== undefined ) {
// If there were any activation errors delete them
delete this . activationErrors [ workflowId ] ;
}
2020-05-27 16:32:49 -07:00
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if ( this . activeWorkflows . isActive ( workflowId ) ) {
2021-05-12 16:00:46 -07:00
await this . activeWorkflows . remove ( workflowId ) ;
Logger . verbose ( ` Successfully deactivated workflow " ${ workflowId } " ` , { workflowId } ) ;
2020-05-27 16:32:49 -07:00
}
return ;
2019-06-23 03:35:23 -07:00
}
throw new Error ( ` The "activeWorkflows" instance did not get initialized yet. ` ) ;
}
}
let workflowRunnerInstance : ActiveWorkflowRunner | undefined ;
export function getInstance ( ) : ActiveWorkflowRunner {
if ( workflowRunnerInstance === undefined ) {
workflowRunnerInstance = new ActiveWorkflowRunner ( ) ;
}
return workflowRunnerInstance ;
}