n8n/packages/core/src/ActiveWorkflows.ts

249 lines
6.4 KiB
TypeScript

/* eslint-disable @typescript-eslint/no-unsafe-argument */
import { CronJob } from 'cron';
import type {
IGetExecutePollFunctions,
IGetExecuteTriggerFunctions,
INode,
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
TriggerTime,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { LoggerProxy as Logger, toCronExpression, WorkflowActivationError } from 'n8n-workflow';
import type { IWorkflowData } from './Interfaces';
/**
 * Registry of the workflows that are currently active.
 *
 * For every active workflow it stores the responses that were returned when
 * its trigger and poll nodes got started, so that they can be closed again
 * when the workflow gets deactivated.
 */
export class ActiveWorkflows {
	/** Data of the currently active workflows, keyed by workflow id */
	private workflowData: {
		[key: string]: IWorkflowData;
	} = {};

	/**
	 * Returns if the workflow is active
	 *
	 * @param {string} id The id of the workflow to check
	 */
	isActive(id: string): boolean {
		// Call through the prototype instead of through the dictionary itself, so
		// the check keeps working no matter which keys the dictionary contains
		return Object.prototype.hasOwnProperty.call(this.workflowData, id);
	}

	/**
	 * Returns the ids of the currently active workflows
	 *
	 */
	allActiveWorkflows(): string[] {
		return Object.keys(this.workflowData);
	}

	/**
	 * Returns the Workflow data for the workflow with
	 * the given id if it is currently active
	 *
	 * @param {string} id The id of the workflow to look up
	 * @returns The stored data or `undefined` if the workflow is not active
	 */
	get(id: string): IWorkflowData | undefined {
		// Only own entries count. A plain index access would also hand out
		// members inherited from `Object.prototype` (for example "toString")
		return this.isActive(id) ? this.workflowData[id] : undefined;
	}

	/**
	 * Makes a workflow active
	 *
	 * Runs all trigger nodes of the workflow and afterwards starts the polling
	 * for all of its poll nodes. If any node fails to activate, everything that
	 * was already started for this workflow gets closed again and the workflow
	 * gets unregistered, so that a failed activation leaves nothing running.
	 *
	 * @param {string} id The id of the workflow to activate
	 * @param {Workflow} workflow The workflow to activate
	 * @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
	 * @param {WorkflowExecuteMode} mode Passed through to the trigger and poll functions
	 * @param {WorkflowActivateMode} activation Passed through to the trigger and poll functions
	 * @param {IGetExecuteTriggerFunctions} getTriggerFunctions Passed to `workflow.runTrigger`
	 * @param {IGetExecutePollFunctions} getPollFunctions Passed to `activatePolling`
	 * @throws {WorkflowActivationError} If a trigger or poll node could not be activated
	 */
	async add(
		id: string,
		workflow: Workflow,
		additionalData: IWorkflowExecuteAdditionalData,
		mode: WorkflowExecuteMode,
		activation: WorkflowActivateMode,
		getTriggerFunctions: IGetExecuteTriggerFunctions,
		getPollFunctions: IGetExecutePollFunctions,
	): Promise<void> {
		const triggerResponses: ITriggerResponse[] = [];
		const workflowData: IWorkflowData = { triggerResponses };

		// Register right away so that the workflow already counts as active
		// while its nodes are being started
		this.workflowData[id] = workflowData;

		try {
			for (const triggerNode of workflow.getTriggerNodes()) {
				let triggerResponse: ITriggerResponse | undefined;
				try {
					triggerResponse = await workflow.runTrigger(
						triggerNode,
						getTriggerFunctions,
						additionalData,
						mode,
						activation,
					);
				} catch (error) {
					throw new WorkflowActivationError(
						// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
						`There was a problem activating the workflow: "${error.message}"`,
						{ cause: error as Error, node: triggerNode },
					);
				}

				if (triggerResponse !== undefined) {
					// If a response was given save it
					triggerResponses.push(triggerResponse);
				}
			}

			const pollNodes = workflow.getPollNodes();
			if (pollNodes.length) {
				const pollResponses: IPollResponse[] = [];
				workflowData.pollResponses = pollResponses;

				for (const pollNode of pollNodes) {
					try {
						pollResponses.push(
							await this.activatePolling(
								pollNode,
								workflow,
								additionalData,
								getPollFunctions,
								mode,
								activation,
							),
						);
					} catch (error) {
						throw new WorkflowActivationError(
							// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
							`There was a problem activating the workflow: "${error.message}"`,
							{ cause: error as Error, node: pollNode },
						);
					}
				}
			}
		} catch (error) {
			// Roll back: close whatever got started before the failure, otherwise
			// those triggers and polls would keep running without being tracked
			await this.closeResponses(id, workflowData);

			// Only drop the entry if it is still the one created by this call
			if (this.workflowData[id] === workflowData) {
				delete this.workflowData[id];
			}

			throw error;
		}
	}

	/**
	 * Activates polling for the given node
	 *
	 * Runs the poll once directly, so that a broken poll node is reported to
	 * the caller, and then starts one cron job per configured poll time.
	 *
	 * @param {INode} node The poll node to activate
	 * @param {Workflow} workflow The workflow the node belongs to
	 * @param {IWorkflowExecuteAdditionalData} additionalData Passed to `getPollFunctions`
	 * @param {IGetExecutePollFunctions} getPollFunctions Creates the poll functions for the node
	 * @param {WorkflowExecuteMode} mode Passed to `getPollFunctions`
	 * @param {WorkflowActivateMode} activation Passed to `getPollFunctions`
	 * @returns A response whose `closeFunction` stops all started cron jobs
	 * @throws The error of the first poll run, or an `Error` if a poll time is rejected
	 */
	async activatePolling(
		node: INode,
		workflow: Workflow,
		additionalData: IWorkflowExecuteAdditionalData,
		getPollFunctions: IGetExecutePollFunctions,
		mode: WorkflowExecuteMode,
		activation: WorkflowActivateMode,
	): Promise<IPollResponse> {
		const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);

		const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
			item: TriggerTime[];
		};

		// Get all the trigger times
		const cronTimes = (pollTimes.item || []).map(toCronExpression);

		// The trigger function to execute when the cron-time got reached
		const executeTrigger = async (testingTrigger = false) => {
			Logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, {
				workflowName: workflow.name,
				workflowId: workflow.id,
			});

			try {
				const pollResponse = await workflow.runPoll(node, pollFunctions);

				if (pollResponse !== null) {
					pollFunctions.__emit(pollResponse);
				}
			} catch (error) {
				// If the poll function fails in the first activation
				// throw the error back so we let the user know there is
				// an issue with the trigger.
				if (testingTrigger) {
					throw error;
				}
				pollFunctions.__emitError(error);
			}
		};

		// Execute the trigger directly to be able to know if it works
		await executeTrigger(true);

		const timezone = pollFunctions.getTimezone();

		// Start the cron-jobs
		const cronJobs: CronJob[] = [];
		try {
			for (const cronTime of cronTimes) {
				// A wildcard in the first field of the expression is rejected
				// (presumably the seconds field, given the error message below)
				const cronTimeParts = cronTime.split(' ');
				if (cronTimeParts.length > 0 && cronTimeParts[0].includes('*')) {
					throw new Error('The polling interval is too short. It has to be at least a minute!');
				}

				// The fourth argument starts the job as soon as it is created
				cronJobs.push(new CronJob(cronTime, executeTrigger, undefined, true, timezone));
			}
		} catch (error) {
			// Jobs created before the failure are already running and nobody
			// else holds a reference to them, so stop them before giving up
			for (const cronJob of cronJobs) {
				cronJob.stop();
			}
			throw error;
		}

		// Stop the cron-jobs
		async function closeFunction() {
			for (const cronJob of cronJobs) {
				cronJob.stop();
			}
		}

		return {
			closeFunction,
		};
	}

	/**
	 * Makes a workflow inactive
	 *
	 * Closes all trigger and poll responses of the workflow and unregisters it.
	 * Errors thrown while closing get logged and do not abort the removal.
	 *
	 * @param {string} id The id of the workflow to deactivate
	 * @throws {Error} If the workflow is currently not active
	 */
	async remove(id: string): Promise<void> {
		if (!this.isActive(id)) {
			// Workflow is currently not registered
			throw new Error(
				`The workflow with the id "${id}" is currently not active and can so not be removed`,
			);
		}

		await this.closeResponses(id, this.workflowData[id]);

		delete this.workflowData[id];
	}

	/**
	 * Calls the close function of every trigger and poll response in the given
	 * workflow data. A close function that throws gets logged and skipped, so
	 * that the remaining responses still get closed.
	 *
	 * @param {string} id The id of the workflow, used for logging
	 * @param {IWorkflowData} workflowData The data whose responses should be closed
	 */
	private async closeResponses(id: string, workflowData: IWorkflowData): Promise<void> {
		if (workflowData.triggerResponses) {
			for (const triggerResponse of workflowData.triggerResponses) {
				if (triggerResponse.closeFunction) {
					try {
						await triggerResponse.closeFunction();
					} catch (error) {
						Logger.error(
							// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
							`There was a problem deactivating trigger of workflow "${id}": "${error.message}"`,
							{
								workflowId: id,
							},
						);
					}
				}
			}
		}

		if (workflowData.pollResponses) {
			for (const pollResponse of workflowData.pollResponses) {
				if (pollResponse.closeFunction) {
					try {
						await pollResponse.closeFunction();
					} catch (error) {
						Logger.error(
							// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
							`There was a problem deactivating polling trigger of workflow "${id}": "${error.message}"`,
							{
								workflowId: id,
							},
						);
					}
				}
			}
		}
	}
}