From 23bd71b82aafb4d894091a115a168a049c5f594c Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Thu, 29 Sep 2022 11:50:18 +0200 Subject: [PATCH] feat(RabbitMQ Trigger Node): Automatically reconnect on disconnect (#4019) * feat(RabbitMQ Trigger Node): Automatically reconnect on disconnect * :zap: Retry indefinetly * :zap: Also automatically retry activation issues on startup --- packages/cli/commands/start.ts | 3 + packages/cli/src/ActiveWorkflowRunner.ts | 127 ++++++++++++++++-- packages/cli/src/Interfaces.ts | 8 ++ packages/cli/src/constants.ts | 3 + packages/core/src/ActiveWorkflows.ts | 6 +- .../nodes/RabbitMQ/RabbitMQTrigger.node.ts | 10 +- 6 files changed, 145 insertions(+), 12 deletions(-) diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index 611e401009..4477bb446d 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -92,6 +92,9 @@ export class Start extends Command { getLogger().info('\nStopping n8n...'); try { + // Stop with trying to activate workflows that could not be activated + activeWorkflowRunner?.removeAllQueuedWorkflowActivations(); + const externalHooks = ExternalHooks(); await externalHooks.run('n8n.stop', []); diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 89209768dd..c12b803533 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -40,6 +40,7 @@ import express from 'express'; import { Db, IActivationError, + IQueuedWorkflowActivations, IResponseCallbackData, IWebhookDb, IWorkflowDb, @@ -58,6 +59,7 @@ import { whereClause } from './WorkflowHelpers'; import { WorkflowEntity } from './databases/entities/WorkflowEntity'; import * as ActiveExecutions from './ActiveExecutions'; import { createErrorExecution } from './GenericHelpers'; +import { WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, WORKFLOW_REACTIVATE_MAX_TIMEOUT } from './constants'; const activeExecutions = ActiveExecutions.getInstance(); @@ -70,6 +72,10 @@ export class ActiveWorkflowRunner { [key: string]: IActivationError; } = {}; + private queuedWorkflowActivations: { + [key: string]: IQueuedWorkflowActivations; + } = {}; + // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types async init() { // Get the active workflows from database @@ -102,7 +108,7 @@ export class ActiveWorkflowRunner { console.info(' ================================'); for (const workflowData of workflowsData) { - console.log(` - ${workflowData.name}`); + console.log(` - ${workflowData.name} (ID: ${workflowData.id})`); Logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, { workflowName: workflowData.name, workflowId: workflowData.id, @@ -115,15 +121,23 @@ export class ActiveWorkflowRunner { }); console.log(` => Started`); } catch (error) { - console.log(` => ERROR: Workflow could not be activated`); + console.log( + ` => ERROR: Workflow could not be activated on first try, keep on trying`, + ); // eslint-disable-next-line @typescript-eslint/restrict-template-expressions console.log(` ${error.message}`); - Logger.error(`Unable to initialize workflow "${workflowData.name}" (startup)`, { - workflowName: workflowData.name, - workflowId: workflowData.id, - }); + Logger.error( + `Issue on intital workflow activation try "${workflowData.name}" (startup)`, + { + workflowName: workflowData.name, + workflowId: workflowData.id, + }, + ); // eslint-disable-next-line @typescript-eslint/no-unsafe-argument this.executeErrorWorkflow(error, workflowData, 'internal'); + + // Keep on trying to activate the workflow + this.addQueuedWorkflowActivation('init', workflowData); } } Logger.verbose('Finished initializing active workflows (startup)'); @@ -722,6 +736,17 @@ export class ActiveWorkflowRunner { } }; returnFunctions.emitError = async (error: Error): Promise => { + Logger.info( + `The trigger node "${node.name}" of workflow "${workflowData.name}" failed with the error: "${error.message}". Will try to reactivate.`, + { + nodeName: node.name, + workflowId: workflowData.id.toString(), + workflowName: workflowData.name, + }, + ); + + // Remove the workflow as "active" + await this.activeWorkflows?.remove(workflowData.id.toString()); this.activationErrors[workflowData.id.toString()] = { time: new Date().getTime(), @@ -729,13 +754,16 @@ export class ActiveWorkflowRunner { message: error.message, }, }; + + // Run Error Workflow if defined const activationError = new WorkflowActivationError( - 'There was a problem with the trigger, for that reason did the workflow had to be deactivated', + `There was a problem with the trigger node "${node.name}", for that reason did the workflow had to be deactivated`, error, node, ); - this.executeErrorWorkflow(activationError, workflowData, mode); + + this.addQueuedWorkflowActivation(activation, workflowData); }; return returnFunctions; }; @@ -851,6 +879,9 @@ export class ActiveWorkflowRunner { }); } + // Workflow got now successfully activated so make sure nothing is left in the queue + this.removeQueuedWorkflowActivation(workflowId); + if (this.activationErrors[workflowId] !== undefined) { // If there were activation errors delete them delete this.activationErrors[workflowId]; @@ -874,6 +905,82 @@ export class ActiveWorkflowRunner { await WorkflowHelpers.saveStaticData(workflowInstance!); } + /** + * Add a workflow to the activation queue. + * Meaning it will keep on trying to activate it in regular + * amounts indefinetly. + */ + addQueuedWorkflowActivation( + activationMode: WorkflowActivateMode, + workflowData: IWorkflowDb, + ): void { + const workflowId = workflowData.id.toString(); + const workflowName = workflowData.name; + + const retryFunction = async () => { + Logger.info(`Try to activate workflow "${workflowName}" (${workflowId})`, { + workflowId, + workflowName, + }); + try { + await this.add(workflowId, activationMode, workflowData); + } catch (error) { + let lastTimeout = this.queuedWorkflowActivations[workflowId].lastTimeout; + if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) { + lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT); + } + + Logger.info( + ` -> Activation of workflow "${workflowName}" (${workflowId}) did fail with error: "${ + error.message as string + }" | retry in ${Math.floor(lastTimeout / 1000)} seconds`, + { + workflowId, + workflowName, + }, + ); + + this.queuedWorkflowActivations[workflowId].lastTimeout = lastTimeout; + this.queuedWorkflowActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout); + return; + } + Logger.info(` -> Activation of workflow "${workflowName}" (${workflowId}) was successful!`, { + workflowId, + workflowName, + }); + }; + + // Just to be sure that there is not chance that for any reason + // multiple run in parallel + this.removeQueuedWorkflowActivation(workflowId); + + this.queuedWorkflowActivations[workflowId] = { + activationMode, + lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, + timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT), + workflowData, + }; + } + + /** + * Remove a workflow from the activation queue + */ + removeQueuedWorkflowActivation(workflowId: string): void { + if (this.queuedWorkflowActivations[workflowId]) { + clearTimeout(this.queuedWorkflowActivations[workflowId].timeout); + delete this.queuedWorkflowActivations[workflowId]; + } + } + + /** + * Remove all workflows from the activation queue + */ + removeAllQueuedWorkflowActivations(): void { + for (const workflowId in this.queuedWorkflowActivations) { + this.removeQueuedWorkflowActivation(workflowId); + } + } + /** * Makes a workflow inactive * @@ -898,6 +1005,10 @@ export class ActiveWorkflowRunner { delete this.activationErrors[workflowId]; } + if (this.queuedWorkflowActivations[workflowId] !== undefined) { + this.removeQueuedWorkflowActivation(workflowId); + } + // if it's active in memory then it's a trigger // so remove from list of actives workflows if (this.activeWorkflows.isActive(workflowId)) { diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 0b92a8d1cb..3420054755 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -17,6 +17,7 @@ import { ITelemetryTrackProperties, IWorkflowBase as IWorkflowBaseWorkflow, Workflow, + WorkflowActivateMode, WorkflowExecuteMode, } from 'n8n-workflow'; @@ -47,6 +48,13 @@ export interface IActivationError { }; } +export interface IQueuedWorkflowActivations { + activationMode: WorkflowActivateMode; + lastTimeout: number; + timeout: NodeJS.Timeout; + workflowData: IWorkflowDb; +} + export interface ICustomRequest extends Request { parsedUrl: Url | undefined; } diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index bd4047b0ce..3f705bb8c3 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -34,3 +34,6 @@ export const NPM_COMMAND_TOKENS = { export const NPM_PACKAGE_STATUS_GOOD = 'OK'; export const UNKNOWN_FAILURE_REASON = 'Unknown failure reason'; + +export const WORKFLOW_REACTIVATE_INITIAL_TIMEOUT = 1000; +export const WORKFLOW_REACTIVATE_MAX_TIMEOUT = 180000; diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index 1f3c015196..f0969dfb7a 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -102,7 +102,8 @@ export class ActiveWorkflows { } catch (error) { // eslint-disable-next-line @typescript-eslint/no-unsafe-call throw new WorkflowActivationError( - 'There was a problem activating the workflow', + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access + `There was a problem activating the workflow: "${error.message}"`, error, triggerNode, ); @@ -128,7 +129,8 @@ export class ActiveWorkflows { } catch (error) { // eslint-disable-next-line @typescript-eslint/no-unsafe-call throw new WorkflowActivationError( - 'There was a problem activating the workflow', + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access + `There was a problem activating the workflow: "${error.message}"`, error, pollNode, ); diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index aa6133c222..44e52f3ab7 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -17,8 +17,6 @@ import { rabbitDefaultOptions } from './DefaultOptions'; import { MessageTracker, rabbitmqConnectQueue } from './GenericFunctions'; -import * as amqplib from 'amqplib'; - export class RabbitMQTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'RabbitMQ Trigger', @@ -181,12 +179,19 @@ export class RabbitMQTrigger implements INodeType { const messageTracker = new MessageTracker(); let consumerTag: string; + let closeGotCalled = false; const startConsumer = async () => { if (parallelMessages !== -1) { channel.prefetch(parallelMessages); } + channel.on('close', () => { + if (!closeGotCalled) { + self.emitError(new Error('Connection got closed unexpectedly')); + } + }); + const consumerInfo = await channel.consume(queue, async (message) => { if (message !== null) { try { @@ -270,6 +275,7 @@ export class RabbitMQTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. async function closeFunction() { + closeGotCalled = true; try { return messageTracker.closeChannel(channel, consumerTag); } catch (error) {