2023-01-27 05:56:56 -08:00
|
|
|
import type {
|
2019-12-31 12:19:37 -08:00
|
|
|
IGetExecutePollFunctions,
|
2019-08-08 11:38:25 -07:00
|
|
|
IGetExecuteTriggerFunctions,
|
2019-12-31 12:19:37 -08:00
|
|
|
INode,
|
2019-06-23 03:35:23 -07:00
|
|
|
ITriggerResponse,
|
|
|
|
IWorkflowExecuteAdditionalData,
|
2022-08-19 03:45:04 -07:00
|
|
|
TriggerTime,
|
2019-06-23 03:35:23 -07:00
|
|
|
Workflow,
|
2021-03-23 11:08:47 -07:00
|
|
|
WorkflowActivateMode,
|
|
|
|
WorkflowExecuteMode,
|
2019-06-23 03:35:23 -07:00
|
|
|
} from 'n8n-workflow';
|
2023-11-07 04:48:48 -08:00
|
|
|
import {
|
2023-11-30 00:06:19 -08:00
|
|
|
ApplicationError,
|
2024-02-26 05:33:00 -08:00
|
|
|
ErrorReporterProxy as ErrorReporter,
|
2023-11-07 04:48:48 -08:00
|
|
|
LoggerProxy as Logger,
|
|
|
|
toCronExpression,
|
2024-02-26 05:33:00 -08:00
|
|
|
TriggerCloseError,
|
2023-11-07 04:48:48 -08:00
|
|
|
WorkflowActivationError,
|
|
|
|
WorkflowDeactivationError,
|
|
|
|
} from 'n8n-workflow';
|
2024-09-30 06:38:56 -07:00
|
|
|
import { Service } from 'typedi';
|
2019-06-23 03:35:23 -07:00
|
|
|
|
2022-11-09 06:25:00 -08:00
|
|
|
import type { IWorkflowData } from './Interfaces';
|
2024-09-30 06:38:56 -07:00
|
|
|
import { ScheduledTaskManager } from './ScheduledTaskManager';
|
2021-05-01 20:43:01 -07:00
|
|
|
|
2023-12-22 02:28:42 -08:00
|
|
|
/**
 * In-memory registry of the workflows whose trigger and poll nodes have been
 * started by this process.
 *
 * For every registered workflow it stores the responses returned by its
 * trigger nodes (so they can be closed again on removal) and registers cron
 * jobs for its poll nodes through the injected `ScheduledTaskManager`.
 */
@Service()
export class ActiveWorkflows {
	constructor(private readonly scheduledTaskManager: ScheduledTaskManager) {}

	/** Data of the currently registered workflows, keyed by workflow ID. */
	private activeWorkflows: { [workflowId: string]: IWorkflowData } = {};

	/**
	 * Returns if the workflow is active in memory.
	 */
	isActive(workflowId: string): boolean {
		// Go through `Object.prototype` so the check does not depend on the
		// dictionary object exposing its own `hasOwnProperty`.
		return Object.prototype.hasOwnProperty.call(this.activeWorkflows, workflowId);
	}

	/**
	 * Returns the IDs of the currently active workflows in memory.
	 */
	allActiveWorkflows() {
		return Object.keys(this.activeWorkflows);
	}

	/**
	 * Returns the workflow data for the given ID if currently active in memory.
	 * At runtime the result is `undefined` for an ID that is not registered.
	 */
	get(workflowId: string) {
		return this.activeWorkflows[workflowId];
	}

