n8n/packages/core/src/ActiveWorkflows.ts
Iván Ovejero 56c4c6991f
🎨 Set up linting and formatting (#2120)
* ⬆️ Upgrade TS to 4.3.5

* 👕 Add ESLint configs

* 🎨 Add Prettier config

* 📦 Add deps and commands

*  Adjust global .editorconfig to new ruleset

* 🔥 Remove unneeded local .editorconfig

* 📦 Update deps in editor-ui

* 🔨 Limit Prettier to only TS files

*  Add recommended VSCode extensions

* 👕 Fix build

* 🔥 Remove Vue setting from global config

*  Disable prefer-default-export per feedback

* ✏️ Add forgotten divider

* 👕 Disable no-plusplus

* 👕 Disable class-methods-use-this

* ✏️ Alphabetize overrides

* 👕 Add one-var consecutive override

*  Revert one-var consecutive override

This reverts commit b9252cf935.

* 🎨 👕 Lint and format workflow package (#2121)

* 🎨 Format /workflow package

* 👕 Lint /workflow package

* 🎨 Re-format /workflow package

* 👕 Re-lint /workflow package

* ✏️ Fix typo

*  Consolidate if-checks

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format node-dev package (#2122)

* 🎨 Format /node-dev package

*  Exclude templates from ESLint config

This keeps the templates consistent with the codebase while preventing lint exceptions from being made part of the templates.

* 👕 Lint /node-dev package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🎨 👕 Lint and format core package (#2123)

* 🎨 Format /core package

* 👕 Lint /core package

* 🎨 Re-format /core package

* 👕 Re-lint /core package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format cli package (#2124)

* 🎨 Format /cli package

* 👕 Exclude migrations from linting

* 👕 Lint /cli package

* 🎨 Re-format /cli package

* 👕 Re-lint /cli package

* 👕 Fix build

* 🔥 Remove prefer-default-export exceptions

*  Update exceptions in ActiveExecutions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 👕 fix lint issues

* 🔧 use package specific linter, remove tslint command

* 🔨 resolve build issue, sync dependencies

* 🔧 change lint command

Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com>
2021-08-29 20:58:11 +02:00

274 lines
7.4 KiB
TypeScript

/* eslint-disable no-continue */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-restricted-syntax */
import { CronJob } from 'cron';
import {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
LoggerProxy as Logger,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import { ITriggerTime, IWorkflowData } from '.';
export class ActiveWorkflows {
private workflowData: {
[key: string]: IWorkflowData;
} = {};
/**
* Returns if the workflow is active
*
* @param {string} id The id of the workflow to check
* @returns {boolean}
* @memberof ActiveWorkflows
*/
isActive(id: string): boolean {
// eslint-disable-next-line no-prototype-builtins
return this.workflowData.hasOwnProperty(id);
}
/**
* Returns the ids of the currently active workflows
*
* @returns {string[]}
* @memberof ActiveWorkflows
*/
allActiveWorkflows(): string[] {
return Object.keys(this.workflowData);
}
/**
* Returns the Workflow data for the workflow with
* the given id if it is currently active
*
* @param {string} id
* @returns {(WorkflowData | undefined)}
* @memberof ActiveWorkflows
*/
get(id: string): IWorkflowData | undefined {
return this.workflowData[id];
}
/**
* Makes a workflow active
*
* @param {string} id The id of the workflow to activate
* @param {Workflow} workflow The workflow to activate
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
* @returns {Promise<void>}
* @memberof ActiveWorkflows
*/
async add(
id: string,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
getTriggerFunctions: IGetExecuteTriggerFunctions,
getPollFunctions: IGetExecutePollFunctions,
): Promise<void> {
this.workflowData[id] = {};
const triggerNodes = workflow.getTriggerNodes();
let triggerResponse: ITriggerResponse | undefined;
this.workflowData[id].triggerResponses = [];
for (const triggerNode of triggerNodes) {
triggerResponse = await workflow.runTrigger(
triggerNode,
getTriggerFunctions,
additionalData,
mode,
activation,
);
if (triggerResponse !== undefined) {
// If a response was given save it
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.workflowData[id].triggerResponses!.push(triggerResponse);
}
}
const pollNodes = workflow.getPollNodes();
if (pollNodes.length) {
this.workflowData[id].pollResponses = [];
for (const pollNode of pollNodes) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.workflowData[id].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
);
}
}
}
/**
* Activates polling for the given node
*
* @param {INode} node
* @param {Workflow} workflow
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {IGetExecutePollFunctions} getPollFunctions
* @returns {Promise<IPollResponse>}
* @memberof ActiveWorkflows
*/
async activatePolling(
node: INode,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
getPollFunctions: IGetExecutePollFunctions,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<IPollResponse> {
const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);
const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
item: ITriggerTime[];
};
// Define the order the cron-time-parameter appear
const parameterOrder = [
'second', // 0 - 59
'minute', // 0 - 59
'hour', // 0 - 23
'dayOfMonth', // 1 - 31
'month', // 0 - 11(Jan - Dec)
'weekday', // 0 - 6(Sun - Sat)
];
// Get all the trigger times
const cronTimes: string[] = [];
let cronTime: string[];
let parameterName: string;
if (pollTimes.item !== undefined) {
for (const item of pollTimes.item) {
cronTime = [];
if (item.mode === 'custom') {
cronTimes.push((item.cronExpression as string).trim());
continue;
}
if (item.mode === 'everyMinute') {
cronTimes.push(`${Math.floor(Math.random() * 60).toString()} * * * * *`);
continue;
}
if (item.mode === 'everyX') {
if (item.unit === 'minutes') {
cronTimes.push(`${Math.floor(Math.random() * 60).toString()} */${item.value} * * * *`);
} else if (item.unit === 'hours') {
cronTimes.push(`${Math.floor(Math.random() * 60).toString()} 0 */${item.value} * * *`);
}
continue;
}
for (parameterName of parameterOrder) {
if (item[parameterName] !== undefined) {
// Value is set so use it
cronTime.push(item[parameterName] as string);
} else if (parameterName === 'second') {
// For seconds we use by default a random one to make sure to
// balance the load a little bit over time
cronTime.push(Math.floor(Math.random() * 60).toString());
} else {
// For all others set "any"
cronTime.push('*');
}
}
cronTimes.push(cronTime.join(' '));
}
}
// The trigger function to execute when the cron-time got reached
const executeTrigger = async () => {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.info(`Polling trigger initiated for workflow "${workflow.name}"`, {
workflowName: workflow.name,
workflowId: workflow.id,
});
const pollResponse = await workflow.runPoll(node, pollFunctions);
if (pollResponse !== null) {
// eslint-disable-next-line no-underscore-dangle
pollFunctions.__emit(pollResponse);
}
};
// Execute the trigger directly to be able to know if it works
await executeTrigger();
const timezone = pollFunctions.getTimezone();
// Start the cron-jobs
const cronJobs: CronJob[] = [];
// eslint-disable-next-line @typescript-eslint/no-shadow
for (const cronTime of cronTimes) {
const cronTimeParts = cronTime.split(' ');
if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
throw new Error('The polling interval is too short. It has to be at least a minute!');
}
cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
}
// Stop the cron-jobs
async function closeFunction() {
for (const cronJob of cronJobs) {
cronJob.stop();
}
}
return {
closeFunction,
};
}
/**
* Makes a workflow inactive
*
* @param {string} id The id of the workflow to deactivate
* @returns {Promise<void>}
* @memberof ActiveWorkflows
*/
async remove(id: string): Promise<void> {
if (!this.isActive(id)) {
// Workflow is currently not registered
throw new Error(
`The workflow with the id "${id}" is currently not active and can so not be removed`,
);
}
const workflowData = this.workflowData[id];
if (workflowData.triggerResponses) {
for (const triggerResponse of workflowData.triggerResponses) {
if (triggerResponse.closeFunction) {
await triggerResponse.closeFunction();
}
}
}
if (workflowData.pollResponses) {
for (const pollResponse of workflowData.pollResponses) {
if (pollResponse.closeFunction) {
await pollResponse.closeFunction();
}
}
}
delete this.workflowData[id];
}
}