mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-21 01:21:28 -08:00
840 lines
25 KiB
TypeScript
840 lines
25 KiB
TypeScript
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
|
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
|
import { Service } from 'typedi';
|
|
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
|
|
|
import type {
|
|
ExecutionError,
|
|
IDeferredPromise,
|
|
IExecuteData,
|
|
IExecuteResponsePromiseData,
|
|
IGetExecutePollFunctions,
|
|
IGetExecuteTriggerFunctions,
|
|
INode,
|
|
INodeExecutionData,
|
|
IRun,
|
|
IRunExecutionData,
|
|
IWorkflowBase,
|
|
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
|
|
WorkflowActivateMode,
|
|
WorkflowExecuteMode,
|
|
INodeType,
|
|
} from 'n8n-workflow';
|
|
import {
|
|
Workflow,
|
|
WorkflowActivationError,
|
|
ErrorReporterProxy as ErrorReporter,
|
|
WebhookPathTakenError,
|
|
ApplicationError,
|
|
} from 'n8n-workflow';
|
|
|
|
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
|
import * as WebhookHelpers from '@/WebhookHelpers';
|
|
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
|
|
|
|
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
|
import { ExecutionService } from './executions/execution.service';
|
|
import {
|
|
STARTING_NODES,
|
|
WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
|
|
WORKFLOW_REACTIVATE_MAX_TIMEOUT,
|
|
} from '@/constants';
|
|
import { NodeTypes } from '@/NodeTypes';
|
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
|
import { ExternalHooks } from '@/ExternalHooks';
|
|
import { WebhookService } from './services/webhook.service';
|
|
import { Logger } from './Logger';
|
|
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
|
import { OrchestrationService } from '@/services/orchestration.service';
|
|
import { ActivationErrorsService } from '@/ActivationErrors.service';
|
|
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
|
|
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
|
|
import { OnShutdown } from '@/decorators/OnShutdown';
|
|
|
|
interface QueuedActivation {
|
|
activationMode: WorkflowActivateMode;
|
|
lastTimeout: number;
|
|
timeout: NodeJS.Timeout;
|
|
workflowData: IWorkflowDb;
|
|
}
|
|
|
|
@Service()
|
|
export class ActiveWorkflowRunner {
|
|
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};
|
|
|
|
constructor(
|
|
private readonly logger: Logger,
|
|
private readonly activeWorkflows: ActiveWorkflows,
|
|
private readonly activeExecutions: ActiveExecutions,
|
|
private readonly externalHooks: ExternalHooks,
|
|
private readonly nodeTypes: NodeTypes,
|
|
private readonly webhookService: WebhookService,
|
|
private readonly workflowRepository: WorkflowRepository,
|
|
private readonly orchestrationService: OrchestrationService,
|
|
private readonly activationErrorsService: ActivationErrorsService,
|
|
private readonly executionService: ExecutionService,
|
|
private readonly workflowStaticDataService: WorkflowStaticDataService,
|
|
private readonly activeWorkflowsService: ActiveWorkflowsService,
|
|
) {}
|
|
|
|
async init() {
|
|
await this.orchestrationService.init();
|
|
|
|
await this.addActiveWorkflows('init');
|
|
|
|
await this.externalHooks.run('activeWorkflows.initialized', []);
|
|
await this.webhookService.populateCache();
|
|
}
|
|
|
|
async getAllWorkflowActivationErrors() {
|
|
return await this.activationErrorsService.getAll();
|
|
}
|
|
|
|
/**
|
|
* Removes all the currently active workflows from memory.
|
|
*/
|
|
async removeAll() {
|
|
let activeWorkflowIds: string[] = [];
|
|
this.logger.verbose('Call to remove all active workflows received (removeAll)');
|
|
|
|
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
|
|
|
|
const activeWorkflows = await this.activeWorkflowsService.getAllActiveIdsInStorage();
|
|
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
|
|
// Make sure IDs are unique
|
|
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
|
|
|
|
const removePromises = [];
|
|
for (const workflowId of activeWorkflowIds) {
|
|
removePromises.push(this.remove(workflowId));
|
|
}
|
|
|
|
await Promise.all(removePromises);
|
|
}
|
|
|
|
/**
|
|
* Returns the ids of the currently active workflows from memory.
|
|
*/
|
|
allActiveInMemory() {
|
|
return this.activeWorkflows.allActiveWorkflows();
|
|
}
|
|
|
|
/**
|
|
* Returns if the workflow is stored as `active`.
|
|
*
|
|
* @important Do not confuse with `ActiveWorkflows.isActive()`,
|
|
* which checks if the workflow is active in memory.
|
|
*/
|
|
async isActive(workflowId: string) {
|
|
const workflow = await this.workflowRepository.findOne({
|
|
select: ['active'],
|
|
where: { id: workflowId },
|
|
});
|
|
|
|
return !!workflow?.active;
|
|
}
|
|
|
|
/**
|
|
* Register workflow-defined webhooks in the `workflow_entity` table.
|
|
*/
|
|
async addWebhooks(
|
|
workflow: Workflow,
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
|
mode: WorkflowExecuteMode,
|
|
activation: WorkflowActivateMode,
|
|
) {
|
|
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
|
let path = '';
|
|
|
|
if (webhooks.length === 0) return;
|
|
|
|
this.logger.debug(`Adding webhooks for workflow "${workflow.name}" (ID ${workflow.id})`);
|
|
|
|
for (const webhookData of webhooks) {
|
|
const node = workflow.getNode(webhookData.node) as INode;
|
|
node.name = webhookData.node;
|
|
|
|
path = webhookData.path;
|
|
|
|
const webhook = this.webhookService.createWebhook({
|
|
workflowId: webhookData.workflowId,
|
|
webhookPath: path,
|
|
node: node.name,
|
|
method: webhookData.httpMethod,
|
|
});
|
|
|
|
if (webhook.webhookPath.startsWith('/')) {
|
|
webhook.webhookPath = webhook.webhookPath.slice(1);
|
|
}
|
|
if (webhook.webhookPath.endsWith('/')) {
|
|
webhook.webhookPath = webhook.webhookPath.slice(0, -1);
|
|
}
|
|
|
|
if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) {
|
|
webhook.webhookId = node.webhookId;
|
|
webhook.pathLength = webhook.webhookPath.split('/').length;
|
|
}
|
|
|
|
try {
|
|
// TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch`
|
|
await this.webhookService.storeWebhook(webhook);
|
|
await workflow.createWebhookIfNotExists(
|
|
webhookData,
|
|
NodeExecuteFunctions,
|
|
mode,
|
|
activation,
|
|
);
|
|
} catch (error) {
|
|
if (activation === 'init' && error.name === 'QueryFailedError') {
|
|
// n8n does not remove the registered webhooks on exit.
|
|
// This means that further initializations will always fail
|
|
// when inserting to database. This is why we ignore this error
|
|
// as it's expected to happen.
|
|
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
await this.clearWebhooks(workflow.id);
|
|
} catch (error1) {
|
|
ErrorReporter.error(error1);
|
|
this.logger.error(
|
|
`Could not remove webhooks of workflow "${workflow.id}" because of error: "${error1.message}"`,
|
|
);
|
|
}
|
|
|
|
// if it's a workflow from the the insert
|
|
// TODO check if there is standard error code for duplicate key violation that works
|
|
// with all databases
|
|
if (error instanceof Error && error.name === 'QueryFailedError') {
|
|
error = new WebhookPathTakenError(webhook.node, error);
|
|
} else if (error.detail) {
|
|
// it's a error running the webhook methods (checkExists, create)
|
|
error.message = error.detail;
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
}
|
|
await this.webhookService.populateCache();
|
|
|
|
await this.workflowStaticDataService.saveStaticData(workflow);
|
|
}
|
|
|
|
/**
|
|
* Clear workflow-defined webhooks from the `webhook_entity` table.
|
|
*/
|
|
async clearWebhooks(workflowId: string) {
|
|
const workflowData = await this.workflowRepository.findOne({
|
|
where: { id: workflowId },
|
|
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
|
|
});
|
|
|
|
if (workflowData === null) {
|
|
throw new ApplicationError('Could not find workflow', { extra: { workflowId } });
|
|
}
|
|
|
|
const workflow = new Workflow({
|
|
id: workflowId,
|
|
name: workflowData.name,
|
|
nodes: workflowData.nodes,
|
|
connections: workflowData.connections,
|
|
active: workflowData.active,
|
|
nodeTypes: this.nodeTypes,
|
|
staticData: workflowData.staticData,
|
|
settings: workflowData.settings,
|
|
});
|
|
|
|
const mode = 'internal';
|
|
|
|
const additionalData = await WorkflowExecuteAdditionalData.getBase(
|
|
workflowData.shared[0].user.id,
|
|
);
|
|
|
|
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
|
|
|
for (const webhookData of webhooks) {
|
|
await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update');
|
|
}
|
|
|
|
await this.workflowStaticDataService.saveStaticData(workflow);
|
|
|
|
await this.webhookService.deleteWorkflowWebhooks(workflowId);
|
|
}
|
|
|
|
async runWorkflow(
|
|
workflowData: IWorkflowDb,
|
|
node: INode,
|
|
data: INodeExecutionData[][],
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
|
mode: WorkflowExecuteMode,
|
|
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
|
) {
|
|
const nodeExecutionStack: IExecuteData[] = [
|
|
{
|
|
node,
|
|
data: {
|
|
main: data,
|
|
},
|
|
source: null,
|
|
},
|
|
];
|
|
|
|
const executionData: IRunExecutionData = {
|
|
startData: {},
|
|
resultData: {
|
|
runData: {},
|
|
},
|
|
executionData: {
|
|
contextData: {},
|
|
metadata: {},
|
|
nodeExecutionStack,
|
|
waitingExecution: {},
|
|
waitingExecutionSource: {},
|
|
},
|
|
};
|
|
|
|
// Start the workflow
|
|
const runData: IWorkflowExecutionDataProcess = {
|
|
userId: additionalData.userId,
|
|
executionMode: mode,
|
|
executionData,
|
|
workflowData,
|
|
};
|
|
|
|
const workflowRunner = new WorkflowRunner();
|
|
return await workflowRunner.run(runData, true, undefined, undefined, responsePromise);
|
|
}
|
|
|
|
/**
|
|
* Return poll function which gets the global functions from n8n-core
|
|
* and overwrites the emit to be able to start it in subprocess
|
|
*/
|
|
getExecutePollFunctions(
|
|
workflowData: IWorkflowDb,
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
|
mode: WorkflowExecuteMode,
|
|
activation: WorkflowActivateMode,
|
|
): IGetExecutePollFunctions {
|
|
return (workflow: Workflow, node: INode) => {
|
|
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(
|
|
workflow,
|
|
node,
|
|
additionalData,
|
|
mode,
|
|
activation,
|
|
);
|
|
returnFunctions.__emit = (
|
|
data: INodeExecutionData[][],
|
|
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
|
donePromise?: IDeferredPromise<IRun | undefined>,
|
|
): void => {
|
|
this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
|
|
void this.workflowStaticDataService.saveStaticData(workflow);
|
|
const executePromise = this.runWorkflow(
|
|
workflowData,
|
|
node,
|
|
data,
|
|
additionalData,
|
|
mode,
|
|
responsePromise,
|
|
);
|
|
|
|
if (donePromise) {
|
|
void executePromise.then((executionId) => {
|
|
this.activeExecutions
|
|
.getPostExecutePromise(executionId)
|
|
.then(donePromise.resolve)
|
|
.catch(donePromise.reject);
|
|
});
|
|
} else {
|
|
void executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
|
}
|
|
};
|
|
|
|
returnFunctions.__emitError = (error: ExecutionError): void => {
|
|
void this.executionService
|
|
.createErrorExecution(error, node, workflowData, workflow, mode)
|
|
.then(() => {
|
|
this.executeErrorWorkflow(error, workflowData, mode);
|
|
});
|
|
};
|
|
return returnFunctions;
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Return trigger function which gets the global functions from n8n-core
|
|
* and overwrites the emit to be able to start it in subprocess
|
|
*/
|
|
getExecuteTriggerFunctions(
|
|
workflowData: IWorkflowDb,
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
|
mode: WorkflowExecuteMode,
|
|
activation: WorkflowActivateMode,
|
|
): IGetExecuteTriggerFunctions {
|
|
return (workflow: Workflow, node: INode) => {
|
|
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(
|
|
workflow,
|
|
node,
|
|
additionalData,
|
|
mode,
|
|
activation,
|
|
);
|
|
returnFunctions.emit = (
|
|
data: INodeExecutionData[][],
|
|
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
|
|
donePromise?: IDeferredPromise<IRun | undefined>,
|
|
): void => {
|
|
this.logger.debug(`Received trigger for workflow "${workflow.name}"`);
|
|
void this.workflowStaticDataService.saveStaticData(workflow);
|
|
|
|
const executePromise = this.runWorkflow(
|
|
workflowData,
|
|
node,
|
|
data,
|
|
additionalData,
|
|
mode,
|
|
responsePromise,
|
|
);
|
|
|
|
if (donePromise) {
|
|
void executePromise.then((executionId) => {
|
|
this.activeExecutions
|
|
.getPostExecutePromise(executionId)
|
|
.then(donePromise.resolve)
|
|
.catch(donePromise.reject);
|
|
});
|
|
} else {
|
|
executePromise.catch((error: Error) => this.logger.error(error.message, { error }));
|
|
}
|
|
};
|
|
returnFunctions.emitError = (error: Error): void => {
|
|
this.logger.info(
|
|
`The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`,
|
|
{
|
|
nodeName: node.name,
|
|
workflowId: workflowData.id,
|
|
workflowName: workflowData.name,
|
|
},
|
|
);
|
|
|
|
// Remove the workflow as "active"
|
|
|
|
void this.activeWorkflows.remove(workflowData.id);
|
|
|
|
void this.activationErrorsService.register(workflowData.id, error.message);
|
|
|
|
// Run Error Workflow if defined
|
|
const activationError = new WorkflowActivationError(
|
|
`There was a problem with the trigger node "${node.name}", for that reason did the workflow had to be deactivated`,
|
|
{ cause: error, node },
|
|
);
|
|
this.executeErrorWorkflow(activationError, workflowData, mode);
|
|
|
|
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
|
};
|
|
return returnFunctions;
|
|
};
|
|
}
|
|
|
|
executeErrorWorkflow(
|
|
error: ExecutionError,
|
|
workflowData: IWorkflowBase,
|
|
mode: WorkflowExecuteMode,
|
|
): void {
|
|
const fullRunData: IRun = {
|
|
data: {
|
|
resultData: {
|
|
error,
|
|
runData: {},
|
|
},
|
|
},
|
|
finished: false,
|
|
mode,
|
|
startedAt: new Date(),
|
|
stoppedAt: new Date(),
|
|
status: 'running',
|
|
};
|
|
|
|
WorkflowExecuteAdditionalData.executeErrorWorkflow(workflowData, fullRunData, mode);
|
|
}
|
|
|
|
/**
|
|
* Register as active in memory all workflows stored as `active`.
|
|
*/
|
|
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
|
|
const dbWorkflows = await this.workflowRepository.getAllActive();
|
|
|
|
if (dbWorkflows.length === 0) return;
|
|
|
|
if (this.orchestrationService.isLeader) {
|
|
this.logger.info(' ================================');
|
|
this.logger.info(' Start Active Workflows:');
|
|
this.logger.info(' ================================');
|
|
}
|
|
|
|
for (const dbWorkflow of dbWorkflows) {
|
|
try {
|
|
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow);
|
|
|
|
if (wasActivated) {
|
|
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
|
|
workflowName: dbWorkflow.name,
|
|
workflowId: dbWorkflow.id,
|
|
});
|
|
this.logger.info(' => Started');
|
|
}
|
|
} catch (error) {
|
|
ErrorReporter.error(error);
|
|
this.logger.info(
|
|
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
|
|
);
|
|
|
|
this.logger.info(` ${error.message}`);
|
|
this.logger.error(
|
|
`Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`,
|
|
{
|
|
workflowName: dbWorkflow.name,
|
|
workflowId: dbWorkflow.id,
|
|
},
|
|
);
|
|
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
this.executeErrorWorkflow(error, dbWorkflow, 'internal');
|
|
|
|
// do not keep trying to activate on authorization error
|
|
if (error.message.includes('Authorization')) continue;
|
|
|
|
this.addQueuedWorkflowActivation('init', dbWorkflow);
|
|
}
|
|
}
|
|
|
|
this.logger.verbose('Finished activating workflows (startup)');
|
|
}
|
|
|
|
async clearAllActivationErrors() {
|
|
await this.activationErrorsService.clearAll();
|
|
}
|
|
|
|
async addAllTriggerAndPollerBasedWorkflows() {
|
|
await this.addActiveWorkflows('leadershipChange');
|
|
}
|
|
|
|
@OnShutdown()
|
|
async removeAllTriggerAndPollerBasedWorkflows() {
|
|
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
|
|
}
|
|
|
|
/**
|
|
* Register a workflow as active.
|
|
*
|
|
* An activatable workflow may be webhook-, trigger-, or poller-based:
|
|
*
|
|
* - A `webhook` is an HTTP-based node that can start a workflow when called
|
|
* by a third-party service.
|
|
* - A `poller` is an HTTP-based node that can start a workflow when detecting
|
|
* a change while regularly checking a third-party service.
|
|
* - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a
|
|
* time-based node like Schedule Trigger or a message-queue-based node.
|
|
*
|
|
* Note that despite the name, most "trigger" nodes are actually webhook-based
|
|
* and so qualify as `webhook`, e.g. Stripe Trigger.
|
|
*
|
|
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
|
|
* but webhooks are registered by being entered in the `webhook_entity` table,
|
|
* since webhooks do not require continuous execution.
|
|
*/
|
|
async add(
|
|
workflowId: string,
|
|
activationMode: WorkflowActivateMode,
|
|
existingWorkflow?: WorkflowEntity,
|
|
) {
|
|
let workflow: Workflow;
|
|
|
|
let shouldAddWebhooks = true;
|
|
let shouldAddTriggersAndPollers = true;
|
|
|
|
/**
|
|
* In a multi-main scenario, webhooks are stored in the database, while triggers
|
|
* and pollers are run only by the leader main instance.
|
|
*
|
|
* - During a regular workflow activation (i.e. not leadership change), only the
|
|
* leader should add webhooks to prevent duplicate insertions, and only the leader
|
|
* should handle triggers and pollers to prevent duplicate work.
|
|
*
|
|
* - During a leadership change, webhooks remain in storage and so need not be added
|
|
* again, and the new leader should take over the triggers and pollers that stopped
|
|
* running when the former leader became unresponsive.
|
|
*/
|
|
if (this.orchestrationService.isMultiMainSetupEnabled) {
|
|
if (activationMode !== 'leadershipChange') {
|
|
shouldAddWebhooks = this.orchestrationService.isLeader;
|
|
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
|
|
} else {
|
|
shouldAddWebhooks = false;
|
|
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
|
|
}
|
|
}
|
|
|
|
const shouldActivate = shouldAddWebhooks || shouldAddTriggersAndPollers;
|
|
|
|
try {
|
|
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
|
|
|
|
if (!dbWorkflow) {
|
|
throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
|
|
}
|
|
|
|
if (shouldActivate) {
|
|
this.logger.info(` - ${dbWorkflow.display()}`);
|
|
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
|
|
workflowName: dbWorkflow.name,
|
|
workflowId: dbWorkflow.id,
|
|
});
|
|
}
|
|
|
|
workflow = new Workflow({
|
|
id: dbWorkflow.id,
|
|
name: dbWorkflow.name,
|
|
nodes: dbWorkflow.nodes,
|
|
connections: dbWorkflow.connections,
|
|
active: dbWorkflow.active,
|
|
nodeTypes: this.nodeTypes,
|
|
staticData: dbWorkflow.staticData,
|
|
settings: dbWorkflow.settings,
|
|
});
|
|
|
|
const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES);
|
|
|
|
if (!canBeActivated) {
|
|
throw new WorkflowActivationError(
|
|
`Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
|
|
);
|
|
}
|
|
|
|
const sharing = dbWorkflow.shared.find((shared) => shared.role.name === 'owner');
|
|
|
|
if (!sharing) {
|
|
throw new WorkflowActivationError(`Workflow ${dbWorkflow.display()} has no owner`);
|
|
}
|
|
|
|
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
|
|
|
|
if (shouldAddWebhooks) {
|
|
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
|
}
|
|
|
|
if (shouldAddTriggersAndPollers) {
|
|
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
|
activationMode,
|
|
executionMode: 'trigger',
|
|
additionalData,
|
|
});
|
|
}
|
|
|
|
// Workflow got now successfully activated so make sure nothing is left in the queue
|
|
this.removeQueuedWorkflowActivation(workflowId);
|
|
|
|
await this.activationErrorsService.deregister(workflowId);
|
|
|
|
const triggerCount = this.countTriggers(workflow, additionalData);
|
|
await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
|
} catch (e) {
|
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
|
await this.activationErrorsService.register(workflowId, error.message);
|
|
|
|
throw e;
|
|
}
|
|
|
|
// If for example webhooks get created it sometimes has to save the
|
|
// id of them in the static data. So make sure that data gets persisted.
|
|
await this.workflowStaticDataService.saveStaticData(workflow);
|
|
|
|
return shouldActivate;
|
|
}
|
|
|
|
/**
|
|
* Count all triggers in the workflow, excluding Manual Trigger.
|
|
*/
|
|
private countTriggers(
|
|
workflow: Workflow,
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
|
) {
|
|
const triggerFilter = (nodeType: INodeType) =>
|
|
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
|
|
|
|
return (
|
|
workflow.queryNodes(triggerFilter).length +
|
|
workflow.getPollNodes().length +
|
|
WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true).length
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Add a workflow to the activation queue.
|
|
* Meaning it will keep on trying to activate it in regular
|
|
* amounts indefinitely.
|
|
*/
|
|
addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) {
|
|
const workflowId = workflowData.id;
|
|
const workflowName = workflowData.name;
|
|
|
|
const retryFunction = async () => {
|
|
this.logger.info(`Try to activate workflow "${workflowName}" (${workflowId})`, {
|
|
workflowId,
|
|
workflowName,
|
|
});
|
|
try {
|
|
await this.add(workflowId, activationMode, workflowData);
|
|
} catch (error) {
|
|
ErrorReporter.error(error);
|
|
let lastTimeout = this.queuedActivations[workflowId].lastTimeout;
|
|
if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) {
|
|
lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT);
|
|
}
|
|
|
|
this.logger.info(
|
|
` -> Activation of workflow "${workflowName}" (${workflowId}) did fail with error: "${
|
|
error.message as string
|
|
}" | retry in ${Math.floor(lastTimeout / 1000)} seconds`,
|
|
{
|
|
workflowId,
|
|
workflowName,
|
|
},
|
|
);
|
|
|
|
this.queuedActivations[workflowId].lastTimeout = lastTimeout;
|
|
this.queuedActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
|
|
return;
|
|
}
|
|
this.logger.info(
|
|
` -> Activation of workflow "${workflowName}" (${workflowId}) was successful!`,
|
|
{
|
|
workflowId,
|
|
workflowName,
|
|
},
|
|
);
|
|
};
|
|
|
|
// Just to be sure that there is not chance that for any reason
|
|
// multiple run in parallel
|
|
this.removeQueuedWorkflowActivation(workflowId);
|
|
|
|
this.queuedActivations[workflowId] = {
|
|
activationMode,
|
|
lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
|
|
timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT),
|
|
workflowData,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Remove a workflow from the activation queue
|
|
*/
|
|
removeQueuedWorkflowActivation(workflowId: string) {
|
|
if (this.queuedActivations[workflowId]) {
|
|
clearTimeout(this.queuedActivations[workflowId].timeout);
|
|
delete this.queuedActivations[workflowId];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Remove all workflows from the activation queue
|
|
*/
|
|
removeAllQueuedWorkflowActivations() {
|
|
for (const workflowId in this.queuedActivations) {
|
|
this.removeQueuedWorkflowActivation(workflowId);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Makes a workflow inactive
|
|
*
|
|
* @param {string} workflowId The id of the workflow to deactivate
|
|
*/
|
|
// TODO: this should happen in a transaction
|
|
async remove(workflowId: string) {
|
|
// Remove all the webhooks of the workflow
|
|
try {
|
|
await this.clearWebhooks(workflowId);
|
|
} catch (error) {
|
|
ErrorReporter.error(error);
|
|
this.logger.error(
|
|
`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`,
|
|
);
|
|
}
|
|
|
|
await this.activationErrorsService.deregister(workflowId);
|
|
|
|
if (this.queuedActivations[workflowId] !== undefined) {
|
|
this.removeQueuedWorkflowActivation(workflowId);
|
|
}
|
|
|
|
// if it's active in memory then it's a trigger
|
|
// so remove from list of actives workflows
|
|
if (this.activeWorkflows.isActive(workflowId)) {
|
|
const removalSuccess = await this.activeWorkflows.remove(workflowId);
|
|
if (removalSuccess) {
|
|
this.logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId });
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Register as active in memory a trigger- or poller-based workflow.
|
|
*/
|
|
async addTriggersAndPollers(
|
|
dbWorkflow: WorkflowEntity,
|
|
workflow: Workflow,
|
|
{
|
|
activationMode,
|
|
executionMode,
|
|
additionalData,
|
|
}: {
|
|
activationMode: WorkflowActivateMode;
|
|
executionMode: WorkflowExecuteMode;
|
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow;
|
|
},
|
|
) {
|
|
const getTriggerFunctions = this.getExecuteTriggerFunctions(
|
|
dbWorkflow,
|
|
additionalData,
|
|
executionMode,
|
|
activationMode,
|
|
);
|
|
|
|
const getPollFunctions = this.getExecutePollFunctions(
|
|
dbWorkflow,
|
|
additionalData,
|
|
executionMode,
|
|
activationMode,
|
|
);
|
|
|
|
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
|
|
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
|
|
|
|
await this.activeWorkflows.add(
|
|
workflow.id,
|
|
workflow,
|
|
additionalData,
|
|
executionMode,
|
|
activationMode,
|
|
getTriggerFunctions,
|
|
getPollFunctions,
|
|
);
|
|
|
|
this.logger.verbose(`Workflow ${dbWorkflow.display()} activated`, {
|
|
workflowId: dbWorkflow.id,
|
|
workflowName: dbWorkflow.name,
|
|
});
|
|
}
|
|
}
|
|
|
|
async removeActivationError(workflowId: string) {
|
|
await this.activationErrorsService.deregister(workflowId);
|
|
}
|
|
}
|