2019-06-23 03:35:23 -07:00
import {
IActivationError ,
Db ,
NodeTypes ,
IResponseCallbackData ,
IWorkflowDb ,
2019-08-08 11:38:25 -07:00
IWorkflowExecutionDataProcess ,
2019-06-23 03:35:23 -07:00
ResponseHelper ,
WebhookHelpers ,
2019-08-08 11:38:25 -07:00
WorkflowCredentials ,
2019-06-23 03:35:23 -07:00
WorkflowHelpers ,
2019-08-08 11:38:25 -07:00
WorkflowRunner ,
2019-06-23 03:35:23 -07:00
WorkflowExecuteAdditionalData ,
2020-05-27 16:32:49 -07:00
IWebhookDb ,
2019-06-23 03:35:23 -07:00
} from './' ;
import {
ActiveWorkflows ,
2019-08-08 11:38:25 -07:00
NodeExecuteFunctions ,
2019-06-23 03:35:23 -07:00
} from 'n8n-core' ;
import {
2019-08-08 11:38:25 -07:00
IExecuteData ,
2019-12-31 12:19:37 -08:00
IGetExecutePollFunctions ,
2019-08-08 11:38:25 -07:00
IGetExecuteTriggerFunctions ,
INode ,
INodeExecutionData ,
IRunExecutionData ,
2020-05-27 16:32:49 -07:00
NodeHelpers ,
2019-06-23 03:35:23 -07:00
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow ,
WebhookHttpMethod ,
Workflow ,
WorkflowExecuteMode ,
} from 'n8n-workflow' ;
import * as express from 'express' ;
export class ActiveWorkflowRunner {
private activeWorkflows : ActiveWorkflows | null = null ;
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
private activationErrors : {
[ key : string ] : IActivationError ;
} = { } ;
async init() {
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
// Get the active workflows from database
2020-05-27 16:32:49 -07:00
// NOTE
// Here I guess we can have a flag on the workflow table like hasTrigger
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
const workflowsData : IWorkflowDb [ ] = await Db . collections . Workflow ! . find ( { active : true } ) as IWorkflowDb [ ] ;
2019-06-23 03:35:23 -07:00
2020-08-30 02:06:46 -07:00
// Clear up active workflow table
await Db . collections . Webhook ? . clear ( ) ;
2019-06-23 03:35:23 -07:00
this . activeWorkflows = new ActiveWorkflows ( ) ;
if ( workflowsData . length !== 0 ) {
console . log ( '\n ================================' ) ;
console . log ( ' Start Active Workflows:' ) ;
console . log ( ' ================================' ) ;
for ( const workflowData of workflowsData ) {
2020-08-30 02:06:46 -07:00
console . log ( ` - ${ workflowData . name } ` ) ;
try {
await this . add ( workflowData . id . toString ( ) , workflowData ) ;
console . log ( ` => Started ` ) ;
} catch ( error ) {
console . log ( ` => ERROR: Workflow could not be activated: ` ) ;
console . log ( ` ${ error . message } ` ) ;
2019-06-23 03:35:23 -07:00
}
}
}
}
/ * *
* Removes all the currently active workflows
*
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async removeAll ( ) : Promise < void > {
2020-08-30 02:06:46 -07:00
const activeWorkflowId : string [ ] = [ ] ;
if ( this . activeWorkflows !== null ) {
// TODO: This should be renamed!
activeWorkflowId . push . apply ( activeWorkflowId , this . activeWorkflows . allActiveWorkflows ( ) ) ;
2019-06-23 03:35:23 -07:00
}
2020-08-30 02:06:46 -07:00
const activeWorkflows = await this . getActiveWorkflows ( ) ;
activeWorkflowId . push . apply ( activeWorkflowId , activeWorkflows . map ( workflow = > workflow . id ) ) ;
2019-06-23 03:35:23 -07:00
const removePromises = [ ] ;
2020-08-30 02:06:46 -07:00
for ( const workflowId of activeWorkflowId ) {
2019-06-23 03:35:23 -07:00
removePromises . push ( this . remove ( workflowId ) ) ;
}
await Promise . all ( removePromises ) ;
return ;
}
/ * *
* Checks if a webhook for the given method and path exists and executes the workflow .
*
* @param { WebhookHttpMethod } httpMethod
* @param { string } path
* @param { express . Request } req
* @param { express . Response } res
* @returns { Promise < object > }
* @memberof ActiveWorkflowRunner
* /
async executeWebhook ( httpMethod : WebhookHttpMethod , path : string , req : express.Request , res : express.Response ) : Promise < IResponseCallbackData > {
if ( this . activeWorkflows === null ) {
2019-08-28 08:16:09 -07:00
throw new ResponseHelper . ResponseError ( 'The "activeWorkflows" instance did not get initialized yet.' , 404 , 404 ) ;
2019-06-23 03:35:23 -07:00
}
2020-05-27 16:32:49 -07:00
const webhook = await Db . collections . Webhook ? . findOne ( { webhookPath : path , method : httpMethod } ) as IWebhookDb ;
2019-06-23 03:35:23 -07:00
2020-05-27 16:32:49 -07:00
// check if something exist
if ( webhook === undefined ) {
2020-03-31 09:54:25 -07:00
// The requested webhook is not registered
throw new ResponseHelper . ResponseError ( ` The requested webhook " ${ httpMethod } ${ path } " is not registered. ` , 404 , 404 ) ;
2019-06-23 03:35:23 -07:00
}
2020-05-27 16:32:49 -07:00
const workflowData = await Db . collections . Workflow ! . findOne ( webhook . workflowId ) ;
2020-01-22 15:06:43 -08:00
if ( workflowData === undefined ) {
2020-05-27 16:32:49 -07:00
throw new ResponseHelper . ResponseError ( ` Could not find workflow with id " ${ webhook . workflowId } " ` , 404 , 404 ) ;
2020-01-22 15:06:43 -08:00
}
const nodeTypes = NodeTypes ( ) ;
2020-05-27 16:32:49 -07:00
const workflow = new Workflow ( { id : webhook.workflowId.toString ( ) , name : workflowData.name , nodes : workflowData.nodes , connections : workflowData.connections , active : workflowData.active , nodeTypes , staticData : workflowData.staticData , settings : workflowData.settings } ) ;
const credentials = await WorkflowCredentials ( [ workflow . getNode ( webhook . node as string ) as INode ] ) ;
const additionalData = await WorkflowExecuteAdditionalData . getBase ( credentials ) ;
const webhookData = NodeHelpers . getNodeWebhooks ( workflow , workflow . getNode ( webhook . node as string ) as INode , additionalData ) . filter ( ( webhook ) = > {
return ( webhook . httpMethod === httpMethod && webhook . path === path ) ;
} ) [ 0 ] ;
2020-01-22 15:06:43 -08:00
2019-06-23 03:35:23 -07:00
// Get the node which has the webhook defined to know where to start from and to
// get additional data
2020-01-22 15:06:43 -08:00
const workflowStartNode = workflow . getNode ( webhookData . node ) ;
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
if ( workflowStartNode === null ) {
2019-08-28 08:16:09 -07:00
throw new ResponseHelper . ResponseError ( 'Could not find node to process webhook.' , 404 , 404 ) ;
2019-06-23 03:35:23 -07:00
}
return new Promise ( ( resolve , reject ) = > {
2020-01-22 15:06:43 -08:00
const executionMode = 'webhook' ;
2020-05-27 16:32:49 -07:00
//@ts-ignore
2020-01-22 15:06:43 -08:00
WebhookHelpers . executeWebhook ( workflow , webhookData , workflowData , workflowStartNode , executionMode , undefined , req , res , ( error : Error | null , data : object ) = > {
2019-06-23 03:35:23 -07:00
if ( error !== null ) {
return reject ( error ) ;
}
resolve ( data ) ;
} ) ;
} ) ;
}
2020-07-24 07:43:23 -07:00
/ * *
* Gets all request methods associated with a single webhook
*
* @param { string } path webhook path
* @returns { Promise < string [ ] > }
* @memberof ActiveWorkflowRunner
* /
2020-07-24 07:24:18 -07:00
async getWebhookMethods ( path : string ) : Promise < string [ ] > {
const webhooks = await Db . collections . Webhook ? . find ( { webhookPath : path } ) as IWebhookDb [ ] ;
// Gather all request methods in string array
2020-07-24 07:43:23 -07:00
const webhookMethods : string [ ] = webhooks . map ( webhook = > webhook . method ) ;
2020-07-24 07:24:18 -07:00
return webhookMethods ;
}
2019-06-23 03:35:23 -07:00
/ * *
* Returns the ids of the currently active workflows
*
* @returns { string [ ] }
* @memberof ActiveWorkflowRunner
* /
2020-05-27 16:32:49 -07:00
getActiveWorkflows ( ) : Promise < IWorkflowDb [ ] > {
2020-08-30 02:06:46 -07:00
return Db . collections . Workflow ? . find ( { where : { active : true } , select : [ 'id' ] } ) as Promise < IWorkflowDb [ ] > ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Returns if the workflow is active
*
* @param { string } id The id of the workflow to check
* @returns { boolean }
* @memberof ActiveWorkflowRunner
* /
2020-05-27 16:32:49 -07:00
async isActive ( id : string ) : Promise < boolean > {
const workflow = await Db . collections . Workflow ? . findOne ( { id } ) as IWorkflowDb ;
return workflow ? . active as boolean ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Return error if there was a problem activating the workflow
*
* @param { string } id The id of the workflow to return the error of
* @returns { ( IActivationError | undefined ) }
* @memberof ActiveWorkflowRunner
* /
getActivationError ( id : string ) : IActivationError | undefined {
if ( this . activationErrors [ id ] === undefined ) {
return undefined ;
}
return this . activationErrors [ id ] ;
}
/ * *
* Adds all the webhooks of the workflow
*
* @param { Workflow } workflow
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async addWorkflowWebhooks ( workflow : Workflow , additionalData : IWorkflowExecuteAdditionalDataWorkflow , mode : WorkflowExecuteMode ) : Promise < void > {
const webhooks = WebhookHelpers . getWorkflowWebhooks ( workflow , additionalData ) ;
2020-06-10 06:39:15 -07:00
let path = '' as string | undefined ;
2019-06-23 03:35:23 -07:00
for ( const webhookData of webhooks ) {
2020-05-27 16:32:49 -07:00
const node = workflow . getNode ( webhookData . node ) as INode ;
node . name = webhookData . node ;
path = node . parameters . path as string ;
if ( node . parameters . path === undefined ) {
2020-09-12 03:16:07 -07:00
path = workflow . expression . getSimpleParameterValue ( node , webhookData . webhookDescription [ 'path' ] ) as string | undefined ;
2020-06-10 06:39:15 -07:00
if ( path === undefined ) {
// TODO: Use a proper logger
console . error ( ` No webhook path could be found for node " ${ node . name } " in workflow " ${ workflow . id } ". ` ) ;
continue ;
}
2020-05-27 16:32:49 -07:00
}
2020-09-12 03:16:07 -07:00
const isFullPath : boolean = workflow . expression . getSimpleParameterValue ( node , webhookData . webhookDescription [ 'isFullPath' ] , false ) as boolean ;
2020-06-10 06:39:15 -07:00
2020-05-27 16:32:49 -07:00
const webhook = {
workflowId : webhookData.workflowId ,
2020-06-10 06:39:15 -07:00
webhookPath : NodeHelpers.getNodeWebhookPath ( workflow . id as string , node , path , isFullPath ) ,
2020-05-27 16:32:49 -07:00
node : node.name ,
method : webhookData.httpMethod ,
} as IWebhookDb ;
try {
2020-06-20 18:59:06 -07:00
2020-05-27 16:32:49 -07:00
await Db . collections . Webhook ? . insert ( webhook ) ;
const webhookExists = await workflow . runWebhookMethod ( 'checkExists' , webhookData , NodeExecuteFunctions , mode , false ) ;
if ( webhookExists === false ) {
// If webhook does not exist yet create it
await workflow . runWebhookMethod ( 'create' , webhookData , NodeExecuteFunctions , mode , false ) ;
}
} catch ( error ) {
2020-05-30 16:03:58 -07:00
let errorMessage = '' ;
2020-05-27 16:32:49 -07:00
await Db . collections . Webhook ? . delete ( { workflowId : workflow.id } ) ;
2020-05-30 16:03:58 -07:00
// if it's a workflow from the the insert
// TODO check if there is standard error code for deplicate key violation that works
// with all databases
2020-06-20 18:59:06 -07:00
if ( error . name === 'MongoError' || error . name === 'QueryFailedError' ) {
2020-05-30 16:03:58 -07:00
errorMessage = ` The webhook path [ ${ webhook . webhookPath } ] and method [ ${ webhook . method } ] already exist. ` ;
} else if ( error . detail ) {
// it's a error runnig the webhook methods (checkExists, create)
errorMessage = error . detail ;
} else {
2020-06-10 06:39:15 -07:00
errorMessage = error . message ;
2020-05-30 16:03:58 -07:00
}
throw new Error ( errorMessage ) ;
2020-05-27 16:32:49 -07:00
}
2019-06-23 03:35:23 -07:00
}
2020-05-27 16:32:49 -07:00
// Save static data!
await WorkflowHelpers . saveStaticData ( workflow ) ;
2019-06-23 03:35:23 -07:00
}
/ * *
* Remove all the webhooks of the workflow
*
* @param { string } workflowId
* @returns
* @memberof ActiveWorkflowRunner
* /
2020-01-22 15:06:43 -08:00
async removeWorkflowWebhooks ( workflowId : string ) : Promise < void > {
const workflowData = await Db . collections . Workflow ! . findOne ( workflowId ) ;
if ( workflowData === undefined ) {
throw new Error ( ` Could not find workflow with id " ${ workflowId } " ` ) ;
}
const nodeTypes = NodeTypes ( ) ;
2020-02-15 17:07:01 -08:00
const workflow = new Workflow ( { id : workflowId , name : workflowData.name , nodes : workflowData.nodes , connections : workflowData.connections , active : workflowData.active , nodeTypes , staticData : workflowData.staticData , settings : workflowData.settings } ) ;
2020-01-22 15:06:43 -08:00
2020-05-27 16:32:49 -07:00
const mode = 'internal' ;
2020-01-22 15:06:43 -08:00
2020-05-27 16:32:49 -07:00
const credentials = await WorkflowCredentials ( workflowData . nodes ) ;
const additionalData = await WorkflowExecuteAdditionalData . getBase ( credentials ) ;
const webhooks = WebhookHelpers . getWorkflowWebhooks ( workflow , additionalData ) ;
for ( const webhookData of webhooks ) {
await workflow . runWebhookMethod ( 'delete' , webhookData , NodeExecuteFunctions , mode , false ) ;
}
2020-09-16 14:55:34 -07:00
await WorkflowHelpers . saveStaticData ( workflow ) ;
2020-06-20 18:59:06 -07:00
// if it's a mongo objectId convert it to string
if ( typeof workflowData . id === 'object' ) {
workflowData . id = workflowData . id . toString ( ) ;
}
2020-05-27 16:32:49 -07:00
const webhook = {
workflowId : workflowData.id ,
} as IWebhookDb ;
await Db . collections . Webhook ? . delete ( webhook ) ;
2019-06-23 03:35:23 -07:00
}
2019-12-31 12:19:37 -08:00
/ * *
* Runs the given workflow
*
* @param { IWorkflowDb } workflowData
* @param { INode } node
* @param { INodeExecutionData [ ] [ ] } data
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns
* @memberof ActiveWorkflowRunner
* /
runWorkflow ( workflowData : IWorkflowDb , node : INode , data : INodeExecutionData [ ] [ ] , additionalData : IWorkflowExecuteAdditionalDataWorkflow , mode : WorkflowExecuteMode ) {
const nodeExecutionStack : IExecuteData [ ] = [
{
node ,
data : {
main : data ,
}
}
] ;
const executionData : IRunExecutionData = {
startData : { } ,
resultData : {
runData : { } ,
} ,
executionData : {
contextData : { } ,
nodeExecutionStack ,
waitingExecution : { } ,
} ,
} ;
// Start the workflow
const runData : IWorkflowExecutionDataProcess = {
credentials : additionalData.credentials ,
executionMode : mode ,
executionData ,
workflowData ,
} ;
const workflowRunner = new WorkflowRunner ( ) ;
return workflowRunner . run ( runData , true ) ;
}
/ * *
* Return poll function which gets the global functions from n8n - core
* and overwrites the __emit to be able to start it in subprocess
*
* @param { IWorkflowDb } workflowData
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { IGetExecutePollFunctions }
* @memberof ActiveWorkflowRunner
* /
getExecutePollFunctions ( workflowData : IWorkflowDb , additionalData : IWorkflowExecuteAdditionalDataWorkflow , mode : WorkflowExecuteMode ) : IGetExecutePollFunctions {
return ( ( workflow : Workflow , node : INode ) = > {
const returnFunctions = NodeExecuteFunctions . getExecutePollFunctions ( workflow , node , additionalData , mode ) ;
returnFunctions . __emit = ( data : INodeExecutionData [ ] [ ] ) : void = > {
this . runWorkflow ( workflowData , node , data , additionalData , mode ) ;
} ;
return returnFunctions ;
} ) ;
}
2019-08-08 11:38:25 -07:00
/ * *
* Return trigger function which gets the global functions from n8n - core
* and overwrites the emit to be able to start it in subprocess
*
* @param { IWorkflowDb } workflowData
* @param { IWorkflowExecuteAdditionalDataWorkflow } additionalData
* @param { WorkflowExecuteMode } mode
* @returns { IGetExecuteTriggerFunctions }
* @memberof ActiveWorkflowRunner
* /
getExecuteTriggerFunctions ( workflowData : IWorkflowDb , additionalData : IWorkflowExecuteAdditionalDataWorkflow , mode : WorkflowExecuteMode ) : IGetExecuteTriggerFunctions {
return ( ( workflow : Workflow , node : INode ) = > {
const returnFunctions = NodeExecuteFunctions . getExecuteTriggerFunctions ( workflow , node , additionalData , mode ) ;
returnFunctions . emit = ( data : INodeExecutionData [ ] [ ] ) : void = > {
2020-05-08 15:55:47 -07:00
WorkflowHelpers . saveStaticData ( workflow ) ;
2020-10-14 07:38:48 -07:00
this . runWorkflow ( workflowData , node , data , additionalData , mode ) . catch ( ( err ) = > console . error ( err ) ) ;
2019-08-08 11:38:25 -07:00
} ;
return returnFunctions ;
} ) ;
}
2019-06-23 03:35:23 -07:00
/ * *
* Makes a workflow active
*
* @param { string } workflowId The id of the workflow to activate
* @param { IWorkflowDb } [ workflowData ] If workflowData is given it saves the DB query
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async add ( workflowId : string , workflowData? : IWorkflowDb ) : Promise < void > {
if ( this . activeWorkflows === null ) {
throw new Error ( ` The "activeWorkflows" instance did not get initialized yet. ` ) ;
}
let workflowInstance : Workflow ;
try {
if ( workflowData === undefined ) {
workflowData = await Db . collections . Workflow ! . findOne ( workflowId ) as IWorkflowDb ;
}
if ( ! workflowData ) {
throw new Error ( ` Could not find workflow with id " ${ workflowId } ". ` ) ;
}
const nodeTypes = NodeTypes ( ) ;
2020-02-15 17:07:01 -08:00
workflowInstance = new Workflow ( { id : workflowId , name : workflowData.name , nodes : workflowData.nodes , connections : workflowData.connections , active : workflowData.active , nodeTypes , staticData : workflowData.staticData , settings : workflowData.settings } ) ;
2019-06-23 03:35:23 -07:00
const canBeActivated = workflowInstance . checkIfWorkflowCanBeActivated ( [ 'n8n-nodes-base.start' ] ) ;
if ( canBeActivated === false ) {
throw new Error ( ` The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated. ` ) ;
}
const mode = 'trigger' ;
2019-08-08 11:38:25 -07:00
const credentials = await WorkflowCredentials ( workflowData . nodes ) ;
2019-12-19 14:07:55 -08:00
const additionalData = await WorkflowExecuteAdditionalData . getBase ( credentials ) ;
2019-08-08 11:38:25 -07:00
const getTriggerFunctions = this . getExecuteTriggerFunctions ( workflowData , additionalData , mode ) ;
2019-12-31 12:19:37 -08:00
const getPollFunctions = this . getExecutePollFunctions ( workflowData , additionalData , mode ) ;
2019-06-23 03:35:23 -07:00
// Add the workflows which have webhooks defined
await this . addWorkflowWebhooks ( workflowInstance , additionalData , mode ) ;
2020-05-27 16:32:49 -07:00
if ( workflowInstance . getTriggerNodes ( ) . length !== 0
|| workflowInstance . getPollNodes ( ) . length !== 0 ) {
await this . activeWorkflows . add ( workflowId , workflowInstance , additionalData , getTriggerFunctions , getPollFunctions ) ;
}
2019-06-23 03:35:23 -07:00
if ( this . activationErrors [ workflowId ] !== undefined ) {
2020-01-22 15:06:43 -08:00
// If there were activation errors delete them
2019-06-23 03:35:23 -07:00
delete this . activationErrors [ workflowId ] ;
}
} catch ( error ) {
// There was a problem activating the workflow
// Save the error
this . activationErrors [ workflowId ] = {
time : new Date ( ) . getTime ( ) ,
error : {
message : error.message ,
} ,
} ;
throw error ;
}
2019-08-08 11:38:25 -07:00
// If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted.
2019-06-23 03:35:23 -07:00
await WorkflowHelpers . saveStaticData ( workflowInstance ! ) ;
}
/ * *
* Makes a workflow inactive
*
* @param { string } workflowId The id of the workflow to deactivate
* @returns { Promise < void > }
* @memberof ActiveWorkflowRunner
* /
async remove ( workflowId : string ) : Promise < void > {
2020-05-27 16:32:49 -07:00
2019-06-23 03:35:23 -07:00
if ( this . activeWorkflows !== null ) {
// Remove all the webhooks of the workflow
2020-09-23 00:42:39 -07:00
try {
await this . removeWorkflowWebhooks ( workflowId ) ;
} catch ( error ) {
console . error ( ` Could not remove webhooks of workflow " ${ workflowId } " because of error: " ${ error . message } " ` ) ;
}
2019-06-23 03:35:23 -07:00
if ( this . activationErrors [ workflowId ] !== undefined ) {
// If there were any activation errors delete them
delete this . activationErrors [ workflowId ] ;
}
2020-05-27 16:32:49 -07:00
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if ( this . activeWorkflows . isActive ( workflowId ) ) {
this . activeWorkflows . remove ( workflowId ) ;
}
return ;
2019-06-23 03:35:23 -07:00
}
throw new Error ( ` The "activeWorkflows" instance did not get initialized yet. ` ) ;
}
}
let workflowRunnerInstance : ActiveWorkflowRunner | undefined ;
export function getInstance ( ) : ActiveWorkflowRunner {
if ( workflowRunnerInstance === undefined ) {
workflowRunnerInstance = new ActiveWorkflowRunner ( ) ;
}
return workflowRunnerInstance ;
}