From dc5ec8f9460f62e26f10ece117070090c686003a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 5 Feb 2024 09:26:55 +0100 Subject: [PATCH] refactor(core): Streamline flows in multi-main mode (no-changelog) (#8446) --- packages/cli/src/ActiveWorkflowRunner.ts | 97 ++++++++------ packages/cli/src/commands/start.ts | 40 +----- .../cli/src/services/orchestration.service.ts | 26 ++++ .../orchestration/main/MultiMainSetup.ee.ts | 10 +- .../main/handleCommandMessageMain.ts | 124 +++++++++++++----- .../services/redis/RedisServiceCommands.ts | 7 +- .../cli/src/workflows/workflow.service.ts | 13 -- .../integration/ActiveWorkflowRunner.test.ts | 124 +----------------- .../test/integration/shared/utils/index.ts | 7 +- .../workflows/workflow.service.test.ts | 31 ----- packages/workflow/src/ErrorReporterProxy.ts | 5 +- packages/workflow/src/Interfaces.ts | 4 +- 12 files changed, 205 insertions(+), 283 deletions(-) diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index ffeb8876d3..d08e68f698 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -223,7 +223,8 @@ export class ActiveWorkflowRunner { } /** - * Clear workflow-defined webhooks from the `webhook_entity` table. + * Remove all webhooks of a workflow from the database, and + * deregister those webhooks from external services. */ async clearWebhooks(workflowId: string) { const workflowData = await this.workflowRepository.findOne({ @@ -418,9 +419,10 @@ export class ActiveWorkflowRunner { } /** - * Register as active in memory all workflows stored as `active`. + * Register as active in memory all workflows stored as `active`, + * only on instance init or (in multi-main setup) on leadership change. */ - async addActiveWorkflows(activationMode: WorkflowActivateMode) { + async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') { const dbWorkflows = await this.workflowRepository.getAllActive(); if (dbWorkflows.length === 0) return; @@ -433,7 +435,9 @@ export class ActiveWorkflowRunner { for (const dbWorkflow of dbWorkflows) { try { - const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow); + const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, { + shouldPublish: false, + }); if (wasActivated) { this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, { @@ -471,15 +475,21 @@ export class ActiveWorkflowRunner { } async clearAllActivationErrors() { + this.logger.debug('Clearing all activation errors'); + await this.activationErrorsService.clearAll(); } async addAllTriggerAndPollerBasedWorkflows() { + this.logger.debug('Adding all trigger- and poller-based workflows'); + await this.addActiveWorkflows('leadershipChange'); } @OnShutdown() async removeAllTriggerAndPollerBasedWorkflows() { + this.logger.debug('Removing all trigger- and poller-based workflows'); + await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); } @@ -506,35 +516,24 @@ export class ActiveWorkflowRunner { workflowId: string, activationMode: WorkflowActivateMode, existingWorkflow?: WorkflowEntity, + { shouldPublish } = { shouldPublish: true }, ) { - let workflow: Workflow; + if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) { + await this.orchestrationService.publish('add-webhooks-triggers-and-pollers', { + workflowId, + }); - let shouldAddWebhooks = true; - let shouldAddTriggersAndPollers = true; - - /** - * In a multi-main scenario, webhooks are stored in the database, while triggers - * and pollers are run only by the leader main instance. - * - * - During a regular workflow activation (i.e. not leadership change), only the - * leader should add webhooks to prevent duplicate insertions, and only the leader - * should handle triggers and pollers to prevent duplicate work. - * - * - During a leadership change, webhooks remain in storage and so need not be added - * again, and the new leader should take over the triggers and pollers that stopped - * running when the former leader became unresponsive. - */ - if (this.orchestrationService.isMultiMainSetupEnabled) { - if (activationMode !== 'leadershipChange') { - shouldAddWebhooks = this.orchestrationService.isLeader; - shouldAddTriggersAndPollers = this.orchestrationService.isLeader; - } else { - shouldAddWebhooks = false; - shouldAddTriggersAndPollers = this.orchestrationService.isLeader; - } + return; } - const shouldActivate = shouldAddWebhooks || shouldAddTriggersAndPollers; + let workflow: Workflow; + + const shouldAddWebhooks = this.orchestrationService.shouldAddWebhooks(activationMode); + const shouldAddTriggersAndPollers = this.orchestrationService.shouldAddTriggersAndPollers(); + + const shouldDisplayActivationMessage = + (shouldAddWebhooks || shouldAddTriggersAndPollers) && + ['init', 'leadershipChange'].includes(activationMode); try { const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId)); @@ -543,7 +542,7 @@ export class ActiveWorkflowRunner { throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`); } - if (shouldActivate) { + if (shouldDisplayActivationMessage) { this.logger.info(` - ${dbWorkflow.display()}`); this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, { workflowName: dbWorkflow.name, @@ -608,7 +607,7 @@ export class ActiveWorkflowRunner { // id of them in the static data. So make sure that data gets persisted. await this.workflowStaticDataService.saveStaticData(workflow); - return shouldActivate; + return shouldDisplayActivationMessage; } /** @@ -709,7 +708,21 @@ export class ActiveWorkflowRunner { */ // TODO: this should happen in a transaction async remove(workflowId: string) { - // Remove all the webhooks of the workflow + if (this.orchestrationService.isMultiMainSetupEnabled) { + try { + await this.clearWebhooks(workflowId); + } catch (error) { + ErrorReporter.error(error); + this.logger.error( + `Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`, + ); + } + + await this.orchestrationService.publish('remove-triggers-and-pollers', { workflowId }); + + return; + } + try { await this.clearWebhooks(workflowId); } catch (error) { @@ -727,11 +740,21 @@ export class ActiveWorkflowRunner { // if it's active in memory then it's a trigger // so remove from list of actives workflows - if (this.activeWorkflows.isActive(workflowId)) { - const removalSuccess = await this.activeWorkflows.remove(workflowId); - if (removalSuccess) { - this.logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId }); - } + await this.removeWorkflowTriggersAndPollers(workflowId); + } + + /** + * Stop running active triggers and pollers for a workflow. + */ + async removeWorkflowTriggersAndPollers(workflowId: string) { + if (!this.activeWorkflows.isActive(workflowId)) return; + + const wasRemoved = await this.activeWorkflows.remove(workflowId); + + if (wasRemoved) { + this.logger.warn(`Removed triggers and pollers for workflow "${workflowId}"`, { + workflowId, + }); } } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 82664ed90e..bbba0a2f66 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -226,31 +226,11 @@ export class Start extends BaseCommand { if (!orchestrationService.isMultiMainSetupEnabled) return; orchestrationService.multiMainSetup - .addListener('leadershipChange', async () => { - if (orchestrationService.isLeader) { - this.logger.debug('[Leadership change] Clearing all activation errors...'); - - await this.activeWorkflowRunner.clearAllActivationErrors(); - - this.logger.debug( - '[Leadership change] Adding all trigger- and poller-based workflows...', - ); - - await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); - } else { - this.logger.debug( - '[Leadership change] Removing all trigger- and poller-based workflows...', - ); - - await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); - } - }) - .addListener('leadershipVacant', async () => { - this.logger.debug( - '[Leadership vacant] Removing all trigger- and poller-based workflows...', - ); - + .on('leader-stepdown', async () => { await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); + }) + .on('leader-takeover', async () => { + await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); }); } @@ -370,16 +350,8 @@ export class Start extends BaseCommand { if (!orchestrationService.isMultiMainSetupEnabled) return; orchestrationService.multiMainSetup - .addListener('leadershipChange', async () => { - if (orchestrationService.isLeader) { - this.pruningService.startPruning(); - } else { - this.pruningService.stopPruning(); - } - }) - .addListener('leadershipVacant', () => { - this.pruningService.stopPruning(); - }); + .on('leader-stepdown', () => this.pruningService.stopPruning()) + .on('leader-takeover', () => this.pruningService.startPruning()); } async catch(error: Error) { diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index af56955805..45100bbe2b 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -6,6 +6,7 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis import { RedisService } from './redis.service'; import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee'; +import type { WorkflowActivateMode } from 'n8n-workflow'; @Service() export class OrchestrationService { @@ -118,4 +119,29 @@ export class OrchestrationService { await this.redisPublisher.publishToCommandChannel({ command }); } + + // ---------------------------------- + // activations + // ---------------------------------- + + /** + * Whether this instance may add webhooks to the `webhook_entity` table. + */ + shouldAddWebhooks(activationMode: WorkflowActivateMode) { + if (activationMode === 'init') return false; + + if (activationMode === 'leadershipChange') return false; + + return this.isLeader; // 'update' or 'activate' + } + + /** + * Whether this instance may add triggers and pollers to memory. + * + * In both single- and multi-main setup, only the leader is allowed to manage + * triggers and pollers in memory, to ensure they are not duplicated. + */ + shouldAddTriggersAndPollers() { + return this.isLeader; + } } diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 070834ac7b..8d9cd5da23 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -62,11 +62,9 @@ export class MultiMainSetup extends EventEmitter { if (config.getEnv('multiMainSetup.instanceType') === 'leader') { config.set('multiMainSetup.instanceType', 'follower'); - this.emit('leadershipChange'); // stop triggers, pollers, pruning + this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning - EventReporter.report('[Multi-main setup] Leader failed to renew leader key', { - level: 'info', - }); + EventReporter.info('[Multi-main setup] Leader failed to renew leader key'); } return; @@ -79,7 +77,7 @@ export class MultiMainSetup extends EventEmitter { config.set('multiMainSetup.instanceType', 'follower'); - this.emit('leadershipVacant'); // stop triggers, pollers, pruning + this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning await this.tryBecomeLeader(); } @@ -99,7 +97,7 @@ export class MultiMainSetup extends EventEmitter { await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); - this.emit('leadershipChange'); // start triggers, pollers, pruning + this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning } else { config.set('multiMainSetup.instanceType', 'follower'); } diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index ddf2c0e7fe..02cc5aae0a 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -7,24 +7,30 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Push } from '@/push'; +import { TestWebhooks } from '@/TestWebhooks'; import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; -import { TestWebhooks } from '@/TestWebhooks'; export async function handleCommandMessageMain(messageString: string) { const queueModeId = config.getEnv('redis.queueModeId'); const isMainInstance = config.getEnv('generic.instanceType') === 'main'; const message = messageToRedisServiceCommandObject(messageString); const logger = Container.get(Logger); - const activeWorkflowRunner = Container.get(ActiveWorkflowRunner); if (message) { logger.debug( `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, ); + + const selfSendingAllowed = [ + 'add-webhooks-triggers-and-pollers', + 'remove-triggers-and-pollers', + ].includes(message.command); + if ( - message.senderId === queueModeId || - (message.targets && !message.targets.includes(queueModeId)) + !selfSendingAllowed && + (message.senderId === queueModeId || + (message.targets && !message.targets.includes(queueModeId))) ) { // Skipping command message because it's not for this instance logger.debug( @@ -71,52 +77,106 @@ export async function handleCommandMessageMain(messageString: string) { await Container.get(ExternalSecretsManager).reloadAllProviders(); break; - case 'workflowActiveStateChanged': { + case 'add-webhooks-triggers-and-pollers': { if (!debounceMessageReceiver(message, 100)) { message.payload = { result: 'debounced' }; return message; } - const { workflowId, oldState, newState, versionId } = message.payload ?? {}; + const orchestrationService = Container.get(OrchestrationService); - if ( - typeof workflowId !== 'string' || - typeof oldState !== 'boolean' || - typeof newState !== 'boolean' || - typeof versionId !== 'string' - ) { - break; - } + if (orchestrationService.isFollower) break; - if (!oldState && newState) { - try { - await activeWorkflowRunner.add(workflowId, 'activate'); - push.broadcast('workflowActivated', { workflowId }); - } catch (e) { - const error = e instanceof Error ? e : new Error(`${e}`); + if (typeof message.payload?.workflowId !== 'string') break; - await Container.get(WorkflowRepository).update(workflowId, { - active: false, - versionId, + const { workflowId } = message.payload; + + try { + await Container.get(ActiveWorkflowRunner).add(workflowId, 'activate', undefined, { + shouldPublish: false, // prevent leader re-publishing message + }); + + push.broadcast('workflowActivated', { workflowId }); + + // instruct followers to show activation in UI + await orchestrationService.publish('display-workflow-activation', { workflowId }); + } catch (error) { + if (error instanceof Error) { + await Container.get(WorkflowRepository).update(workflowId, { active: false }); + + Container.get(Push).broadcast('workflowFailedToActivate', { + workflowId, + errorMessage: error.message, }); - await Container.get(OrchestrationService).publish('workflowFailedToActivate', { + await Container.get(OrchestrationService).publish('workflow-failed-to-activate', { workflowId, errorMessage: error.message, }); } - } else if (oldState && !newState) { - await activeWorkflowRunner.remove(workflowId); - push.broadcast('workflowDeactivated', { workflowId }); - } else { - await activeWorkflowRunner.remove(workflowId); - await activeWorkflowRunner.add(workflowId, 'update'); } - await activeWorkflowRunner.removeActivationError(workflowId); + break; } - case 'workflowFailedToActivate': { + case 'remove-triggers-and-pollers': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const orchestrationService = Container.get(OrchestrationService); + + if (orchestrationService.isFollower) break; + + if (typeof message.payload?.workflowId !== 'string') break; + + const { workflowId } = message.payload; + + const activeWorkflowRunner = Container.get(ActiveWorkflowRunner); + + await activeWorkflowRunner.removeActivationError(workflowId); + await activeWorkflowRunner.removeWorkflowTriggersAndPollers(workflowId); + + push.broadcast('workflowDeactivated', { workflowId }); + + // instruct followers to show workflow deactivation in UI + await orchestrationService.publish('display-workflow-deactivation', { workflowId }); + + break; + } + + case 'display-workflow-activation': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { workflowId } = message.payload ?? {}; + + if (typeof workflowId !== 'string') break; + + push.broadcast('workflowActivated', { workflowId }); + + break; + } + + case 'display-workflow-deactivation': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { workflowId } = message.payload ?? {}; + + if (typeof workflowId !== 'string') break; + + push.broadcast('workflowDeactivated', { workflowId }); + + break; + } + + case 'workflow-failed-to-activate': { if (!debounceMessageReceiver(message, 100)) { message.payload = { result: 'debounced' }; return message; diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index e1c20d71a6..b7c15ac0ef 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -7,8 +7,11 @@ export type RedisServiceCommand = | 'stopWorker' | 'reloadLicense' | 'reloadExternalSecretsProviders' - | 'workflowActiveStateChanged' // multi-main only - | 'workflowFailedToActivate' // multi-main only + | 'display-workflow-activation' // multi-main only + | 'display-workflow-deactivation' // multi-main only + | 'add-webhooks-triggers-and-pollers' // multi-main only + | 'remove-triggers-and-pollers' // multi-main only + | 'workflow-failed-to-activate' // multi-main only | 'relay-execution-lifecycle-event' // multi-main only | 'clear-test-webhooks'; // multi-main only diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index 56c19900ee..5cf6d6974e 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -80,8 +80,6 @@ export class WorkflowService { ); } - const oldState = shared.workflow.active; - if ( !forceSave && workflow.versionId !== '' && @@ -227,17 +225,6 @@ export class WorkflowService { await this.orchestrationService.init(); - const newState = updatedWorkflow.active; - - if (this.orchestrationService.isMultiMainSetupEnabled && oldState !== newState) { - await this.orchestrationService.publish('workflowActiveStateChanged', { - workflowId, - oldState, - newState, - versionId: shared.workflow.versionId, - }); - } - return updatedWorkflow; } diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts index 4088e00d11..571d2dedbd 100644 --- a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts @@ -1,6 +1,6 @@ import { Container } from 'typedi'; import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; -import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; +import type { IWebhookData } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; @@ -14,13 +14,11 @@ import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; import type { User } from '@db/entities/User'; import type { WebhookEntity } from '@db/entities/WebhookEntity'; import { NodeTypes } from '@/NodeTypes'; -import { OrchestrationService } from '@/services/orchestration.service'; import { ExecutionService } from '@/executions/execution.service'; import { WorkflowService } from '@/workflows/workflow.service'; import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; import { mockInstance } from '../shared/mocking'; -import { chooseRandomly } from './shared/random'; import { setSchedulerAsLoadedNode } from './shared/utils'; import * as testDb from './shared/testDb'; import { createOwner } from './shared/db/users'; @@ -33,11 +31,6 @@ mockInstance(ExecutionService); mockInstance(WorkflowService); const webhookService = mockInstance(WebhookService); -const orchestrationService = mockInstance(OrchestrationService, { - isMultiMainSetupEnabled: false, - isLeader: false, - isFollower: false, -}); setSchedulerAsLoadedNode(); @@ -47,14 +40,6 @@ let activeWorkflowsService: ActiveWorkflowsService; let activeWorkflowRunner: ActiveWorkflowRunner; let owner: User; -const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [ - 'init', - 'create', - 'update', - 'activate', - 'manual', -]; - beforeAll(async () => { await testDb.init(); @@ -215,113 +200,6 @@ describe('executeErrorWorkflow()', () => { }); }); -describe('add()', () => { - describe('in single-main scenario', () => { - test('should add webhooks, triggers and pollers', async () => { - const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - - const workflow = await createWorkflow({ active: true }, owner); - - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); - - await activeWorkflowRunner.init(); - - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); - - await activeWorkflowRunner.add(workflow.id, mode); - - expect(addWebhooksSpy).toHaveBeenCalledTimes(1); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); - }); - }); - - describe('in multi-main scenario', () => { - describe('leader', () => { - describe('on non-leadership-change activation mode', () => { - test('should add webhooks only', async () => { - const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - - const workflow = await createWorkflow({ active: true }, owner); - - jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); - jest.replaceProperty(orchestrationService, 'isLeader', true); - - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn( - activeWorkflowRunner, - 'addTriggersAndPollers', - ); - - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); - - await activeWorkflowRunner.add(workflow.id, mode); - - expect(addWebhooksSpy).toHaveBeenCalledTimes(1); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); - }); - }); - - describe('on leadership change activation mode', () => { - test('should add triggers and pollers only', async () => { - const mode = 'leadershipChange'; - - jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); - jest.replaceProperty(orchestrationService, 'isLeader', true); - - const workflow = await createWorkflow({ active: true }, owner); - - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn( - activeWorkflowRunner, - 'addTriggersAndPollers', - ); - - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); - - await activeWorkflowRunner.add(workflow.id, mode); - - expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); - }); - }); - }); - - describe('follower', () => { - describe('on any activation mode', () => { - test('should not add webhooks, triggers or pollers', async () => { - const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - - jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); - jest.replaceProperty(orchestrationService, 'isLeader', false); - - const workflow = await createWorkflow({ active: true }, owner); - - const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); - const addTriggersAndPollersSpy = jest.spyOn( - activeWorkflowRunner, - 'addTriggersAndPollers', - ); - - await activeWorkflowRunner.init(); - addWebhooksSpy.mockReset(); - addTriggersAndPollersSpy.mockReset(); - - await activeWorkflowRunner.add(workflow.id, mode); - - expect(addWebhooksSpy).not.toHaveBeenCalled(); - expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); - }); - }); - }); - }); -}); - describe('addWebhooks()', () => { test('should call `WebhookService.storeWebhook()`', async () => { const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData; diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 584f6e0b2f..e2c2225b8b 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -31,9 +31,12 @@ export { setupTestServer } from './testServer'; * Initialize node types. */ export async function initActiveWorkflowRunner() { - mockInstance(Push); - mockInstance(OrchestrationService); + mockInstance(OrchestrationService, { + isMultiMainSetupEnabled: false, + shouldAddWebhooks: jest.fn().mockReturnValue(true), + }); + mockInstance(Push); mockInstance(ExecutionService); const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner'); const workflowRunner = Container.get(ActiveWorkflowRunner); diff --git a/packages/cli/test/integration/workflows/workflow.service.test.ts b/packages/cli/test/integration/workflows/workflow.service.test.ts index 996b3d0d86..fe9780691a 100644 --- a/packages/cli/test/integration/workflows/workflow.service.test.ts +++ b/packages/cli/test/integration/workflows/workflow.service.test.ts @@ -83,35 +83,4 @@ describe('update()', () => { expect(addSpy).not.toHaveBeenCalled(); }); - - test('should broadcast active workflow state change if state changed', async () => { - const owner = await createOwner(); - const workflow = await createWorkflow({ active: true }, owner); - - const publishSpy = jest.spyOn(orchestrationService, 'publish'); - - workflow.active = false; - await workflowService.update(owner, workflow, workflow.id); - - expect(publishSpy).toHaveBeenCalledTimes(1); - expect(publishSpy).toHaveBeenCalledWith( - 'workflowActiveStateChanged', - expect.objectContaining({ - newState: false, - oldState: true, - workflowId: workflow.id, - }), - ); - }); - - test('should not broadcast active workflow state change if state did not change', async () => { - const owner = await createOwner(); - const workflow = await createWorkflow({ active: true }, owner); - - const publishSpy = jest.spyOn(orchestrationService, 'publish'); - - await workflowService.update(owner, workflow, workflow.id); - - expect(publishSpy).not.toHaveBeenCalled(); - }); }); diff --git a/packages/workflow/src/ErrorReporterProxy.ts b/packages/workflow/src/ErrorReporterProxy.ts index fc5a08828d..3b50d31719 100644 --- a/packages/workflow/src/ErrorReporterProxy.ts +++ b/packages/workflow/src/ErrorReporterProxy.ts @@ -33,7 +33,10 @@ export const error = (e: unknown, options?: ReportingOptions) => { if (toReport) instance.report(toReport, options); }; -export const report = error; +export const info = (msg: string, options?: ReportingOptions) => { + Logger.info(msg); + instance.report(msg, options); +}; export const warn = (warning: Error | string, options?: ReportingOptions) => error(warning, { level: 'warning', ...options }); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 7dcc8da243..b1d6eda58c 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1986,10 +1986,10 @@ export type WorkflowExecuteMode = export type WorkflowActivateMode = | 'init' - | 'create' + | 'create' // unused | 'update' | 'activate' - | 'manual' + | 'manual' // unused | 'leadershipChange'; export interface IWorkflowHooksOptionalParameters {