2019-06-23 03:35:23 -07:00
import {
IActivationError ,
Db ,
NodeTypes ,
IResponseCallbackData ,
IWorkflowDb ,
2019-08-08 11:38:25 -07:00
IWorkflowExecutionDataProcess ,
2019-06-23 03:35:23 -07:00
ResponseHelper ,
WebhookHelpers ,
2019-08-08 11:38:25 -07:00
WorkflowCredentials ,
2019-06-23 03:35:23 -07:00
WorkflowHelpers ,
2019-08-08 11:38:25 -07:00
WorkflowRunner ,
2019-06-23 03:35:23 -07:00
WorkflowExecuteAdditionalData ,
} from './' ;
import {
ActiveWorkflows ,
ActiveWebhooks ,
2019-08-08 11:38:25 -07:00
NodeExecuteFunctions ,
2019-06-23 03:35:23 -07:00
} from 'n8n-core' ;
import {
2019-08-08 11:38:25 -07:00
IExecuteData ,
IGetExecuteTriggerFunctions ,
INode ,
INodeExecutionData ,
IRunExecutionData ,
2019-06-23 03:35:23 -07:00
IWebhookData ,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow ,
WebhookHttpMethod ,
Workflow ,
WorkflowExecuteMode ,
} from 'n8n-workflow' ;
import * as express from 'express' ;
/**
 * Holds the workflows and webhooks which are currently active and offers
 * the methods to activate / deactivate workflows and to execute the
 * webhooks which belong to them.
 *
 * "init" has to be called before workflows can be added or webhooks can
 * be executed.
 */
export class ActiveWorkflowRunner {
  private activeWorkflows: ActiveWorkflows | null = null;

  private activeWebhooks: ActiveWebhooks | null = null;

  // Last activation error of each workflow, keyed by the workflow id
  private activationErrors: {
    [key: string]: IActivationError;
  } = {};

  /**
   * Loads all workflows which are flagged as active in the database and
   * tries to activate each of them. A workflow which fails to activate gets
   * logged and skipped so that the remaining ones still get started.
   *
   * @returns {Promise<void>}
   * @memberof ActiveWorkflowRunner
   */
  async init(): Promise<void> {
    // Get the active workflows from database
    const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];

    this.activeWebhooks = new ActiveWebhooks();

    // Add them as active workflows
    this.activeWorkflows = new ActiveWorkflows();

