From 17acf70591422bfea84b13f24c35d628bff4d35e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 11 Feb 2025 15:12:55 +0100 Subject: [PATCH] perf(core): Batch workflow activation to speed up startup (#13191) --- .../config/src/configs/workflows.config.ts | 4 + packages/@n8n/config/src/index.ts | 1 + packages/@n8n/config/test/config.test.ts | 1 + .../__tests__/active-workflow-manager.test.ts | 1 + packages/cli/src/active-workflow-manager.ts | 87 +++++++++++-------- 5 files changed, 58 insertions(+), 36 deletions(-) diff --git a/packages/@n8n/config/src/configs/workflows.config.ts b/packages/@n8n/config/src/configs/workflows.config.ts index c5b88775c8..bebce7ec77 100644 --- a/packages/@n8n/config/src/configs/workflows.config.ts +++ b/packages/@n8n/config/src/configs/workflows.config.ts @@ -10,4 +10,8 @@ export class WorkflowsConfig { @Env('N8N_WORKFLOW_CALLER_POLICY_DEFAULT_OPTION') callerPolicyDefaultOption: 'any' | 'none' | 'workflowsFromAList' | 'workflowsFromSameOwner' = 'workflowsFromSameOwner'; + + /** How many workflows to activate simultaneously during startup. */ + @Env('N8N_WORKFLOW_ACTIVATION_BATCH_SIZE') + activationBatchSize: number = 1; } diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index edcc794ca5..db3861d855 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -35,6 +35,7 @@ export { FrontendBetaFeatures, FrontendConfig } from './configs/frontend.config' export { S3Config } from './configs/external-storage.config'; export { LOG_SCOPES } from './configs/logging.config'; export type { LogScope } from './configs/logging.config'; +export { WorkflowsConfig } from './configs/workflows.config'; @Config export class GlobalConfig { diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 32952ec60b..834963bcfd 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -156,6 +156,7 @@ describe('GlobalConfig', () => { workflows: { defaultName: 'My workflow', callerPolicyDefaultOption: 'workflowsFromSameOwner', + activationBatchSize: 1, }, endpoints: { metrics: { diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index a167f1e5a5..dd2d3d8cb5 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -36,6 +36,7 @@ describe('ActiveWorkflowManager', () => { mock(), instanceSettings, mock(), + mock(), ); }); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index b783724bef..2f47212561 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -1,5 +1,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ +import { WorkflowsConfig } from '@n8n/config'; import { Service } from '@n8n/di'; +import { chunk } from 'lodash'; import { ActiveWorkflows, ErrorReporter, @@ -82,6 +84,7 @@ export class ActiveWorkflowManager { private readonly workflowExecutionService: WorkflowExecutionService, private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, + private readonly workflowsConfig: WorkflowsConfig, ) {} async init() { @@ -419,48 +422,61 @@ export class ActiveWorkflowManager { this.logger.info(' ================================'); } - for (const dbWorkflow of dbWorkflows) { - try { - const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, { - shouldPublish: false, - }); + const batches = chunk(dbWorkflows, this.workflowsConfig.activationBatchSize); - if (wasActivated) { - this.logger.debug(`Successfully started workflow ${dbWorkflow.display()}`, { - workflowName: dbWorkflow.name, - workflowId: dbWorkflow.id, - }); - this.logger.info(' => Started'); - } - } catch (error) { - this.errorReporter.error(error); - this.logger.info( - ' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue', - ); + for (const batch of batches) { + const activationPromises = batch.map(async (dbWorkflow) => { + await this.activateWorkflow(dbWorkflow, activationMode); + }); - this.logger.info(` ${error.message}`); - this.logger.error( - `Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`, - { - workflowName: dbWorkflow.name, - workflowId: dbWorkflow.id, - }, - ); - - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - this.executeErrorWorkflow(error, dbWorkflow, 'internal'); - - // do not keep trying to activate on authorization error - // eslint-disable-next-line @typescript-eslint/no-unsafe-call - if (error.message.includes('Authorization')) continue; - - this.addQueuedWorkflowActivation('init', dbWorkflow); - } + await Promise.all(activationPromises); } this.logger.debug('Finished activating workflows (startup)'); } + private async activateWorkflow( + dbWorkflow: WorkflowEntity, + activationMode: 'init' | 'leadershipChange', + ) { + try { + const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, { + shouldPublish: false, + }); + if (wasActivated) { + this.logger.info(` - ${dbWorkflow.display()})`); + this.logger.info(' => Started'); + this.logger.debug(`Successfully started workflow ${dbWorkflow.display()}`, { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }); + } + } catch (error) { + this.errorReporter.error(error); + this.logger.info( + ` => ERROR: Workflow ${dbWorkflow.display()} could not be activated on first try, keep on trying if not an auth issue`, + ); + + this.logger.info(` ${error.message}`); + this.logger.error( + `Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`, + { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }, + ); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + this.executeErrorWorkflow(error, dbWorkflow, 'internal'); + + // do not keep trying to activate on authorization error + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + if (error.message.includes('Authorization')) return; + + this.addQueuedWorkflowActivation('init', dbWorkflow); + } + } + async clearAllActivationErrors() { this.logger.debug('Clearing all activation errors'); @@ -533,7 +549,6 @@ export class ActiveWorkflowManager { } if (shouldDisplayActivationMessage) { - this.logger.info(` - ${dbWorkflow.display()}`); this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, { workflowName: dbWorkflow.name, workflowId: dbWorkflow.id,