From 3aa679e4ac411d0d34e039fa6c43bc98f2e3670f Mon Sep 17 00:00:00 2001 From: Juuso Tapaninen Date: Wed, 26 Feb 2025 14:16:38 +0200 Subject: [PATCH] feat(core): Add metric for active workflow count (#13420) --- .../config/src/configs/endpoints.config.ts | 4 + packages/@n8n/config/test/config.test.ts | 1 + .../repositories/workflow.repository.ts | 6 ++ .../prometheus-metrics.service.test.ts | 21 ++++- ...rometheus-metrics.service.unmocked.test.ts | 70 +++++++++++++++ .../src/metrics/prometheus-metrics.service.ts | 40 ++++++++- .../integration/prometheus-metrics.test.ts | 87 +++++++++++++++++++ 7 files changed, 226 insertions(+), 3 deletions(-) diff --git a/packages/@n8n/config/src/configs/endpoints.config.ts b/packages/@n8n/config/src/configs/endpoints.config.ts index 4ec58ccf0d..994343c650 100644 --- a/packages/@n8n/config/src/configs/endpoints.config.ts +++ b/packages/@n8n/config/src/configs/endpoints.config.ts @@ -57,6 +57,10 @@ class PrometheusMetricsConfig { /** How often (in seconds) to update queue metrics. 
*/ @Env('N8N_METRICS_QUEUE_METRICS_INTERVAL') queueMetricsInterval: number = 20; + + /** How often (in seconds) to update active workflow metric */ + @Env('N8N_METRICS_ACTIVE_WORKFLOW_METRIC_INTERVAL') + activeWorkflowCountInterval: number = 60; } @Config diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 735cb4bfcf..626c5f4d62 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -173,6 +173,7 @@ describe('GlobalConfig', () => { includeApiStatusCodeLabel: false, includeQueueMetrics: false, queueMetricsInterval: 20, + activeWorkflowCountInterval: 60, }, additionalNonUIRoutes: '', disableProductionWebhooksOnMainProcess: false, diff --git a/packages/cli/src/databases/repositories/workflow.repository.ts b/packages/cli/src/databases/repositories/workflow.repository.ts index 8b229840c8..69bf66d955 100644 --- a/packages/cli/src/databases/repositories/workflow.repository.ts +++ b/packages/cli/src/databases/repositories/workflow.repository.ts @@ -79,6 +79,12 @@ export class WorkflowRepository extends Repository { return activeWorkflows.map((workflow) => workflow.id); } + async getActiveCount() { + return await this.count({ + where: { active: true }, + }); + } + async findById(workflowId: string) { return await this.findOne({ where: { id: workflowId }, diff --git a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts index cbdd09f643..d47cdb2737 100644 --- a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts +++ b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.test.ts @@ -6,6 +6,7 @@ import type { InstanceSettings } from 'n8n-core'; import promClient from 'prom-client'; import config from '@/config'; +import type { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import type { MessageEventBus } from 
'@/eventbus/message-event-bus/message-event-bus'; import type { EventService } from '@/events/event.service'; import { mockInstance } from '@test/mocking'; @@ -52,12 +53,14 @@ describe('PrometheusMetricsService', () => { const eventBus = mock(); const eventService = mock(); const instanceSettings = mock({ instanceType: 'main' }); + const workflowRepository = mock(); const prometheusMetricsService = new PrometheusMetricsService( mock(), eventBus, globalConfig, eventService, instanceSettings, + workflowRepository, ); afterEach(() => { @@ -75,6 +78,7 @@ describe('PrometheusMetricsService', () => { customGlobalConfig, mock(), instanceSettings, + mock(), ); await customPrometheusMetricsService.init(app); @@ -217,7 +221,7 @@ describe('PrometheusMetricsService', () => { await prometheusMetricsService.init(app); - expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric + expect(promClient.Gauge).toHaveBeenCalledTimes(2); // version metric + active workflow count metric expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics expect(eventService.on).not.toHaveBeenCalled(); }); @@ -230,9 +234,22 @@ describe('PrometheusMetricsService', () => { await prometheusMetricsService.init(app); - expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric + expect(promClient.Gauge).toHaveBeenCalledTimes(2); // version metric + active workflow count metric expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics expect(eventService.on).not.toHaveBeenCalled(); }); + + it('should setup active workflow count metric', async () => { + await prometheusMetricsService.init(app); + + // First call is n8n version metric + expect(promClient.Gauge).toHaveBeenCalledTimes(2); + + expect(promClient.Gauge).toHaveBeenNthCalledWith(2, { + name: 'n8n_active_workflow_count', + help: 'Total number of active workflows.', + collect: expect.any(Function), + }); + }); }); }); diff --git 
a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.unmocked.test.ts b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.unmocked.test.ts index e485bbe435..2dde75086d 100644 --- a/packages/cli/src/metrics/__tests__/prometheus-metrics.service.unmocked.test.ts +++ b/packages/cli/src/metrics/__tests__/prometheus-metrics.service.unmocked.test.ts @@ -4,8 +4,10 @@ import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; import promClient from 'prom-client'; +import type { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventMessageWorkflow } from '@/eventbus/event-message-classes/event-message-workflow'; import type { EventService } from '@/events/event.service'; +import type { CacheService } from '@/services/cache/cache.service'; import { mockInstance } from '@test/mocking'; import { MessageEventBus } from '../../eventbus/message-event-bus/message-event-bus'; @@ -15,8 +17,10 @@ jest.unmock('@/eventbus/message-event-bus/message-event-bus'); const customPrefix = 'custom_'; +const cacheService = mock(); const eventService = mock(); const instanceSettings = mock({ instanceType: 'main' }); +const workflowRepository = mock(); const app = mock(); const eventBus = new MessageEventBus( mock(), @@ -48,6 +52,7 @@ describe('workflow_success_total', () => { globalConfig, eventService, instanceSettings, + workflowRepository, ); await prometheusMetricsService.init(app); @@ -87,6 +92,7 @@ workflow_success_total{workflow_id="1234"} 1" globalConfig, eventService, instanceSettings, + workflowRepository, ); await prometheusMetricsService.init(app); @@ -107,3 +113,67 @@ workflow_success_total{workflow_id="1234"} 1" } }); }); + +describe('Active workflow count', () => { + const globalConfig = mockInstance(GlobalConfig, { + endpoints: { + metrics: { + prefix: '', + activeWorkflowCountInterval: 30, + }, + }, + }); + + const prometheusMetricsService = new PrometheusMetricsService( + cacheService, + 
eventBus, + globalConfig, + eventService, + instanceSettings, + workflowRepository, + ); + + afterEach(() => { + jest.clearAllMocks(); + prometheusMetricsService.disableAllMetrics(); + }); + + it('should prioritize cached value', async () => { + await prometheusMetricsService.init(app); + + cacheService.get.mockReturnValueOnce(Promise.resolve('1')); + workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2)); + + const activeWorkflowCount = + await promClient.register.getSingleMetricAsString('active_workflow_count'); + + expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count'); + expect(workflowRepository.getActiveCount).not.toHaveBeenCalled(); + + expect(activeWorkflowCount).toMatchInlineSnapshot(` +"# HELP active_workflow_count Total number of active workflows. +# TYPE active_workflow_count gauge +active_workflow_count 1" +`); + }); + + it('should query value from database if cache misses', async () => { + await prometheusMetricsService.init(app); + + cacheService.get.mockReturnValueOnce(Promise.resolve(undefined)); + workflowRepository.getActiveCount.mockReturnValueOnce(Promise.resolve(2)); + + const activeWorkflowCount = + await promClient.register.getSingleMetricAsString('active_workflow_count'); + + expect(cacheService.get).toHaveBeenCalledWith('metrics:active-workflow-count'); + expect(workflowRepository.getActiveCount).toHaveBeenCalled(); + expect(cacheService.set).toHaveBeenCalledWith('metrics:active-workflow-count', '2', 30_000); + + expect(activeWorkflowCount).toMatchInlineSnapshot(` +"# HELP active_workflow_count Total number of active workflows. 
+# TYPE active_workflow_count gauge +active_workflow_count 2" +`); + }); +}); diff --git a/packages/cli/src/metrics/prometheus-metrics.service.ts b/packages/cli/src/metrics/prometheus-metrics.service.ts index ec66dd9a70..a646c00efd 100644 --- a/packages/cli/src/metrics/prometheus-metrics.service.ts +++ b/packages/cli/src/metrics/prometheus-metrics.service.ts @@ -8,7 +8,8 @@ import promClient, { type Counter, type Gauge } from 'prom-client'; import semverParse from 'semver/functions/parse'; import config from '@/config'; -import { N8N_VERSION } from '@/constants'; +import { N8N_VERSION, Time } from '@/constants'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import type { EventMessageTypes } from '@/eventbus'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; @@ -24,6 +25,7 @@ export class PrometheusMetricsService { private readonly globalConfig: GlobalConfig, private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, + private readonly workflowRepository: WorkflowRepository, ) {} private readonly counters: { [key: string]: Counter | null } = {}; @@ -58,6 +60,7 @@ export class PrometheusMetricsService { this.initEventBusMetrics(); this.initRouteMetrics(app); this.initQueueMetrics(); + this.initActiveWorkflowCountMetric(); this.mountMetricsEndpoint(app); } @@ -285,6 +288,41 @@ export class PrometheusMetricsService { }); } + /** + * Setup active workflow count metric + * + * This metric is updated every time metrics are collected. + * We also cache the value of active workflow counts so we + * don't hit the database on every metrics query. Both the + * metric being enabled and the TTL of the cached value are + * configurable. 
+ */ + private initActiveWorkflowCountMetric() { + const workflowRepository = this.workflowRepository; + const cacheService = this.cacheService; + const cacheKey = 'metrics:active-workflow-count'; + const cacheTtl = + this.globalConfig.endpoints.metrics.activeWorkflowCountInterval * Time.seconds.toMilliseconds; + + new promClient.Gauge({ + name: this.prefix + 'active_workflow_count', + help: 'Total number of active workflows.', + async collect() { + const value = await cacheService.get(cacheKey); + const numericValue = value !== undefined ? parseInt(value, 10) : undefined; + + if (numericValue !== undefined && Number.isFinite(numericValue)) { + this.set(numericValue); + } else { + const activeWorkflowCount = await workflowRepository.getActiveCount(); + await cacheService.set(cacheKey, activeWorkflowCount.toString(), cacheTtl); + + this.set(activeWorkflowCount); + } + }, + }); + } + private toLabels(event: EventMessageTypes): Record { const { __type, eventName, payload } = event; diff --git a/packages/cli/test/integration/prometheus-metrics.test.ts b/packages/cli/test/integration/prometheus-metrics.test.ts index c5accedbc6..792f978bfa 100644 --- a/packages/cli/test/integration/prometheus-metrics.test.ts +++ b/packages/cli/test/integration/prometheus-metrics.test.ts @@ -5,8 +5,11 @@ import request, { type Response } from 'supertest'; import config from '@/config'; import { N8N_VERSION } from '@/constants'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { EventService } from '@/events/event.service'; import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; +import { CacheService } from '@/services/cache/cache.service'; +import { createWorkflow, newWorkflow } from '@test-integration/db/workflows'; import { setupTestServer } from './shared/utils'; @@ -16,6 +19,7 @@ const toLines = (response: Response) => response.text.trim().split('\n'); const eventService = Container.get(EventService); const 
globalConfig = Container.get(GlobalConfig); +globalConfig.cache.backend = 'memory'; globalConfig.endpoints.metrics = { enable: true, prefix: 'n8n_test_', @@ -31,6 +35,7 @@ globalConfig.endpoints.metrics = { includeApiStatusCodeLabel: true, includeQueueMetrics: true, queueMetricsInterval: 20, + activeWorkflowCountInterval: 60, }; const server = setupTestServer({ endpointGroups: ['metrics'] }); @@ -202,6 +207,51 @@ describe('PrometheusMetricsService', () => { ); }); + it('should include last activity metric with route metrics', async () => { + /** + * Arrange + */ + prometheusService.enableMetric('routes'); + await prometheusService.init(server.app); + await agent.get('/api/v1/workflows'); + + /** + * Act + */ + let response = await agent.get('/metrics'); + + /** + * Assert + */ + expect(response.status).toEqual(200); + expect(response.type).toEqual('text/plain'); + + const lines = toLines(response); + + expect(lines).toContainEqual(expect.stringContaining('n8n_test_last_activity')); + + const lastActivityLine = lines.find((line) => + line.startsWith('n8n_test_last_activity{timestamp='), + ); + + expect(lastActivityLine).toBeDefined(); + expect(lastActivityLine?.endsWith('1')).toBe(true); + + // Update last activity + await agent.get('/api/v1/workflows'); + + response = await agent.get('/metrics'); + const updatedLines = toLines(response); + + const newLastActivityLine = updatedLines.find((line) => + line.startsWith('n8n_test_last_activity{timestamp='), + ); + + expect(newLastActivityLine).toBeDefined(); + // Timestamp label should be different + expect(newLastActivityLine).not.toBe(lastActivityLine); + }); + it('should return labels in route metrics if enabled', async () => { /** * ARrange @@ -284,4 +334,41 @@ describe('PrometheusMetricsService', () => { expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0'); expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0'); }); + + it('should return active workflow count', async () => { + await 
prometheusService.init(server.app); + + let response = await agent.get('/metrics'); + + expect(response.status).toEqual(200); + expect(response.type).toEqual('text/plain'); + + let lines = toLines(response); + + expect(lines).toContain('n8n_test_active_workflow_count 0'); + + const workflow = newWorkflow({ active: true }); + await createWorkflow(workflow); + + const workflowRepository = Container.get(WorkflowRepository); + const activeWorkflowCount = await workflowRepository.getActiveCount(); + + expect(activeWorkflowCount).toBe(1); + + response = await agent.get('/metrics'); + + lines = toLines(response); + + // Should return cached value + expect(lines).toContain('n8n_test_active_workflow_count 0'); + + const cacheService = Container.get(CacheService); + await cacheService.delete('metrics:active-workflow-count'); + + response = await agent.get('/metrics'); + + lines = toLines(response); + + expect(lines).toContain('n8n_test_active_workflow_count 1'); + }); });