    if (workflowsData.length !== 0) {
      console.log('\n ================================');
      console.log(' Start Active Workflows:');
      console.log(' ================================');

      for (const workflowData of workflowsData) {
        console.log(` - ${workflowData.name} `);
        try {
          await this.add(workflowData.id.toString(), workflowData);
          console.log(` => Started `);
        } catch (error) {
          // "add" already stored the activation error, so only report it here
          console.log(` => ERROR: Workflow could not be activated: `);
          console.log(` ${error.message} `);
        }
      }
    }
  }

  /**
   * Removes all the currently active workflows
   *
   * @returns {Promise<void>}
   * @memberof ActiveWorkflowRunner
   */
  async removeAll(): Promise<void> {
    if (this.activeWorkflows === null) {
      // Nothing got initialized yet so there is nothing to remove
      return;
    }

    const workflowIds = this.activeWorkflows.allActiveWorkflows();

    // Start all removals at once and wait until every one of them finished
    await Promise.all(workflowIds.map((workflowId) => this.remove(workflowId)));
  }

  /**
   * Checks if a webhook for the given method and path exists and executes the workflow.
   *
   * @param {WebhookHttpMethod} httpMethod
   * @param {string} path
   * @param {express.Request} req
   * @param {express.Response} res
   * @returns {Promise<IResponseCallbackData>}
   * @throws {ResponseHelper.ResponseError} With code 404 if the runner is not
   *         initialized or the webhook, its node or its workflow can not be found
   * @memberof ActiveWorkflowRunner
   */
  async executeWebhook(httpMethod: WebhookHttpMethod, path: string, req: express.Request, res: express.Response): Promise<IResponseCallbackData> {
    // Both instances get created together in "init". Checking the webhooks
    // here as well makes sure that the lookup underneath can never hit null.
    if (this.activeWorkflows === null || this.activeWebhooks === null) {
      throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
    }

    const webhookData: IWebhookData | undefined = this.activeWebhooks.get(httpMethod, path);

    if (webhookData === undefined) {
      // The requested webhook is not registered
      throw new ResponseHelper.ResponseError('The requested webhook is not registred.', 404, 404);
    }

    // Get the node which has the webhook defined to know where to start from and to
    // get additional data
    const workflowStartNode = webhookData.workflow.getNode(webhookData.node);

    if (workflowStartNode === null) {
      throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
    }

    const executionMode = 'webhook';

    const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflow.id!);

    if (workflowData === undefined) {
      throw new ResponseHelper.ResponseError(` Could not find workflow with id " ${webhookData.workflow.id} " `, 404, 404);
    }

    // Wrap the callback based helper into a promise
    return new Promise((resolve, reject) => {
      WebhookHelpers.executeWebhook(webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
        if (error !== null) {
          return reject(error);
        }
        resolve(data);
      });
    });
  }

  /**
   * Returns the ids of the currently active workflows
   *
   * @returns {string[]} Empty array if the runner did not get initialized yet
   * @memberof ActiveWorkflowRunner
   */
  getActiveWorkflows(): string[] {
    if (this.activeWorkflows === null) {
      return [];
    }

    return this.activeWorkflows.allActiveWorkflows();
  }

  /**
   * Returns if the workflow is active
   *
   * @param {string} id The id of the workflow to check
   * @returns {boolean} Always false if the runner did not get initialized yet
   * @memberof ActiveWorkflowRunner
   */
  isActive(id: string): boolean {
    if (this.activeWorkflows !== null) {
      return this.activeWorkflows.isActive(id);
    }

    return false;
  }

  /**
   * Return error if there was a problem activating the workflow
   *
   * @param {string} id The id of the workflow to return the error of
   * @returns {(IActivationError | undefined)} undefined if no error is stored for the workflow
   * @memberof ActiveWorkflowRunner
   */
  getActivationError(id: string): IActivationError | undefined {
    // A missing key already evaluates to undefined, no extra check needed
    return this.activationErrors[id];
  }

  /**
   * Adds all the webhooks of the workflow
   *
   * @param {Workflow} workflow
   * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData
   * @param {WorkflowExecuteMode} mode
   * @returns {Promise<void>}
   * @memberof ActiveWorkflowRunner
   */
  async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
    const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);

    // Register the webhooks one after the other
    for (const webhookData of webhooks) {
      await this.activeWebhooks!.add(webhookData, mode);
    }
  }

  /**
   * Remove all the webhooks of the workflow
   *
   * @param {string} workflowId
   * @returns {Promise<boolean>}
   * @memberof ActiveWorkflowRunner
   */
  removeWorkflowWebhooks(workflowId: string): Promise<boolean> {
    return this.activeWebhooks!.removeByWorkflowId(workflowId);
  }

  /**
   * Return trigger function which gets the global functions from n8n-core
   * and overwrites the emit to be able to start it in subprocess
   *
   * @param {IWorkflowDb} workflowData
   * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData
   * @param {WorkflowExecuteMode} mode
   * @returns {IGetExecuteTriggerFunctions}
   * @memberof ActiveWorkflowRunner
   */
  getExecuteTriggerFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecuteTriggerFunctions {
    return ((workflow: Workflow, node: INode) => {
      const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode);

      returnFunctions.emit = (data: INodeExecutionData[][]): void => {
        // The execution starts at the trigger node with the data it emitted
        const nodeExecutionStack: IExecuteData[] = [
          {
            node,
            data: {
              main: data,
            },
          },
        ];

        const executionData: IRunExecutionData = {
          startData: {},
          resultData: {
            runData: {},
          },
          executionData: {
            contextData: {},
            nodeExecutionStack,
            waitingExecution: {},
          },
        };

        // Start the workflow
        const runData: IWorkflowExecutionDataProcess = {
          credentials: additionalData.credentials,
          executionMode: mode,
          executionData,
          workflowData,
        };

        const workflowRunner = new WorkflowRunner();

        // "emit" returns void, so the run can not be awaited by the trigger.
        // Without a handler a failing start would end up as an unhandled
        // promise rejection, so it gets logged here instead.
        // NOTE(review): assumes "WorkflowRunner.run" returns a promise - confirm.
        // "Promise.resolve" keeps this safe if it returns a plain value.
        Promise.resolve(workflowRunner.run(runData)).catch((error) => {
          console.error(`Workflow "${workflowData.name}" could not be started by its trigger:`, error);
        });
      };

      return returnFunctions;
    });
  }

  /**
   * Makes a workflow active
   *
   * @param {string} workflowId The id of the workflow to activate
   * @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query
   * @returns {Promise<void>}
   * @throws {Error} If the runner is not initialized, the workflow can not be
   *         found or it can not be activated. Errors which occur during the
   *         activation get stored and can be read with "getActivationError".
   * @memberof ActiveWorkflowRunner
   */
  async add(workflowId: string, workflowData?: IWorkflowDb): Promise<void> {
    if (this.activeWorkflows === null) {
      throw new Error(` The "activeWorkflows" instance did not get initialized yet. `);
    }

    let workflowInstance: Workflow;
    try {
      if (workflowData === undefined) {
        workflowData = await Db.collections.Workflow!.findOne(workflowId) as IWorkflowDb;
      }

      if (!workflowData) {
        throw new Error(` Could not find workflow with id " ${workflowId} ". `);
      }

      const nodeTypes = NodeTypes();
      workflowInstance = new Workflow(workflowId, workflowData.nodes, workflowData.connections, workflowData.active, nodeTypes, workflowData.staticData, workflowData.settings);

      const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(['n8n-nodes-base.start']);
      if (canBeActivated === false) {
        throw new Error(` The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated. `);
      }

      const mode = 'trigger';
      const credentials = await WorkflowCredentials(workflowData.nodes);
      const additionalData = await WorkflowExecuteAdditionalData.getBase(mode, credentials);
      const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode);

      // Add the workflows which have webhooks defined
      await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);

      await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions);

      // The activation worked so an error of an earlier try is outdated.
      // Deleting a key which does not exist is a no-op.
      delete this.activationErrors[workflowId];
    } catch (error) {
      // There was a problem activating the workflow
      // Save the error
      this.activationErrors[workflowId] = {
        time: new Date().getTime(),
        error: {
          message: error.message,
        },
      };

      throw error;
    }

    // If for example webhooks get created it sometimes has to save the
    // id of them in the static data. So make sure that data gets persisted.
    await WorkflowHelpers.saveStaticData(workflowInstance!);
  }

  /**
   * Makes a workflow inactive
   *
   * @param {string} workflowId The id of the workflow to deactivate
   * @returns {Promise<void>}
   * @throws {Error} If the runner did not get initialized yet
   * @memberof ActiveWorkflowRunner
   */
  async remove(workflowId: string): Promise<void> {
    if (this.activeWorkflows === null) {
      throw new Error(` The "activeWorkflows" instance did not get initialized yet. `);
    }

    const workflowData = this.activeWorkflows.get(workflowId);

    // Remove all the webhooks of the workflow
    await this.removeWorkflowWebhooks(workflowId);

    if (workflowData) {
      // Save the static workflow data if needed
      await WorkflowHelpers.saveStaticData(workflowData.workflow);
    }

    // If there were any activation errors delete them.
    // Deleting a key which does not exist is a no-op.
    delete this.activationErrors[workflowId];

    // Remove the workflow from the "list" of active workflows
    return this.activeWorkflows.remove(workflowId);
  }
}
// Module wide instance which gets created lazily by "getInstance"
let workflowRunnerInstance: ActiveWorkflowRunner | undefined;

/**
 * Returns the shared ActiveWorkflowRunner instance. It gets created on the
 * first call and every later call returns that same instance.
 *
 * @export
 * @returns {ActiveWorkflowRunner}
 */
export function getInstance(): ActiveWorkflowRunner {
  if (workflowRunnerInstance !== undefined) {
    return workflowRunnerInstance;
  }

  const createdInstance = new ActiveWorkflowRunner();
  workflowRunnerInstance = createdInstance;
  return createdInstance;
}