From 584033ab4a99c6047b31df50ca038d071b7a01ac Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Tue, 31 Dec 2019 14:19:37 -0600 Subject: [PATCH] :sparkles: Add polling support to Trigger-Nodes --- packages/cli/src/ActiveWorkflowRunner.ts | 105 +++++--- packages/cli/src/NodeTypes.ts | 10 + packages/core/package.json | 1 + packages/core/src/ActiveWorkflows.ts | 125 ++++++++- packages/core/src/Interfaces.ts | 43 +++- packages/core/src/NodeExecuteFunctions.ts | 52 ++++ .../nodes/Toggl/GenericFunctions.ts | 21 +- .../nodes/Toggl/TogglTrigger.node.ts | 240 ++---------------- packages/nodes-base/package.json | 6 +- packages/workflow/src/Interfaces.ts | 24 ++ packages/workflow/src/NodeHelpers.ts | 188 ++++++++++++++ packages/workflow/src/Workflow.ts | 74 +++++- 12 files changed, 600 insertions(+), 289 deletions(-) diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 6a0e5cb02b..9604fd9de6 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -21,6 +21,7 @@ import { import { IExecuteData, + IGetExecutePollFunctions, IGetExecuteTriggerFunctions, INode, INodeExecutionData, @@ -218,6 +219,73 @@ export class ActiveWorkflowRunner { } + /** + * Runs the given workflow + * + * @param {IWorkflowDb} workflowData + * @param {INode} node + * @param {INodeExecutionData[][]} data + * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData + * @param {WorkflowExecuteMode} mode + * @returns + * @memberof ActiveWorkflowRunner + */ + runWorkflow(workflowData: IWorkflowDb, node: INode, data: INodeExecutionData[][], additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode) { + const nodeExecutionStack: IExecuteData[] = [ + { + node, + data: { + main: data, + } + } + ]; + + const executionData: IRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + waitingExecution: {}, + }, + }; + + // Start the workflow + const runData: IWorkflowExecutionDataProcess = { + credentials: additionalData.credentials, + executionMode: mode, + executionData, + workflowData, + }; + + const workflowRunner = new WorkflowRunner(); + return workflowRunner.run(runData, true); + } + + + /** + * Return poll function which gets the global functions from n8n-core + * and overwrites the __emit to be able to start it in subprocess + * + * @param {IWorkflowDb} workflowData + * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData + * @param {WorkflowExecuteMode} mode + * @returns {IGetExecutePollFunctions} + * @memberof ActiveWorkflowRunner + */ + getExecutePollFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecutePollFunctions { + return ((workflow: Workflow, node: INode) => { + const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(workflow, node, additionalData, mode); + returnFunctions.__emit = (data: INodeExecutionData[][]): void => { + this.runWorkflow(workflowData, node, data, additionalData, mode); + }; + return returnFunctions; + }); + } + + /** * Return trigger function which gets the global functions from n8n-core * and overwrites the emit to be able to start it in subprocess @@ -232,43 +300,13 @@ export class ActiveWorkflowRunner { return ((workflow: Workflow, node: INode) => { const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode); returnFunctions.emit = (data: INodeExecutionData[][]): void => { - - const nodeExecutionStack: IExecuteData[] = [ - { - node, - data: { - main: data, - } - } - ]; - - const executionData: IRunExecutionData = { - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack, - waitingExecution: {}, - }, - }; - - // Start the workflow - const runData: IWorkflowExecutionDataProcess = { - credentials: additionalData.credentials, - executionMode: mode, - executionData, - workflowData, - }; - - const workflowRunner = new WorkflowRunner(); - workflowRunner.run(runData, true); + this.runWorkflow(workflowData, node, data, additionalData, mode); }; return returnFunctions; }); } + /** * Makes a workflow active * @@ -303,10 +341,11 @@ export class ActiveWorkflowRunner { const credentials = await WorkflowCredentials(workflowData.nodes); const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode); + const getPollFunctions = this.getExecutePollFunctions(workflowData, additionalData, mode); // Add the workflows which have webhooks defined await this.addWorkflowWebhooks(workflowInstance, additionalData, mode); - await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions); + await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions); if (this.activationErrors[workflowId] !== undefined) { // If there were any activation errors delete them diff --git a/packages/cli/src/NodeTypes.ts b/packages/cli/src/NodeTypes.ts index aaae63effd..66b2363bfe 100644 --- a/packages/cli/src/NodeTypes.ts +++ b/packages/cli/src/NodeTypes.ts @@ -2,6 +2,7 @@ import { INodeType, INodeTypes, INodeTypeData, + NodeHelpers, } from 'n8n-workflow'; @@ -11,6 +12,15 @@ class NodeTypesClass implements INodeTypes { async init(nodeTypes: INodeTypeData): Promise { + // Some nodeTypes need to get special parameters applied like the + // polling nodes the polling times + for (const nodeTypeData of Object.values(nodeTypes)) { + const applyParameters = NodeHelpers.getSpecialNodeParameters(nodeTypeData.type) + + if (applyParameters.length) { + nodeTypeData.type.description.properties.unshift.apply(nodeTypeData.type.description.properties, applyParameters); + } + } this.nodeTypes = nodeTypes; } diff --git a/packages/core/package.json b/packages/core/package.json index 716f466f02..7186564ec2 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -39,6 +39,7 @@ "typescript": "~3.7.4" }, "dependencies": { + "cron": "^1.7.2", "crypto-js": "^3.1.9-1", "lodash.get": "^4.4.2", "mmmagic": "^0.5.2", diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index 0a7b3df268..9cd4f0079a 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -1,19 +1,23 @@ +import { CronJob } from 'cron'; + import { + IGetExecutePollFunctions, IGetExecuteTriggerFunctions, + INode, + IPollResponse, ITriggerResponse, IWorkflowExecuteAdditionalData, Workflow, } from 'n8n-workflow'; - -export interface WorkflowData { - workflow: Workflow; - triggerResponse?: ITriggerResponse; -} +import { + ITriggerTime, + IWorkflowData, +} from './'; export class ActiveWorkflows { private workflowData: { - [key: string]: WorkflowData; + [key: string]: IWorkflowData; } = {}; @@ -48,7 +52,7 @@ export class ActiveWorkflows { * @returns {(WorkflowData | undefined)} * @memberof ActiveWorkflows */ - get(id: string): WorkflowData | undefined { + get(id: string): IWorkflowData | undefined { return this.workflowData[id]; } @@ -62,7 +66,7 @@ export class ActiveWorkflows { * @returns {Promise} * @memberof ActiveWorkflows */ - async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions): Promise { + async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions): Promise { console.log('ADD ID (active): ' + id); this.workflowData[id] = { @@ -78,9 +82,110 @@ export class ActiveWorkflows { this.workflowData[id].triggerResponse = triggerResponse; } } + + const pollNodes = workflow.getPollNodes(); + for (const pollNode of pollNodes) { + this.workflowData[id].pollResponse = await this.activatePolling(pollNode, workflow, additionalData, getPollFunctions); + } } + /** + * Activates polling for the given node + * + * @param {INode} node + * @param {Workflow} workflow + * @param {IWorkflowExecuteAdditionalData} additionalData + * @param {IGetExecutePollFunctions} getPollFunctions + * @returns {Promise} + * @memberof ActiveWorkflows + */ + async activatePolling(node: INode, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getPollFunctions: IGetExecutePollFunctions): Promise { + const mode = 'trigger'; + + const pollFunctions = getPollFunctions(workflow, node, additionalData, mode); + + const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as { + item: ITriggerTime[]; + }; + + // Define the order the cron-time-parameter appear + const parameterOrder = [ + 'second', // 0 - 59 + 'minute', // 0 - 59 + 'hour', // 0 - 23 + 'dayOfMonth', // 1 - 31 + 'month', // 0 - 11(Jan - Dec) + 'weekday', // 0 - 6(Sun - Sat) + ]; + + // Get all the trigger times + const cronTimes: string[] = []; + let cronTime: string[]; + let parameterName: string; + if (pollTimes.item !== undefined) { + for (const item of pollTimes.item) { + cronTime = []; + if (item.mode === 'custom') { + cronTimes.push(item.cronExpression as string); + continue; + } + if (item.mode === 'everyMinute') { + cronTimes.push(`${Math.floor(Math.random() * 60).toString()} * * * * *`); + continue; + } + + for (parameterName of parameterOrder) { + if (item[parameterName] !== undefined) { + // Value is set so use it + cronTime.push(item[parameterName] as string); + } else if (parameterName === 'second') { + // For seconds we use by default a random one to make sure to + // balance the load a little bit over time + cronTime.push(Math.floor(Math.random() * 60).toString()); + } else { + // For all others set "any" + cronTime.push('*'); + } + } + + cronTimes.push(cronTime.join(' ')); + } + } + + // The trigger function to execute when the cron-time got reached + const executeTrigger = async () => { + const pollResponse = await workflow.runPoll(node, pollFunctions); + + if (pollResponse !== null) { + // TODO: Run workflow + pollFunctions.__emit(pollResponse); + } + }; + + // Execute the trigger directly to be able to know if it works + await executeTrigger(); + + const timezone = pollFunctions.getTimezone(); + + // Start the cron-jobs + const cronJobs: CronJob[] = []; + for (const cronTime of cronTimes) { + cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone)); + } + + // Stop the cron-jobs + async function closeFunction() { + for (const cronJob of cronJobs) { + cronJob.stop(); + } + } + + return { + closeFunction, + }; + } + /** * Makes a workflow inactive @@ -103,6 +208,10 @@ export class ActiveWorkflows { await workflowData.triggerResponse.closeFunction(); } + if (workflowData.pollResponse && workflowData.pollResponse.closeFunction) { + await workflowData.pollResponse.closeFunction(); + } + delete this.workflowData[id]; } diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index 76b6eea932..3b9a501969 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -8,13 +8,16 @@ import { ILoadOptionsFunctions as ILoadOptionsFunctionsBase, INodeExecutionData, INodeType, + IPollFunctions as IPollFunctionsBase, + IPollResponse, ITriggerFunctions as ITriggerFunctionsBase, + ITriggerResponse, IWebhookFunctions as IWebhookFunctionsBase, IWorkflowSettings as IWorkflowSettingsWorkflow, + Workflow, } from 'n8n-workflow'; -import * as request from 'request'; import * as requestPromise from 'request-promise-native'; interface Constructable { @@ -31,7 +34,7 @@ export interface IProcessMessage { export interface IExecuteFunctions extends IExecuteFunctionsBase { helpers: { prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; - request: request.RequestAPI, + request: requestPromise.RequestPromiseAPI, returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[]; }; } @@ -40,7 +43,16 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase { export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase { helpers: { prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; - request: request.RequestAPI < requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl >, + request: requestPromise.RequestPromiseAPI, + }; +} + + +export interface IPollFunctions extends IPollFunctionsBase { + helpers: { + prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; + request: requestPromise.RequestPromiseAPI, + returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[]; }; } @@ -48,12 +60,22 @@ export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase { export interface ITriggerFunctions extends ITriggerFunctionsBase { helpers: { prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; - request: request.RequestAPI, + request: requestPromise.RequestPromiseAPI, returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[]; }; } +export interface ITriggerTime { + mode: string; + hour: number; + minute: number; + dayOfMonth: number; + weekeday: number; + [key: string]: string | number; +} + + export interface IUserSettings { encryptionKey?: string; tunnelSubdomain?: string; @@ -61,14 +83,14 @@ export interface IUserSettings { export interface ILoadOptionsFunctions extends ILoadOptionsFunctionsBase { helpers: { - request?: request.RequestAPI, + request?: requestPromise.RequestPromiseAPI, }; } export interface IHookFunctions extends IHookFunctionsBase { helpers: { - request: request.RequestAPI, + request: requestPromise.RequestPromiseAPI, }; } @@ -76,7 +98,7 @@ export interface IHookFunctions extends IHookFunctionsBase { export interface IWebhookFunctions extends IWebhookFunctionsBase { helpers: { prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; - request: request.RequestAPI, + request: requestPromise.RequestPromiseAPI, returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[]; }; } @@ -98,3 +120,10 @@ export interface INodeDefinitionFile { export interface INodeInputDataConnections { [key: string]: INodeExecutionData[][]; } + + +export interface IWorkflowData { + pollResponse?: IPollResponse; + triggerResponse?: ITriggerResponse; + workflow: Workflow; +} diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 775c2fd754..4fc7c2dd40 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -17,6 +17,7 @@ import { INodeExecutionData, INodeParameters, INodeType, + IPollFunctions, IRunExecutionData, ITaskDataConnections, ITriggerFunctions, @@ -310,6 +311,57 @@ export function getWebhookDescription(name: string, workflow: Workflow, node: IN +/** + * Returns the execute functions the poll nodes have access to. + * + * @export + * @param {Workflow} workflow + * @param {INode} node + * @param {IWorkflowExecuteAdditionalData} additionalData + * @param {WorkflowExecuteMode} mode + * @returns {ITriggerFunctions} + */ +// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowRunner.add +export function getExecutePollFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IPollFunctions { + return ((workflow: Workflow, node: INode) => { + return { + __emit: (data: INodeExecutionData[][]): void => { + throw new Error('Overwrite NodeExecuteFunctions.getExecutePullFunctions.__emit function!'); + }, + getCredentials(type: string): ICredentialDataDecryptedObject | undefined { + return getCredentials(workflow, node, type, additionalData); + }, + getMode: (): WorkflowExecuteMode => { + return mode; + }, + getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any + const runExecutionData: IRunExecutionData | null = null; + const itemIndex = 0; + const runIndex = 0; + const connectionInputData: INodeExecutionData[] = []; + + return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue); + }, + getRestApiUrl: (): string => { + return additionalData.restApiUrl; + }, + getTimezone: (): string => { + return getTimezone(workflow, additionalData); + }, + getWorkflowStaticData(type: string): IDataObject { + return workflow.getStaticData(type, node); + }, + helpers: { + prepareBinaryData, + request: requestPromise, + returnJsonArray, + }, + }; + })(workflow, node); +} + + + /** * Returns the execute functions the trigger nodes have access to. * diff --git a/packages/nodes-base/nodes/Toggl/GenericFunctions.ts b/packages/nodes-base/nodes/Toggl/GenericFunctions.ts index c971d7e2bb..37c6bdb80b 100644 --- a/packages/nodes-base/nodes/Toggl/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Toggl/GenericFunctions.ts @@ -5,22 +5,21 @@ import { IHookFunctions, ILoadOptionsFunctions, IExecuteSingleFunctions, + IPollFunctions, ITriggerFunctions, - BINARY_ENCODING, - getLoadOptionsFunctions } from 'n8n-core'; import { IDataObject, } from 'n8n-workflow'; -export async function togglApiRequest(this: ITriggerFunctions | IHookFunctions | IExecuteFunctions | IExecuteSingleFunctions | ILoadOptionsFunctions, method: string, resource: string, body: any = {}, query?: IDataObject, uri?: string): Promise { // tslint:disable-line:no-any +export async function togglApiRequest(this: ITriggerFunctions | IPollFunctions | IHookFunctions | IExecuteFunctions | IExecuteSingleFunctions | ILoadOptionsFunctions, method: string, resource: string, body: any = {}, query?: IDataObject, uri?: string): Promise { // tslint:disable-line:no-any const credentials = this.getCredentials('togglApi'); if (credentials === undefined) { throw new Error('No credentials got returned!'); } const headerWithAuthentication = Object.assign({}, - { Authorization: ` Basic ${Buffer.from(`${credentials.username}:${credentials.password}`).toString(BINARY_ENCODING)}` }); + { Authorization: ` Basic ${Buffer.from(`${credentials.username}:${credentials.password}`).toString('base64')}` }); const options: OptionsWithUri = { headers: headerWithAuthentication, @@ -36,11 +35,15 @@ export async function togglApiRequest(this: ITriggerFunctions | IHookFunctions | try { return await this.helpers.request!(options); } catch (error) { - const errorMessage = error.response.body.message || error.response.body.Message; - - if (errorMessage !== undefined) { - throw errorMessage; + if (error.statusCode === 403) { + throw new Error('The Toggle credentials are probably invalid!'); } - throw error.response.body; + + const errorMessage = error.response.body && (error.response.body.message || error.response.body.Message); + if (errorMessage !== undefined) { + throw new Error(errorMessage); + } + + throw error; } } diff --git a/packages/nodes-base/nodes/Toggl/TogglTrigger.node.ts b/packages/nodes-base/nodes/Toggl/TogglTrigger.node.ts index 4b515ff26d..6080f13421 100644 --- a/packages/nodes-base/nodes/Toggl/TogglTrigger.node.ts +++ b/packages/nodes-base/nodes/Toggl/TogglTrigger.node.ts @@ -1,19 +1,18 @@ -import { ITriggerFunctions } from 'n8n-core'; +import { IPollFunctions } from 'n8n-core'; import { + INodeExecutionData, INodeType, INodeTypeDescription, - ITriggerResponse, IDataObject, } from 'n8n-workflow'; -import { CronJob } from 'cron'; import * as moment from 'moment'; import { togglApiRequest } from './GenericFunctions'; export class TogglTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'Toggl', - name: 'Toggl', + name: 'toggl', icon: 'file:toggl.png', group: ['trigger'], version: 1, @@ -28,6 +27,7 @@ export class TogglTrigger implements INodeType { required: true, } ], + polling: true, inputs: [], outputs: ['main'], properties: [ @@ -44,232 +44,36 @@ export class TogglTrigger implements INodeType { required: true, default: 'newTimeEntry', }, - { - displayName: 'Mode', - name: 'mode', - type: 'options', - options: [ - { - name: 'Every Minute', - value: 'everyMinute' - }, - { - name: 'Every Hour', - value: 'everyHour' - }, - { - name: 'Every Day', - value: 'everyDay' - }, - { - name: 'Every Week', - value: 'everyWeek' - }, - { - name: 'Every Month', - value: 'everyMonth' - }, - { - name: 'Custom', - value: 'custom' - }, - ], - default: 'everyDay', - description: 'How often to trigger.', - }, - { - displayName: 'Hour', - name: 'hour', - type: 'number', - typeOptions: { - minValue: 0, - maxValue: 23, - }, - displayOptions: { - hide: { - mode: [ - 'custom', - 'everyHour', - 'everyMinute' - ], - }, - }, - default: 14, - description: 'The hour of the day to trigger (24h format).', - }, - { - displayName: 'Minute', - name: 'minute', - type: 'number', - typeOptions: { - minValue: 0, - maxValue: 59, - }, - displayOptions: { - hide: { - mode: [ - 'custom', - 'everyMinute' - ], - }, - }, - default: 0, - description: 'The minute of the day to trigger.', - }, - { - displayName: 'Day of Month', - name: 'dayOfMonth', - type: 'number', - displayOptions: { - show: { - mode: [ - 'everyMonth', - ], - }, - }, - typeOptions: { - minValue: 1, - maxValue: 31, - }, - default: 1, - description: 'The day of the month to trigger.', - }, - { - displayName: 'Weekday', - name: 'weekday', - type: 'options', - displayOptions: { - show: { - mode: [ - 'everyWeek', - ], - }, - }, - options: [ - { - name: 'Monday', - value: '1', - }, - { - name: 'Tuesday', - value: '2', - }, - { - name: 'Wednesday', - value: '3', - }, - { - name: 'Thursday', - value: '4', - }, - { - name: 'Friday', - value: '5', - }, - { - name: 'Saturday', - value: '6', - }, - { - name: 'Sunday', - value: '0', - }, - ], - default: '1', - description: 'The weekday to trigger.', - }, - { - displayName: 'Cron Expression', - name: 'cronExpression', - type: 'string', - displayOptions: { - show: { - mode: [ - 'custom', - ], - }, - }, - default: '* * * * * *', - description: 'Use custom cron expression. Values and ranges as follows:
  • Seconds: 0-59
  • Minutes: 0 - 59
  • Hours: 0 - 23
  • Day of Month: 1 - 31
  • Months: 0 - 11 (Jan - Dec)
  • Day of Week: 0 - 6 (Sun - Sat)
', - }, ] }; - async trigger(this: ITriggerFunctions): Promise { + async poll(this: IPollFunctions): Promise { const webhookData = this.getWorkflowStaticData('node'); - const mode = this.getNodeParameter('mode') as string; const event = this.getNodeParameter('event') as string; - // Get all the trigger times - let cronTime; let endpoint: string; - //let parameterName: string; - if (mode === 'custom') { - const cronExpression = this.getNodeParameter('cronExpression') as string; - cronTime = cronExpression as string; - } - if (mode === 'everyMinute') { - cronTime = `* * * * *`; - } - if (mode === 'everyHour') { - const minute = this.getNodeParameter('minute') as string; - cronTime = `${minute} * * * *`; - } - if (mode === 'everyDay') { - const hour = this.getNodeParameter('hour') as string; - const minute = this.getNodeParameter('minute') as string; - cronTime = `${minute} ${hour} * * *`; - } - if (mode === 'everyWeek') { - const weekday = this.getNodeParameter('weekday') as string; - const hour = this.getNodeParameter('hour') as string; - const minute = this.getNodeParameter('minute') as string; - cronTime = `${minute} ${hour} * * ${weekday}`; - } - if (mode === 'everyMonth') { - const dayOfMonth = this.getNodeParameter('dayOfMonth') as string; - const hour = this.getNodeParameter('hour') as string; - const minute = this.getNodeParameter('minute') as string; - cronTime = `${minute} ${hour} ${dayOfMonth} * *`; - } + if (event === 'newTimeEntry') { endpoint = '/time_entries'; + } else { + throw new Error(`The defined event "${event}" is not supported`); } - const executeTrigger = async () => { - const qs: IDataObject = {}; - let timeEntries = []; - qs.start_date = webhookData.lastTimeChecked; - qs.end_date = moment().format(); - try { - timeEntries = await togglApiRequest.call(this, 'GET', endpoint, {}, qs); - } catch (err) { - throw new Error(`Toggl Trigger Error: ${err}`); - } - if (Array.isArray(timeEntries) && timeEntries.length !== 0) { - this.emit([this.helpers.returnJsonArray(timeEntries)]); - } + const qs: IDataObject = {}; + let timeEntries = []; + qs.start_date = webhookData.lastTimeChecked; + qs.end_date = moment().format(); + + try { + timeEntries = await togglApiRequest.call(this, 'GET', endpoint, {}, qs); webhookData.lastTimeChecked = qs.end_date; - }; - - const timezone = this.getTimezone(); - - // Start the cron-jobs - const cronJob = new CronJob(cronTime as string, executeTrigger, undefined, true, timezone); - - // Stop the cron-jobs - async function closeFunction() { - cronJob.stop(); + } catch (err) { + throw new Error(`Toggl Trigger Error: ${err}`); + } + if (Array.isArray(timeEntries) && timeEntries.length !== 0) { + return [this.helpers.returnJsonArray(timeEntries)]; } - async function manualTriggerFunction() { - executeTrigger(); - } - if (webhookData.lastTimeChecked === undefined) { - webhookData.lastTimeChecked = moment().format(); - } - return { - closeFunction, - manualTriggerFunction, - }; + return null; } + } diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index ac4cf7b610..8280bf53e7 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -156,8 +156,8 @@ "dist/nodes/Trello/Trello.node.js", "dist/nodes/Trello/TrelloTrigger.node.js", "dist/nodes/Twilio/Twilio.node.js", - "dist/nodes/Typeform/TypeformTrigger.node.js", - "dist/nodes/Toggl/TogglTrigger.node.js", + "dist/nodes/Typeform/TypeformTrigger.node.js", + "dist/nodes/Toggl/TogglTrigger.node.js", "dist/nodes/Vero/Vero.node.js", "dist/nodes/WriteBinaryFile.node.js", "dist/nodes/Webhook.node.js", @@ -194,7 +194,7 @@ "aws4": "^1.8.0", "basic-auth": "^2.0.1", "cheerio": "^1.0.0-rc.3", - "cron": "^1.6.0", + "cron": "^1.7.2", "glob-promise": "^3.4.0", "gm": "^1.23.1", "googleapis": "^46.0.0", diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 7824ee52cb..be0330d810 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -107,6 +107,10 @@ export interface IDataObject { } +export interface IGetExecutePollFunctions { + (workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IPollFunctions; +} + export interface IGetExecuteTriggerFunctions { (workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): ITriggerFunctions; } @@ -208,6 +212,19 @@ export interface IHookFunctions { }; } +export interface IPollFunctions { + __emit(data: INodeExecutionData[][]): void; + getCredentials(type: string): ICredentialDataDecryptedObject | undefined; + getMode(): WorkflowExecuteMode; + getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any + getRestApiUrl(): string; + getTimezone(): string; + getWorkflowStaticData(type: string): IDataObject; + helpers: { + [key: string]: (...args: any[]) => any //tslint:disable-line:no-any + }; +} + export interface ITriggerFunctions { emit(data: INodeExecutionData[][]): void; getCredentials(type: string): ICredentialDataDecryptedObject | undefined; @@ -285,6 +302,7 @@ export interface INodeExecutionData { export interface INodeExecuteFunctions { + getExecutePollFunctions: IGetExecutePollFunctions; getExecuteTriggerFunctions: IGetExecuteTriggerFunctions; getExecuteFunctions: IGetExecuteFunctions; getExecuteSingleFunctions: IGetExecuteSingleFunctions; @@ -363,6 +381,10 @@ export interface IParameterDependencies { [key: string]: string[]; } +export interface IPollResponse { + closeFunction?: () => Promise; +} + export interface ITriggerResponse { closeFunction?: () => Promise; // To manually trigger the run @@ -376,6 +398,7 @@ export interface INodeType { description: INodeTypeDescription; execute?(this: IExecuteFunctions): Promise; executeSingle?(this: IExecuteSingleFunctions): Promise; + poll?(this: IPollFunctions): Promise; trigger?(this: ITriggerFunctions): Promise; webhook?(this: IWebhookFunctions): Promise; hooks?: { @@ -447,6 +470,7 @@ export interface INodeTypeDescription { properties: INodeProperties[]; credentials?: INodeCredentialDescription[]; maxNodes?: number; // How many nodes of that type can be created in a workflow + polling?: boolean; subtitle?: string; hooks?: { [key: string]: INodeHookDescription[] | undefined; diff --git a/packages/workflow/src/NodeHelpers.ts b/packages/workflow/src/NodeHelpers.ts index a9f22441da..d398be8394 100644 --- a/packages/workflow/src/NodeHelpers.ts +++ b/packages/workflow/src/NodeHelpers.ts @@ -23,6 +23,194 @@ import { import { get } from 'lodash'; + + +/** + * Gets special parameters which should be added to nodeTypes depending + * on their type or configuration + * + * @export + * @param {INodeType} nodeType + * @returns + */ +export function getSpecialNodeParameters(nodeType: INodeType) { + if (nodeType.description.polling === true) { + return [ + { + displayName: 'Poll Times', + name: 'pollTimes', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + multipleValueButtonText: 'Add Poll Time', + }, + default: {}, + description: 'Time at which polling should occur.', + placeholder: 'Add Poll Time', + options: [ + { + name: 'item', + displayName: 'Item', + values: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Every Minute', + value: 'everyMinute', + }, + { + name: 'Every Hour', + value: 'everyHour', + }, + { + name: 'Every Day', + value: 'everyDay', + }, + { + name: 'Every Week', + value: 'everyWeek', + }, + { + name: 'Every Month', + value: 'everyMonth', + }, + { + name: 'Custom', + value: 'custom', + }, + ], + default: 'everyDay', + description: 'How often to trigger.', + }, + { + displayName: 'Hour', + name: 'hour', + type: 'number', + typeOptions: { + minValue: 0, + maxValue: 23, + }, + displayOptions: { + hide: { + mode: [ + 'custom', + 'everyHour', + 'everyMinute', + ], + }, + }, + default: 14, + description: 'The hour of the day to trigger (24h format).', + }, + { + displayName: 'Minute', + name: 'minute', + type: 'number', + typeOptions: { + minValue: 0, + maxValue: 59, + }, + displayOptions: { + hide: { + mode: [ + 'custom', + 'everyMinute', + ], + }, + }, + default: 0, + description: 'The minute of the day to trigger.', + }, + { + displayName: 'Day of Month', + name: 'dayOfMonth', + type: 'number', + displayOptions: { + show: { + mode: [ + 'everyMonth', + ], + }, + }, + typeOptions: { + minValue: 1, + maxValue: 31, + }, + default: 1, + description: 'The day of the month to trigger.', + }, + { + displayName: 'Weekday', + name: 'weekday', + type: 'options', + displayOptions: { + show: { + mode: [ + 'everyWeek', + ], + }, + }, + options: [ + { + name: 'Monday', + value: '1', + }, + { + name: 'Tuesday', + value: '2', + }, + { + name: 'Wednesday', + value: '3', + }, + { + name: 'Thursday', + value: '4', + }, + { + name: 'Friday', + value: '5', + }, + { + name: 'Saturday', + value: '6', + }, + { + name: 'Sunday', + value: '0', + }, + ], + default: '1', + description: 'The weekday to trigger.', + }, + { + displayName: 'Cron Expression', + name: 'cronExpression', + type: 'string', + displayOptions: { + show: { + mode: [ + 'custom', + ], + }, + }, + default: '* * * * * *', + description: 'Use custom cron expression. Values and ranges as follows:
  • Seconds: 0-59
  • Minutes: 0 - 59
  • Hours: 0 - 23
  • Day of Month: 1 - 31
  • Months: 0 - 11 (Jan - Dec)
  • Day of Week: 0 - 6 (Sun - Sat)
', + }, + ], + }, + ], + }, + ]; + } + + return []; +} + + /** * Returns if the parameter should be displayed or not * diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 9efaaa67ec..1ee7991391 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -3,27 +3,28 @@ import { IConnections, IGetExecuteTriggerFunctions, INode, - NodeHelpers, INodes, INodeExecuteFunctions, INodeExecutionData, - INodeParameters, INodeIssues, - NodeParameterValue, + INodeParameters, INodeType, INodeTypes, - ObservableObject, + IPollFunctions, IRunExecutionData, ITaskDataConnections, ITriggerResponse, IWebhookData, IWebhookResponseData, - WebhookSetupMethodNames, - WorkflowDataProxy, IWorfklowIssues, IWorkflowExecuteAdditionalData, - WorkflowExecuteMode, IWorkflowSettings, + NodeHelpers, + NodeParameterValue, + ObservableObject, + WebhookSetupMethodNames, + WorkflowDataProxy, + WorkflowExecuteMode, } from './'; // @ts-ignore @@ -188,7 +189,7 @@ export class Workflow { continue; } - if (nodeType.trigger !== undefined || nodeType.webhook !== undefined) { + if (nodeType.poll !== undefined || nodeType.trigger !== undefined || nodeType.webhook !== undefined) { // Is a trigger node. So workflow can be activated. return true; } @@ -289,6 +290,30 @@ export class Workflow { * @memberof Workflow */ getTriggerNodes(): INode[] { + return this.queryNodes((nodeType: INodeType) => !!nodeType.trigger ); + } + + + /** + * Returns all the poll nodes in the workflow + * + * @returns {INode[]} + * @memberof Workflow + */ + getPollNodes(): INode[] { + return this.queryNodes((nodeType: INodeType) => !!nodeType.poll ); + } + + + /** + * Returns all the nodes in the workflow for which the given + * checkFunction return true + * + * @param {(nodeType: INodeType) => boolean} checkFunction + * @returns {INode[]} + * @memberof Workflow + */ + queryNodes(checkFunction: (nodeType: INodeType) => boolean): INode[] { const returnNodes: INode[] = []; // Check if it has any of them @@ -304,7 +329,7 @@ export class Workflow { nodeType = this.nodeTypes.getByName(node.type); - if (nodeType !== undefined && nodeType.trigger) { + if (nodeType !== undefined && checkFunction(nodeType)) { returnNodes.push(node); } } @@ -729,14 +754,14 @@ export class Workflow { // Check which node to return as start node - // Check if there are any trigger nodes and then return the first one + // Check if there are any trigger or poll nodes and then return the first one let node: INode; let nodeType: INodeType; for (const nodeName of nodeNames) { node = this.nodes[nodeName]; nodeType = this.nodeTypes.getByName(node.type) as INodeType; - if (nodeType.trigger !== undefined) { + if (nodeType.trigger !== undefined || nodeType.poll !== undefined) { return node; } } @@ -994,6 +1019,30 @@ export class Workflow { } + /** + * Runs the given trigger node so that it can trigger the workflow + * when the node has data. + * + * @param {INode} node + * @param {IPollFunctions} pollFunctions + * @returns + * @memberof Workflow + */ + async runPoll(node: INode, pollFunctions: IPollFunctions): Promise { + const nodeType = this.nodeTypes.getByName(node.type); + + if (nodeType === undefined) { + throw new Error(`The node type "${node.type}" of node "${node.name}" is not known.`); + } + + if (!nodeType.poll) { + throw new Error(`The node type "${node.type}" of node "${node.name}" does not have a poll function defined.`); + } + + return nodeType.poll!.call(pollFunctions); + } + + /** * Executes the webhook data to see what it should return and if the * workflow should be started or not @@ -1096,6 +1145,9 @@ export class Workflow { } else if (nodeType.execute) { const thisArgs = nodeExecuteFunctions.getExecuteFunctions(this, runExecutionData, runIndex, connectionInputData, inputData, node, additionalData, mode); return nodeType.execute.call(thisArgs); + } else if (nodeType.poll) { + const thisArgs = nodeExecuteFunctions.getExecutePollFunctions(this, node, additionalData, mode); + return nodeType.poll.call(thisArgs); } else if (nodeType.trigger) { if (mode === 'manual') { // In manual mode start the trigger