	/**
	 * Makes a workflow active: runs all of its trigger nodes and activates
	 * polling for all of its poll nodes.
	 *
	 * The workflow is registered before its nodes are started. If starting any
	 * node fails, the activation is rolled back: the registry entry is dropped,
	 * the crons of the workflow are deregistered and the triggers that had
	 * already been started are closed. The original error is then rethrown.
	 *
	 * @param workflowId The id of the workflow to activate
	 * @param workflow The workflow to activate
	 * @param additionalData The additional data which is needed to run workflows
	 * @param mode The execution mode handed to the trigger and poll functions
	 * @param activation The activation mode handed to the trigger and poll functions
	 * @param getTriggerFunctions Factory passed to `workflow.runTrigger()`
	 * @param getPollFunctions Factory used to build the poll functions of each poll node
	 * @throws {WorkflowActivationError} If running a trigger node or activating a poll node fails
	 */
	async add(
		workflowId: string,
		workflow: Workflow,
		additionalData: IWorkflowExecuteAdditionalData,
		mode: WorkflowExecuteMode,
		activation: WorkflowActivateMode,
		getTriggerFunctions: IGetExecuteTriggerFunctions,
		getPollFunctions: IGetExecutePollFunctions,
	) {
		// The array is shared with the registry entry, so every response pushed
		// below is immediately visible to `remove()`.
		const triggerResponses: ITriggerResponse[] = [];
		this.activeWorkflows[workflowId] = { triggerResponses };

		try {
			for (const triggerNode of workflow.getTriggerNodes()) {
				try {
					const triggerResponse: ITriggerResponse | undefined = await workflow.runTrigger(
						triggerNode,
						getTriggerFunctions,
						additionalData,
						mode,
						activation,
					);
					// If a response was given save it
					if (triggerResponse !== undefined) {
						triggerResponses.push(triggerResponse);
					}
				} catch (e) {
					throw this.toActivationError(e, triggerNode);
				}
			}

			for (const pollNode of workflow.getPollNodes()) {
				try {
					await this.activatePolling(
						pollNode,
						workflow,
						additionalData,
						getPollFunctions,
						mode,
						activation,
					);
				} catch (e) {
					throw this.toActivationError(e, pollNode);
				}
			}
		} catch (error) {
			// Do not leave a half-activated workflow behind
			await this.rollbackActivation(workflowId, triggerResponses);
			throw error;
		}
	}

	/**
	 * Wraps whatever was thrown while starting `node` in a `WorkflowActivationError`.
	 */
	private toActivationError(e: unknown, node: INode) {
		const error = e instanceof Error ? e : new Error(`${e}`);

		return new WorkflowActivationError(
			`There was a problem activating the workflow: "${error.message}"`,
			{ cause: error, node },
		);
	}

	/**
	 * Undoes a partially completed `add()`: drops the registry entry, deregisters
	 * the crons of the workflow and closes the triggers that were already started.
	 * Failures while closing a trigger are logged and reported instead of thrown,
	 * so that the caller can rethrow the error that made the activation fail.
	 */
	private async rollbackActivation(workflowId: string, triggerResponses: ITriggerResponse[]) {
		delete this.activeWorkflows[workflowId];

		this.scheduledTaskManager.deregisterCrons(workflowId);

		for (const response of triggerResponses) {
			try {
				await this.closeTrigger(response, workflowId);
			} catch (e) {
				const error = e instanceof Error ? e : new Error(`${e}`);
				Logger.error(
					`Failed to clean up trigger after failed activation of workflow ID "${workflowId}": "${error.message}"`,
				);
				ErrorReporter.error(error, { extra: { workflowId } });
			}
		}
	}

