2019-08-08 11:38:25 -07:00
import {
ActiveExecutions ,
2020-05-14 05:27:19 -07:00
CredentialsOverwrites ,
CredentialTypes ,
2020-05-05 15:59:58 -07:00
ExternalHooks ,
2020-05-14 05:27:19 -07:00
ICredentialsOverwrite ,
ICredentialsTypeData ,
2019-08-08 11:38:25 -07:00
IProcessMessageDataHook ,
ITransferNodeTypes ,
IWorkflowExecutionDataProcess ,
IWorkflowExecutionDataProcessWithExecution ,
2020-01-17 17:34:31 -08:00
NodeTypes ,
2019-08-08 11:38:25 -07:00
Push ,
WorkflowExecuteAdditionalData ,
2019-10-14 22:36:53 -07:00
WorkflowHelpers ,
2019-08-08 11:38:25 -07:00
} from './' ;
import {
IProcessMessage ,
2020-01-17 17:34:31 -08:00
WorkflowExecute ,
2019-08-08 11:38:25 -07:00
} from 'n8n-core' ;
import {
IExecutionError ,
IRun ,
2020-01-17 17:34:31 -08:00
Workflow ,
2019-12-19 14:07:55 -08:00
WorkflowHooks ,
2019-08-08 11:38:25 -07:00
WorkflowExecuteMode ,
} from 'n8n-workflow' ;
2020-01-17 17:34:31 -08:00
import * as config from '../config' ;
import * as PCancelable from 'p-cancelable' ;
2019-08-09 04:12:00 -07:00
import { join as pathJoin } from 'path' ;
2019-08-08 11:38:25 -07:00
import { fork } from 'child_process' ;
export class WorkflowRunner {
	activeExecutions: ActiveExecutions.ActiveExecutions;
	credentialsOverwrites: ICredentialsOverwrite;
	push: Push.Push;

	/**
	 * Grabs the shared Push and ActiveExecutions instances and reads all
	 * credential overwrites once, so they can be handed to subprocesses.
	 *
	 * @memberof WorkflowRunner
	 */
	constructor() {
		this.push = Push.getInstance();
		this.activeExecutions = ActiveExecutions.getInstance();
		this.credentialsOverwrites = CredentialsOverwrites().getAll();
	}


