Add additional possibilities to load workflow

This commit is contained in:
Jan Oberhauser 2020-01-02 17:13:53 -06:00
parent 7707312715
commit 629ab09135
4 changed files with 188 additions and 13 deletions

View file

@ -20,6 +20,7 @@ import {
import {
IDataObject,
IExecuteData,
IExecuteWorkflowInfo,
INode,
INodeParameters,
INodeExecutionData,
@ -270,23 +271,32 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]): Promise<Array<INodeExecutionData[] | null>> {
export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]): Promise<Array<INodeExecutionData[] | null>> {
const mode = 'integrated';
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(`No information about the workflow to execute found. Please provide either the "id" or "code"!`);
}
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
const workflowData = await Db.collections!.Workflow!.findOne(workflowId);
let workflowData: IWorkflowBase | undefined;
if (workflowInfo.id !== undefined) {
workflowData = await Db.collections!.Workflow!.findOne(workflowInfo.id);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowId}" does not exist.`);
throw new Error(`The workflow with the id "${workflowInfo.id}" does not exist.`);
}
} else {
workflowData = workflowInfo.code;
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(workflowId as string | undefined, workflowData!.nodes, workflowData!.connections, workflowData!.active, nodeTypes, workflowData!.staticData);
const workflow = new Workflow(workflowInfo.id, workflowData!.nodes, workflowData!.connections, workflowData!.active, nodeTypes, workflowData!.staticData);
// Does not get used so set it simply to empty string
const executionId = '';
@ -294,7 +304,7 @@ export async function executeWorkflow(workflowId: string, additionalData: IWorkf
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(additionalData.credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData, { parentProcessMode: additionalData.hooks!.mode });
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start'];

View file

@ -13,6 +13,7 @@ import {
IDataObject,
IExecuteFunctions,
IExecuteSingleFunctions,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
INodeParameters,
@ -430,9 +431,8 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi
export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
async executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise<any> { // tslint:disable-line:no-any
// return additionalData.executeWorkflow(workflowId, additionalData, inputData);
return additionalData.executeWorkflow(workflowId, additionalData, inputData);
async executeWorkflow(workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[]): Promise<any> { // tslint:disable-line:no-any
return additionalData.executeWorkflow(workflowInfo, additionalData, inputData);
},
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);

View file

@ -1,5 +1,12 @@
import { OptionsWithUri } from 'request';
import {
readFile as fsReadFile,
} from 'fs';
import { promisify } from 'util';
const fsReadFileAsync = promisify(fsReadFile);
import { IExecuteFunctions } from 'n8n-core';
import {
ILoadOptionsFunctions,
@ -7,6 +14,8 @@ import {
INodePropertyOptions,
INodeType,
INodeTypeDescription,
IExecuteWorkflowInfo,
IWorkflowBase,
} from 'n8n-workflow';
@ -26,10 +35,50 @@ export class ExecuteWorkflow implements INodeType {
inputs: ['main'],
outputs: ['main'],
properties: [
{
displayName: 'Source',
name: 'source',
type: 'options',
options: [
{
name: 'Database',
value: 'database',
description: 'Load the workflow from the database by ID.',
},
{
name: 'Local File',
value: 'localFile',
description: 'Load the workflow from a locally saved file.',
},
{
name: 'Parameter',
value: 'parameter',
description: 'Load the workflow from a parameter.',
},
{
name: 'URL',
value: 'url',
description: 'Load the workflow from an URL.',
},
],
default: 'database',
description: 'Where to get the workflow to execute from.',
},
// ----------------------------------
// source:database
// ----------------------------------
{
displayName: 'Workflow',
name: 'workflowId',
type: 'options',
displayOptions: {
show: {
source: [
'database',
],
},
},
typeOptions: {
loadOptionsMethod: 'getWorkflows',
},
@ -37,6 +86,70 @@ export class ExecuteWorkflow implements INodeType {
required: true,
description: 'The workflow to execute.',
},
// ----------------------------------
// source:localFile
// ----------------------------------
{
displayName: 'Workflow Path',
name: 'workflowPath',
type: 'string',
displayOptions: {
show: {
source: [
'localFile',
],
},
},
default: '',
placeholder: '/data/workflow.json',
required: true,
description: 'The path to local JSON workflow file to execute.',
},
// ----------------------------------
// source:parameter
// ----------------------------------
{
displayName: 'Workflow JSON',
name: 'workflowJson',
type: 'string',
typeOptions: {
alwaysOpenEditWindow: true,
editor: 'code',
rows: 10,
},
displayOptions: {
show: {
source: [
'parameter',
],
},
},
default: '\n\n\n',
required: true,
description: 'The workflow JSON code to execute.',
},
// ----------------------------------
// source:url
// ----------------------------------
{
displayName: 'Workflow URL',
name: 'workflowUrl',
type: 'string',
displayOptions: {
show: {
source: [
'url',
],
},
},
default: '',
placeholder: 'https://example.com/workflow.json',
required: true,
description: 'The URL from which to load the workflow from.',
},
]
};
@ -67,8 +180,55 @@ export class ExecuteWorkflow implements INodeType {
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const workflowId = this.getNodeParameter('workflowId', 0) as string;
const receivedData = await this.executeWorkflow(workflowId, items);
const source = this.getNodeParameter('source', 0) as string;
const workflowInfo: IExecuteWorkflowInfo = {};
if (source === 'database') {
// Read workflow from database
workflowInfo.id = this.getNodeParameter('workflowId', 0) as string;
} else if (source === 'localFile') {
// Read workflow from filesystem
const workflowPath = this.getNodeParameter('workflowPath', 0) as string;
let workflowJson;
try {
workflowJson = await fsReadFileAsync(workflowPath, { encoding: 'utf8' }) as string;
} catch (error) {
if (error.code === 'ENOENT') {
throw new Error(`The file "${workflowPath}" could not be found.`);
}
throw error;
}
workflowInfo.code = JSON.parse(workflowJson) as IWorkflowBase;
} else if (source === 'parameter') {
// Read workflow from parameter
const workflowJson = this.getNodeParameter('workflowJson', 0) as string;
workflowInfo.code = JSON.parse(workflowJson) as IWorkflowBase;
} else if (source === 'url') {
// Read workflow from url
const workflowUrl = this.getNodeParameter('workflowUrl', 0) as string;
const requestOptions = {
headers: {
'accept': 'application/json,text/*;q=0.99',
},
method: 'GET',
uri: workflowUrl,
json: true,
gzip: true,
};
const response = await this.helpers.request(requestOptions);
workflowInfo.code = response;
}
const receivedData = await this.executeWorkflow(workflowInfo, items);
return receivedData;
}

View file

@ -154,7 +154,7 @@ export interface IExecuteContextData {
export interface IExecuteFunctions {
executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise<any>; // tslint:disable-line:no-any
executeWorkflow(workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[]): Promise<any>; // tslint:disable-line:no-any
getContext(type: string): IContextObject;
getCredentials(type: string): ICredentialDataDecryptedObject | undefined;
getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[];
@ -186,6 +186,11 @@ export interface IExecuteSingleFunctions {
};
}
export interface IExecuteWorkflowInfo {
code?: IWorkflowBase;
id?: string;
}
export interface ILoadOptionsFunctions {
getCredentials(type: string): ICredentialDataDecryptedObject | undefined;
getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any
@ -636,7 +641,7 @@ export interface IWorkflowExecuteHooks {
export interface IWorkflowExecuteAdditionalData {
credentials: IWorkflowCredentials;
encryptionKey: string;
executeWorkflow: (workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]) => Promise<any>; // tslint:disable-line:no-any
executeWorkflow: (workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]) => Promise<any>; // tslint:disable-line:no-any
// hooks?: IWorkflowExecuteHooks;
hooks?: WorkflowHooks;
httpResponse?: express.Response;