	/**
	 * Activates polling for the given node.
	 *
	 * Runs the poll once right away so that a broken poll node surfaces as an
	 * error, then registers one cron per configured poll time. All cron
	 * expressions are validated before any of them is registered, so a rejected
	 * expression never leaves other crons of the same node behind.
	 *
	 * @throws {ApplicationError} If one of the poll times has a wildcard in its first cron field
	 * @throws Whatever the initial poll run throws
	 */
	async activatePolling(
		node: INode,
		workflow: Workflow,
		additionalData: IWorkflowExecuteAdditionalData,
		getPollFunctions: IGetExecutePollFunctions,
		mode: WorkflowExecuteMode,
		activation: WorkflowActivateMode,
	): Promise<void> {
		const pollFunctions = getPollFunctions(workflow, node, additionalData, mode, activation);

		const pollTimes = pollFunctions.getNodeParameter('pollTimes') as unknown as {
			item: TriggerTime[];
		};

		// Get all the trigger times
		const cronTimes = (pollTimes.item || []).map(toCronExpression);

		// The trigger function to execute when the cron-time got reached
		const executeTrigger = async (testingTrigger = false) => {
			Logger.debug(`Polling trigger initiated for workflow "${workflow.name}"`, {
				workflowName: workflow.name,
				workflowId: workflow.id,
			});

			try {
				const pollResponse = await workflow.runPoll(node, pollFunctions);

				if (pollResponse !== null) {
					pollFunctions.__emit(pollResponse);
				}
			} catch (error) {
				// If the poll function fails in the first activation
				// throw the error back so we let the user know there is
				// an issue with the trigger.
				if (testingTrigger) {
					throw error;
				}
				pollFunctions.__emitError(error as Error);
			}
		};

		// Execute the trigger directly to be able to know if it works
		await executeTrigger(true);

		// A wildcard in the first cron field (presumably the seconds field, given
		// the error message) would make the poll run more than once a minute.
		const isTooFrequent = cronTimes.some((cronTime) => {
			const cronTimeParts = cronTime.split(' ');
			return cronTimeParts.length > 0 && cronTimeParts[0].includes('*');
		});
		if (isTooFrequent) {
			throw new ApplicationError(
				'The polling interval is too short. It has to be at least a minute.',
			);
		}

		for (const cronTime of cronTimes) {
			this.scheduledTaskManager.registerCron(workflow, cronTime, executeTrigger);
		}
	}

	/**
	 * Makes a workflow inactive in memory.
	 *
	 * Deregisters the crons of the workflow, closes every stored trigger
	 * response and finally drops the registry entry.
	 *
	 * @returns `false` (after logging a warning) if the workflow was not active, `true` otherwise
	 * @throws {WorkflowDeactivationError} If closing a trigger fails with anything other than a
	 * `TriggerCloseError`; the registry entry is not dropped in that case
	 */
	async remove(workflowId: string) {
		if (!this.isActive(workflowId)) {
			Logger.warn(`Cannot deactivate already inactive workflow ID "${workflowId}"`);
			return false;
		}

		this.scheduledTaskManager.deregisterCrons(workflowId);

		const workflowData = this.activeWorkflows[workflowId];
		for (const response of workflowData.triggerResponses ?? []) {
			await this.closeTrigger(response, workflowId);
		}

		delete this.activeWorkflows[workflowId];

		return true;
	}

	/**
	 * Removes every workflow that is currently active in memory, one after the
	 * other. Stops at the first removal that throws.
	 */
	async removeAllTriggerAndPollerBasedWorkflows() {
		for (const workflowId of Object.keys(this.activeWorkflows)) {
			await this.remove(workflowId);
		}
	}

	/**
	 * Calls the `closeFunction` of a trigger response, if it has one.
	 *
	 * A `TriggerCloseError` is logged and reported but not rethrown. Any other
	 * failure is wrapped in a `WorkflowDeactivationError`.
	 */
	private async closeTrigger(response: ITriggerResponse, workflowId: string) {
		if (!response.closeFunction) return;

		try {
			await response.closeFunction();
		} catch (e) {
			if (e instanceof TriggerCloseError) {
				Logger.error(
					`There was a problem calling "closeFunction" on "${e.node.name}" in workflow "${workflowId}"`,
				);
				ErrorReporter.error(e, { extra: { workflowId } });
				return;
			}

			const error = e instanceof Error ? e : new Error(`${e}`);

			throw new WorkflowDeactivationError(
				`Failed to deactivate trigger of workflow ID "${workflowId}": "${error.message}"`,
				{ cause: error, workflowId },
			);
		}
	}
}
|