	/**
	 * The process did send a hook message so execute the appropriate hook
	 *
	 * @param {WorkflowHooks} workflowHooks
	 * @param {IProcessMessageDataHook} hookData
	 * @memberof WorkflowRunner
	 */
	processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
		workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
	}


	/**
	 * The process did error
	 *
	 * Builds an unfinished run that carries only the error, removes the
	 * execution from the active executions with it and pushes the
	 * "execution finished" event.
	 *
	 * @param {IExecutionError} error
	 * @param {Date} startedAt
	 * @param {WorkflowExecuteMode} executionMode
	 * @param {string} executionId
	 * @memberof WorkflowRunner
	 */
	processError(error: IExecutionError, startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string) {
		const fullRunData: IRun = {
			data: {
				resultData: {
					error,
					runData: {},
				},
			},
			finished: false,
			mode: executionMode,
			startedAt,
			stoppedAt: new Date(),
		};

		// Remove from active execution with empty data. That will
		// set the execution to failed.
		this.activeExecutions.remove(executionId, fullRunData);

		// Also send to Editor UI
		WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId);
	}


	/**
	 * Resolves the timeout which applies to an execution.
	 *
	 * A truthy "executionTimeout" in the workflow settings takes preference
	 * over the configured "executions.timeout". A value which is not
	 * greater than 0 disables the timeout. The result is capped by
	 * "executions.maxTimeout".
	 *
	 * @param {IWorkflowExecutionDataProcess} data
	 * @returns {(number | undefined)} The timeout in milliseconds or undefined
	 *                                 if no timeout should be applied
	 * @memberof WorkflowRunner
	 */
	private getExecutionTimeoutMs(data: IWorkflowExecutionDataProcess): number | undefined {
		const settings = data.workflowData.settings;

		// Preference on workflow setting, else fall back to the configured default
		const timeoutSeconds = (settings && settings.executionTimeout)
			? settings.executionTimeout as number
			: config.get('executions.timeout') as number;

		if (!(timeoutSeconds > 0)) {
			return undefined;
		}

		// Values are configured in seconds
		return Math.min(timeoutSeconds, config.get('executions.maxTimeout') as number) * 1000;
	}


	/**
	 * Run the workflow
	 *
	 * Runs the external "workflow.execute" hooks and then executes the
	 * workflow either in the current process or in a subprocess, depending
	 * on the configured "executions.process".
	 *
	 * @param {IWorkflowExecutionDataProcess} data
	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
	 *                                   the workflow and added to input data
	 * @returns {Promise<string>} The id of the registered execution
	 * @memberof WorkflowRunner
	 */
	async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
		const externalHooks = ExternalHooks();
		await externalHooks.run('workflow.execute', [data.workflowData, data.executionMode]);

		const executionsProcess = config.get('executions.process') as string;
		if (executionsProcess === 'main') {
			return this.runMainProcess(data, loadStaticData);
		}

		return this.runSubprocess(data, loadStaticData);
	}


	/**
	 * Run the workflow in current process
	 *
	 * @param {IWorkflowExecutionDataProcess} data
	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
	 *                                   the workflow and added to input data
	 * @returns {Promise<string>} The id of the registered execution
	 * @memberof WorkflowRunner
	 */
	async runMainProcess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
		// Needed to report a failed run if the execution rejects
		const startedAt = new Date();

		if (loadStaticData === true && data.workflowData.id) {
			data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(data.workflowData.id as string);
		}

		const nodeTypes = NodeTypes();

		const workflow = new Workflow({ id: data.workflowData.id as string | undefined, name: data.workflowData.name, nodes: data.workflowData!.nodes, connections: data.workflowData!.connections, active: data.workflowData!.active, nodeTypes, staticData: data.workflowData!.staticData });
		const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials);

		// Register the active execution
		const executionId = this.activeExecutions.add(data, undefined);

		additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);

		let workflowExecution: PCancelable<IRun>;
		if (data.executionData !== undefined) {
			// Continue with the supplied execution data
			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData);
			workflowExecution = workflowExecute.processRunExecutionData(workflow);
		} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
			// Execute all nodes
			// Can execute without webhook so go on
			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
			workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
		} else {
			// Execute only the nodes between start and destination nodes
			const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
			workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode);
		}

		this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

		// Soft timeout to stop workflow execution after current running node
		let executionTimeout: NodeJS.Timeout | undefined;
		const clearExecutionTimeout = () => {
			if (executionTimeout !== undefined) {
				clearTimeout(executionTimeout);
			}
		};

		const timeoutMs = this.getExecutionTimeoutMs(data);
		if (timeoutMs !== undefined) {
			executionTimeout = setTimeout(() => {
				this.activeExecutions.stopExecution(executionId, 'timeout');
			}, timeoutMs);
		}

		workflowExecution.then(
			(fullRunData) => {
				clearExecutionTimeout();
				if (workflowExecution.isCanceled) {
					fullRunData.finished = false;
				}
				this.activeExecutions.remove(executionId, fullRunData);
			},
			(error) => {
				// Without this handler a rejected execution would keep the timer
				// armed, stay registered as active and surface only as an
				// unhandled rejection.
				clearExecutionTimeout();
				const executionError = {
					message: error instanceof Error ? error.message : String(error),
				} as IExecutionError;
				this.processError(executionError, startedAt, data.executionMode, executionId);
			},
		);

		return executionId;
	}


	/**
	 * Run the workflow in a subprocess
	 *
	 * Forks "WorkflowRunnerProcess.js", sends it everything it needs to run
	 * the workflow and reacts to the messages and the exit of that process.
	 *
	 * @param {IWorkflowExecutionDataProcess} data
	 * @param {boolean} [loadStaticData] If set will the static data be loaded from
	 *                                   the workflow and added to input data
	 * @returns {Promise<string>} The id of the registered execution
	 * @memberof WorkflowRunner
	 */
	async runSubprocess(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
		const startedAt = new Date();

		// Load the static data before forking. If loading fails no subprocess
		// got created yet which would otherwise be left behind unused.
		if (loadStaticData === true && data.workflowData.id) {
			data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(data.workflowData.id as string);
		}

		const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js'));

		// Register the active execution
		const executionId = this.activeExecutions.add(data, subprocess);

		// Check if workflow contains a "executeWorkflow" Node as in this
		// case we can not know which nodeTypes and credentialTypes will
		// be needed and so have to load all of them in the workflowRunnerProcess
		const loadAllNodeTypes = data.workflowData.nodes.some((node) => node.type === 'n8n-nodes-base.executeWorkflow');

		let nodeTypeData: ITransferNodeTypes;
		let credentialTypeData: ICredentialsTypeData;

		if (loadAllNodeTypes === true) {
			// Supply all nodeTypes and credentialTypes
			nodeTypeData = WorkflowHelpers.getAllNodeTypeData();
			const credentialTypes = CredentialTypes();
			credentialTypeData = credentialTypes.credentialTypes;
		} else {
			// Supply only nodeTypes and credentialTypes which the workflow needs
			nodeTypeData = WorkflowHelpers.getNodeTypeData(data.workflowData.nodes);
			credentialTypeData = WorkflowHelpers.getCredentialsData(data.credentials);
		}

		// The additional properties get written onto the passed in object itself
		const processData = data as unknown as IWorkflowExecutionDataProcessWithExecution;
		processData.executionId = executionId;
		processData.nodeTypeData = nodeTypeData;
		processData.credentialsOverwrite = this.credentialsOverwrites;
		processData.credentialsTypeData = credentialTypeData; // TODO: Still needs correct value

		const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);

		// Send all data to subprocess it needs to run the workflow
		subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);

		// Start timeout for the execution
		let executionTimeout: NodeJS.Timeout | undefined;
		const clearExecutionTimeout = () => {
			if (executionTimeout !== undefined) {
				clearTimeout(executionTimeout);
			}
		};

		const timeoutMs = this.getExecutionTimeoutMs(data);
		if (timeoutMs !== undefined) {
			executionTimeout = setTimeout(() => {
				this.activeExecutions.stopExecution(executionId, 'timeout');

				// Kill the process if it is still alive after a grace period
				// of 20% of the timeout
				executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeoutMs * 0.2, 5000)); // minimum 5 seconds
			}, timeoutMs);
		}

		// The result of an execution must only be reported once. Without this
		// guard a "processError" or "timeout" message followed by a failing
		// exit of the process would report the same execution twice.
		let finalized = false;
		const reportError = (error: IExecutionError) => {
			if (finalized) {
				return;
			}
			finalized = true;
			this.processError(error, startedAt, data.executionMode, executionId);
		};

		// Listen to data from the subprocess
		subprocess.on('message', (message: IProcessMessage) => {
			if (message.type === 'end') {
				clearExecutionTimeout();
				if (!finalized) {
					finalized = true;
					this.activeExecutions.remove(executionId, message.data.runData);
				}
			} else if (message.type === 'processError') {
				clearExecutionTimeout();
				reportError(message.data.executionError as IExecutionError);
			} else if (message.type === 'processHook') {
				this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
			} else if (message.type === 'timeout') {
				// Execution timed out and its process has been terminated.
				// The timer does not get cleared here, that happens on "exit".
				reportError({ message: 'Workflow execution timed out!' } as IExecutionError);
			}
		});

		// Also get informed when the processes does exit especially when it did crash or timed out
		subprocess.on('exit', (code, signal) => {
			if (signal === 'SIGTERM') {
				// Execution timed out and its process has been terminated
				reportError({
					message: 'Workflow execution timed out!',
				} as IExecutionError);
			} else if (code !== 0) {
				// Process did exit with error code, so something went wrong.
				reportError({
					message: 'Workflow execution process did crash for an unknown reason!',
				} as IExecutionError);
			}

			clearExecutionTimeout();
		});

		return executionId;
	}
}