mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 21:07:28 -08:00
feat(core): Expose queue metrics for Prometheus (#10559)
This commit is contained in:
parent
acfd60ac85
commit
008c510b76
|
@ -49,6 +49,14 @@ class PrometheusMetricsConfig {
|
||||||
/** Whether to include metrics derived from n8n's internal events */
|
/** Whether to include metrics derived from n8n's internal events */
|
||||||
@Env('N8N_METRICS_INCLUDE_MESSAGE_EVENT_BUS_METRICS')
|
@Env('N8N_METRICS_INCLUDE_MESSAGE_EVENT_BUS_METRICS')
|
||||||
includeMessageEventBusMetrics: boolean = false;
|
includeMessageEventBusMetrics: boolean = false;
|
||||||
|
|
||||||
|
/** Whether to include metrics for jobs in scaling mode. Not supported in multi-main setup. */
|
||||||
|
@Env('N8N_METRICS_INCLUDE_QUEUE_METRICS')
|
||||||
|
includeQueueMetrics: boolean = false;
|
||||||
|
|
||||||
|
/** How often (in seconds) to update queue metrics. */
|
||||||
|
@Env('N8N_METRICS_QUEUE_METRICS_INTERVAL')
|
||||||
|
queueMetricsInterval: number = 20;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Config
|
@Config
|
||||||
|
|
|
@ -165,6 +165,8 @@ describe('GlobalConfig', () => {
|
||||||
includeApiMethodLabel: false,
|
includeApiMethodLabel: false,
|
||||||
includeCredentialTypeLabel: false,
|
includeCredentialTypeLabel: false,
|
||||||
includeApiStatusCodeLabel: false,
|
includeApiStatusCodeLabel: false,
|
||||||
|
includeQueueMetrics: false,
|
||||||
|
queueMetricsInterval: 20,
|
||||||
},
|
},
|
||||||
additionalNonUIRoutes: '',
|
additionalNonUIRoutes: '',
|
||||||
disableProductionWebhooksOnMainProcess: false,
|
disableProductionWebhooksOnMainProcess: false,
|
||||||
|
|
|
@ -33,6 +33,7 @@ describe('TelemetryEventRelay', () => {
|
||||||
includeApiEndpoints: false,
|
includeApiEndpoints: false,
|
||||||
includeCacheMetrics: false,
|
includeCacheMetrics: false,
|
||||||
includeMessageEventBusMetrics: false,
|
includeMessageEventBusMetrics: false,
|
||||||
|
includeQueueMetrics: false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
@ -948,6 +949,7 @@ describe('TelemetryEventRelay', () => {
|
||||||
metrics_category_routes: false,
|
metrics_category_routes: false,
|
||||||
metrics_category_cache: false,
|
metrics_category_cache: false,
|
||||||
metrics_category_logs: false,
|
metrics_category_logs: false,
|
||||||
|
metrics_category_queue: false,
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { TypedEmitter } from '@/TypedEmitter';
|
import { TypedEmitter } from '@/TypedEmitter';
|
||||||
import type { RelayEventMap } from './relay-event-map';
|
import type { RelayEventMap } from './relay-event-map';
|
||||||
|
import type { QueueMetricsEventMap } from './queue-metrics-event-map';
|
||||||
|
|
||||||
|
type EventMap = RelayEventMap & QueueMetricsEventMap;
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class EventService extends TypedEmitter<RelayEventMap> {}
|
export class EventService extends TypedEmitter<EventMap> {}
|
||||||
|
|
8
packages/cli/src/events/queue-metrics-event-map.ts
Normal file
8
packages/cli/src/events/queue-metrics-event-map.ts
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
export type QueueMetricsEventMap = {
|
||||||
|
'job-counts-updated': {
|
||||||
|
active: number;
|
||||||
|
completed: number;
|
||||||
|
failed: number;
|
||||||
|
waiting: number;
|
||||||
|
};
|
||||||
|
};
|
|
@ -768,6 +768,7 @@ export class TelemetryEventRelay extends EventRelay {
|
||||||
metrics_category_routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
|
metrics_category_routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
|
||||||
metrics_category_cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
|
metrics_category_cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
|
||||||
metrics_category_logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
|
metrics_category_logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
|
||||||
|
metrics_category_queue: this.globalConfig.endpoints.metrics.includeQueueMetrics,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import type express from 'express';
|
||||||
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
|
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance } from '@test/mocking';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
import type { EventService } from '@/events/event.service';
|
||||||
|
|
||||||
const mockMiddleware = (
|
const mockMiddleware = (
|
||||||
_req: express.Request,
|
_req: express.Request,
|
||||||
|
@ -22,27 +23,62 @@ describe('PrometheusMetricsService', () => {
|
||||||
endpoints: {
|
endpoints: {
|
||||||
metrics: {
|
metrics: {
|
||||||
prefix: 'n8n_',
|
prefix: 'n8n_',
|
||||||
includeDefaultMetrics: true,
|
includeDefaultMetrics: false,
|
||||||
includeApiEndpoints: true,
|
includeApiEndpoints: false,
|
||||||
includeCacheMetrics: true,
|
includeCacheMetrics: false,
|
||||||
includeMessageEventBusMetrics: true,
|
includeMessageEventBusMetrics: false,
|
||||||
includeCredentialTypeLabel: false,
|
includeCredentialTypeLabel: false,
|
||||||
includeNodeTypeLabel: false,
|
includeNodeTypeLabel: false,
|
||||||
includeWorkflowIdLabel: false,
|
includeWorkflowIdLabel: false,
|
||||||
includeApiPathLabel: true,
|
includeApiPathLabel: false,
|
||||||
includeApiMethodLabel: true,
|
includeApiMethodLabel: false,
|
||||||
includeApiStatusCodeLabel: true,
|
includeApiStatusCodeLabel: false,
|
||||||
|
includeQueueMetrics: false,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const app = mock<express.Application>();
|
||||||
|
const eventBus = mock<MessageEventBus>();
|
||||||
|
const eventService = mock<EventService>();
|
||||||
|
const prometheusMetricsService = new PrometheusMetricsService(
|
||||||
|
mock(),
|
||||||
|
eventBus,
|
||||||
|
globalConfig,
|
||||||
|
eventService,
|
||||||
|
);
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
prometheusMetricsService.disableAllMetrics();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('constructor', () => {
|
||||||
|
it('should enable metrics based on global config', async () => {
|
||||||
|
const customGlobalConfig = { ...globalConfig };
|
||||||
|
customGlobalConfig.endpoints.metrics.includeCacheMetrics = true;
|
||||||
|
const customPrometheusMetricsService = new PrometheusMetricsService(
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
customGlobalConfig,
|
||||||
|
mock(),
|
||||||
|
);
|
||||||
|
|
||||||
|
await customPrometheusMetricsService.init(app);
|
||||||
|
|
||||||
|
expect(promClient.Counter).toHaveBeenCalledWith({
|
||||||
|
name: 'n8n_cache_hits_total',
|
||||||
|
help: 'Total number of cache hits.',
|
||||||
|
labelNames: ['cache'],
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('init', () => {
|
describe('init', () => {
|
||||||
it('should set up `n8n_version_info`', async () => {
|
it('should set up `n8n_version_info`', async () => {
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
await service.init(mock<express.Application>());
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(1, {
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenCalledWith({
|
|
||||||
name: 'n8n_version_info',
|
name: 'n8n_version_info',
|
||||||
help: 'n8n version info.',
|
help: 'n8n version info.',
|
||||||
labelNames: ['version', 'major', 'minor', 'patch'],
|
labelNames: ['version', 'major', 'minor', 'patch'],
|
||||||
|
@ -50,48 +86,37 @@ describe('PrometheusMetricsService', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up default metrics collection with `prom-client`', async () => {
|
it('should set up default metrics collection with `prom-client`', async () => {
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
prometheusMetricsService.enableMetric('default');
|
||||||
|
await prometheusMetricsService.init(app);
|
||||||
await service.init(mock<express.Application>());
|
|
||||||
|
|
||||||
expect(promClient.collectDefaultMetrics).toHaveBeenCalled();
|
expect(promClient.collectDefaultMetrics).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up `n8n_cache_hits_total`', async () => {
|
it('should set up `n8n_cache_hits_total`', async () => {
|
||||||
config.set('endpoints.metrics.includeCacheMetrics', true);
|
prometheusMetricsService.enableMetric('cache');
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
await service.init(mock<express.Application>());
|
|
||||||
|
|
||||||
expect(promClient.Counter).toHaveBeenCalledWith({
|
expect(promClient.Counter).toHaveBeenCalledWith({
|
||||||
name: 'n8n_cache_hits_total',
|
name: 'n8n_cache_hits_total',
|
||||||
help: 'Total number of cache hits.',
|
help: 'Total number of cache hits.',
|
||||||
labelNames: ['cache'],
|
labelNames: ['cache'],
|
||||||
});
|
});
|
||||||
// @ts-expect-error private field
|
|
||||||
expect(service.counters.cacheHitsTotal?.inc).toHaveBeenCalledWith(0);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up `n8n_cache_misses_total`', async () => {
|
it('should set up `n8n_cache_misses_total`', async () => {
|
||||||
config.set('endpoints.metrics.includeCacheMetrics', true);
|
prometheusMetricsService.enableMetric('cache');
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
await service.init(mock<express.Application>());
|
|
||||||
|
|
||||||
expect(promClient.Counter).toHaveBeenCalledWith({
|
expect(promClient.Counter).toHaveBeenCalledWith({
|
||||||
name: 'n8n_cache_misses_total',
|
name: 'n8n_cache_misses_total',
|
||||||
help: 'Total number of cache misses.',
|
help: 'Total number of cache misses.',
|
||||||
labelNames: ['cache'],
|
labelNames: ['cache'],
|
||||||
});
|
});
|
||||||
// @ts-expect-error private field
|
|
||||||
expect(service.counters.cacheMissesTotal?.inc).toHaveBeenCalledWith(0);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up `n8n_cache_updates_total`', async () => {
|
it('should set up `n8n_cache_updates_total`', async () => {
|
||||||
config.set('endpoints.metrics.includeCacheMetrics', true);
|
prometheusMetricsService.enableMetric('cache');
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
await service.init(mock<express.Application>());
|
|
||||||
|
|
||||||
expect(promClient.Counter).toHaveBeenCalledWith({
|
expect(promClient.Counter).toHaveBeenCalledWith({
|
||||||
name: 'n8n_cache_updates_total',
|
name: 'n8n_cache_updates_total',
|
||||||
|
@ -99,26 +124,19 @@ describe('PrometheusMetricsService', () => {
|
||||||
labelNames: ['cache'],
|
labelNames: ['cache'],
|
||||||
});
|
});
|
||||||
// @ts-expect-error private field
|
// @ts-expect-error private field
|
||||||
expect(service.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0);
|
expect(prometheusMetricsService.counters.cacheUpdatesTotal?.inc).toHaveBeenCalledWith(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up route metrics with `express-prom-bundle`', async () => {
|
it('should set up route metrics with `express-prom-bundle`', async () => {
|
||||||
config.set('endpoints.metrics.includeApiEndpoints', true);
|
prometheusMetricsService.enableMetric('routes');
|
||||||
config.set('endpoints.metrics.includeApiPathLabel', true);
|
await prometheusMetricsService.init(app);
|
||||||
config.set('endpoints.metrics.includeApiMethodLabel', true);
|
|
||||||
config.set('endpoints.metrics.includeApiStatusCodeLabel', true);
|
|
||||||
const service = new PrometheusMetricsService(mock(), mock(), globalConfig);
|
|
||||||
|
|
||||||
const app = mock<express.Application>();
|
|
||||||
|
|
||||||
await service.init(app);
|
|
||||||
|
|
||||||
expect(promBundle).toHaveBeenCalledWith({
|
expect(promBundle).toHaveBeenCalledWith({
|
||||||
autoregister: false,
|
autoregister: false,
|
||||||
includeUp: false,
|
includeUp: false,
|
||||||
includePath: true,
|
includePath: false,
|
||||||
includeMethod: true,
|
includeMethod: false,
|
||||||
includeStatusCode: true,
|
includeStatusCode: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(app.use).toHaveBeenCalledWith(
|
expect(app.use).toHaveBeenCalledWith(
|
||||||
|
@ -137,12 +155,52 @@ describe('PrometheusMetricsService', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set up event bus metrics', async () => {
|
it('should set up event bus metrics', async () => {
|
||||||
const eventBus = mock<MessageEventBus>();
|
prometheusMetricsService.enableMetric('logs');
|
||||||
const service = new PrometheusMetricsService(mock(), eventBus, globalConfig);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
await service.init(mock<express.Application>());
|
|
||||||
|
|
||||||
expect(eventBus.on).toHaveBeenCalledWith('metrics.eventBus.event', expect.any(Function));
|
expect(eventBus.on).toHaveBeenCalledWith('metrics.eventBus.event', expect.any(Function));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should set up queue metrics if enabled', async () => {
|
||||||
|
config.set('executions.mode', 'queue');
|
||||||
|
prometheusMetricsService.enableMetric('queue');
|
||||||
|
|
||||||
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
|
// call 1 is for `n8n_version_info` (always enabled)
|
||||||
|
|
||||||
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
|
||||||
|
name: 'n8n_scaling_mode_queue_jobs_waiting',
|
||||||
|
help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
|
||||||
|
name: 'n8n_scaling_mode_queue_jobs_active',
|
||||||
|
help: 'Current number of jobs being processed across all workers in scaling mode.',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(promClient.Counter).toHaveBeenNthCalledWith(1, {
|
||||||
|
name: 'n8n_scaling_mode_queue_jobs_completed',
|
||||||
|
help: 'Total number of jobs completed across all workers in scaling mode since instance start.',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(promClient.Counter).toHaveBeenNthCalledWith(2, {
|
||||||
|
name: 'n8n_scaling_mode_queue_jobs_failed',
|
||||||
|
help: 'Total number of jobs failed across all workers in scaling mode since instance start.',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(eventService.on).toHaveBeenCalledWith('job-counts-updated', expect.any(Function));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not set up queue metrics if enabled but not on scaling mode', async () => {
|
||||||
|
config.set('executions.mode', 'regular');
|
||||||
|
prometheusMetricsService.enableMetric('queue');
|
||||||
|
|
||||||
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
|
expect(promClient.Gauge).toHaveBeenCalledTimes(1); // version metric
|
||||||
|
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
|
||||||
|
expect(eventService.on).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { N8N_VERSION } from '@/constants';
|
import { N8N_VERSION } from '@/constants';
|
||||||
import type express from 'express';
|
import type express from 'express';
|
||||||
import promBundle from 'express-prom-bundle';
|
import promBundle from 'express-prom-bundle';
|
||||||
import promClient, { type Counter } from 'prom-client';
|
import promClient, { type Counter, type Gauge } from 'prom-client';
|
||||||
import semverParse from 'semver/functions/parse';
|
import semverParse from 'semver/functions/parse';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
|
@ -11,6 +11,8 @@ import { EventMessageTypeNames } from 'n8n-workflow';
|
||||||
import type { EventMessageTypes } from '@/eventbus';
|
import type { EventMessageTypes } from '@/eventbus';
|
||||||
import type { Includes, MetricCategory, MetricLabel } from './types';
|
import type { Includes, MetricCategory, MetricLabel } from './types';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
import { EventService } from '@/events/event.service';
|
||||||
|
import config from '@/config';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class PrometheusMetricsService {
|
export class PrometheusMetricsService {
|
||||||
|
@ -18,10 +20,13 @@ export class PrometheusMetricsService {
|
||||||
private readonly cacheService: CacheService,
|
private readonly cacheService: CacheService,
|
||||||
private readonly eventBus: MessageEventBus,
|
private readonly eventBus: MessageEventBus,
|
||||||
private readonly globalConfig: GlobalConfig,
|
private readonly globalConfig: GlobalConfig,
|
||||||
|
private readonly eventService: EventService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
private readonly counters: { [key: string]: Counter<string> | null } = {};
|
private readonly counters: { [key: string]: Counter<string> | null } = {};
|
||||||
|
|
||||||
|
private readonly gauges: Record<string, Gauge<string>> = {};
|
||||||
|
|
||||||
private readonly prefix = this.globalConfig.endpoints.metrics.prefix;
|
private readonly prefix = this.globalConfig.endpoints.metrics.prefix;
|
||||||
|
|
||||||
private readonly includes: Includes = {
|
private readonly includes: Includes = {
|
||||||
|
@ -30,6 +35,7 @@ export class PrometheusMetricsService {
|
||||||
routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
|
routes: this.globalConfig.endpoints.metrics.includeApiEndpoints,
|
||||||
cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
|
cache: this.globalConfig.endpoints.metrics.includeCacheMetrics,
|
||||||
logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
|
logs: this.globalConfig.endpoints.metrics.includeMessageEventBusMetrics,
|
||||||
|
queue: this.globalConfig.endpoints.metrics.includeQueueMetrics,
|
||||||
},
|
},
|
||||||
labels: {
|
labels: {
|
||||||
credentialsType: this.globalConfig.endpoints.metrics.includeCredentialTypeLabel,
|
credentialsType: this.globalConfig.endpoints.metrics.includeCredentialTypeLabel,
|
||||||
|
@ -48,6 +54,7 @@ export class PrometheusMetricsService {
|
||||||
this.initCacheMetrics();
|
this.initCacheMetrics();
|
||||||
this.initEventBusMetrics();
|
this.initEventBusMetrics();
|
||||||
this.initRouteMetrics(app);
|
this.initRouteMetrics(app);
|
||||||
|
this.initQueueMetrics();
|
||||||
this.mountMetricsEndpoint(app);
|
this.mountMetricsEndpoint(app);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,6 +225,42 @@ export class PrometheusMetricsService {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private initQueueMetrics() {
|
||||||
|
if (!this.includes.metrics.queue || config.getEnv('executions.mode') !== 'queue') return;
|
||||||
|
|
||||||
|
this.gauges.waiting = new promClient.Gauge({
|
||||||
|
name: this.prefix + 'scaling_mode_queue_jobs_waiting',
|
||||||
|
help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.gauges.active = new promClient.Gauge({
|
||||||
|
name: this.prefix + 'scaling_mode_queue_jobs_active',
|
||||||
|
help: 'Current number of jobs being processed across all workers in scaling mode.',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.counters.completed = new promClient.Counter({
|
||||||
|
name: this.prefix + 'scaling_mode_queue_jobs_completed',
|
||||||
|
help: 'Total number of jobs completed across all workers in scaling mode since instance start.',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.counters.failed = new promClient.Counter({
|
||||||
|
name: this.prefix + 'scaling_mode_queue_jobs_failed',
|
||||||
|
help: 'Total number of jobs failed across all workers in scaling mode since instance start.',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.gauges.waiting.set(0);
|
||||||
|
this.gauges.active.set(0);
|
||||||
|
this.counters.completed.inc(0);
|
||||||
|
this.counters.failed.inc(0);
|
||||||
|
|
||||||
|
this.eventService.on('job-counts-updated', (jobCounts) => {
|
||||||
|
this.gauges.waiting.set(jobCounts.waiting);
|
||||||
|
this.gauges.active.set(jobCounts.active);
|
||||||
|
this.counters.completed?.inc(jobCounts.completed);
|
||||||
|
this.counters.failed?.inc(jobCounts.failed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private toLabels(event: EventMessageTypes): Record<string, string> {
|
private toLabels(event: EventMessageTypes): Record<string, string> {
|
||||||
const { __type, eventName, payload } = event;
|
const { __type, eventName, payload } = event;
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs';
|
export type MetricCategory = 'default' | 'routes' | 'cache' | 'logs' | 'queue';
|
||||||
|
|
||||||
export type MetricLabel =
|
export type MetricLabel =
|
||||||
| 'credentialsType'
|
| 'credentialsType'
|
||||||
|
|
|
@ -37,6 +37,12 @@ describe('ScalingService', () => {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
endpoints: {
|
||||||
|
metrics: {
|
||||||
|
includeQueueMetrics: false,
|
||||||
|
queueMetricsInterval: 20,
|
||||||
|
},
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
const instanceSettings = Container.get(InstanceSettings);
|
const instanceSettings = Container.get(InstanceSettings);
|
||||||
|
@ -73,6 +79,7 @@ describe('ScalingService', () => {
|
||||||
mock(),
|
mock(),
|
||||||
instanceSettings,
|
instanceSettings,
|
||||||
orchestrationService,
|
orchestrationService,
|
||||||
|
mock(),
|
||||||
);
|
);
|
||||||
|
|
||||||
getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');
|
getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');
|
||||||
|
|
|
@ -23,6 +23,7 @@ import { GlobalConfig } from '@n8n/config';
|
||||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import { InstanceSettings } from 'n8n-core';
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import { OrchestrationService } from '@/services/orchestration.service';
|
import { OrchestrationService } from '@/services/orchestration.service';
|
||||||
|
import { EventService } from '@/events/event.service';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ScalingService {
|
export class ScalingService {
|
||||||
|
@ -38,6 +39,7 @@ export class ScalingService {
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
private readonly orchestrationService: OrchestrationService,
|
private readonly orchestrationService: OrchestrationService,
|
||||||
|
private readonly eventService: EventService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
// #region Lifecycle
|
// #region Lifecycle
|
||||||
|
@ -66,6 +68,8 @@ export class ScalingService {
|
||||||
.on('leader-stepdown', () => this.stopQueueRecovery());
|
.on('leader-stepdown', () => this.stopQueueRecovery());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.scheduleQueueMetrics();
|
||||||
|
|
||||||
this.logger.debug('[ScalingService] Queue setup completed');
|
this.logger.debug('[ScalingService] Queue setup completed');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,8 +93,9 @@ export class ScalingService {
|
||||||
this.logger.debug('[ScalingService] Queue paused');
|
this.logger.debug('[ScalingService] Queue paused');
|
||||||
|
|
||||||
this.stopQueueRecovery();
|
this.stopQueueRecovery();
|
||||||
|
this.stopQueueMetrics();
|
||||||
|
|
||||||
this.logger.debug('[ScalingService] Queue recovery stopped');
|
this.logger.debug('[ScalingService] Queue recovery and metrics stopped');
|
||||||
|
|
||||||
let count = 0;
|
let count = 0;
|
||||||
|
|
||||||
|
@ -113,6 +118,12 @@ export class ScalingService {
|
||||||
|
|
||||||
// #region Jobs
|
// #region Jobs
|
||||||
|
|
||||||
|
async getPendingJobCounts() {
|
||||||
|
const { active, waiting } = await this.queue.getJobCounts();
|
||||||
|
|
||||||
|
return { active, waiting };
|
||||||
|
}
|
||||||
|
|
||||||
async addJob(jobData: JobData, jobOptions: JobOptions) {
|
async addJob(jobData: JobData, jobOptions: JobOptions) {
|
||||||
const { executionId } = jobData;
|
const { executionId } = jobData;
|
||||||
|
|
||||||
|
@ -246,6 +257,11 @@ export class ScalingService {
|
||||||
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
|
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (this.isQueueMetricsEnabled) {
|
||||||
|
this.queue.on('global:completed', () => this.jobCounters.completed++);
|
||||||
|
this.queue.on('global:failed', () => this.jobCounters.failed++);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private isPubSubMessage(candidate: unknown): candidate is PubSubMessage {
|
private isPubSubMessage(candidate: unknown): candidate is PubSubMessage {
|
||||||
|
@ -282,6 +298,49 @@ export class ScalingService {
|
||||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// #region Queue metrics
|
||||||
|
|
||||||
|
/** Counters for completed and failed jobs, reset on each interval tick. */
|
||||||
|
private readonly jobCounters = { completed: 0, failed: 0 };
|
||||||
|
|
||||||
|
/** Interval for collecting queue metrics to expose via Prometheus. */
|
||||||
|
private queueMetricsInterval: NodeJS.Timer | undefined;
|
||||||
|
|
||||||
|
get isQueueMetricsEnabled() {
|
||||||
|
return (
|
||||||
|
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||||
|
this.instanceType === 'main' &&
|
||||||
|
!this.orchestrationService.isMultiMainSetupEnabled
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Set up an interval to collect queue metrics and emit them in an event. */
|
||||||
|
private scheduleQueueMetrics() {
|
||||||
|
if (!this.isQueueMetricsEnabled || this.queueMetricsInterval) return;
|
||||||
|
|
||||||
|
this.queueMetricsInterval = setInterval(async () => {
|
||||||
|
const pendingJobCounts = await this.getPendingJobCounts();
|
||||||
|
|
||||||
|
this.eventService.emit('job-counts-updated', {
|
||||||
|
...pendingJobCounts, // active, waiting
|
||||||
|
...this.jobCounters, // completed, failed
|
||||||
|
});
|
||||||
|
|
||||||
|
this.jobCounters.completed = 0;
|
||||||
|
this.jobCounters.failed = 0;
|
||||||
|
}, this.globalConfig.endpoints.metrics.queueMetricsInterval * Time.seconds.toMilliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Stop collecting queue metrics. */
|
||||||
|
private stopQueueMetrics() {
|
||||||
|
if (this.queueMetricsInterval) {
|
||||||
|
clearInterval(this.queueMetricsInterval);
|
||||||
|
this.queueMetricsInterval = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #endregion
|
||||||
|
|
||||||
// #region Queue recovery
|
// #region Queue recovery
|
||||||
|
|
||||||
private readonly queueRecoveryContext: QueueRecoveryContext = {
|
private readonly queueRecoveryContext: QueueRecoveryContext = {
|
||||||
|
|
|
@ -6,11 +6,14 @@ import { N8N_VERSION } from '@/constants';
|
||||||
import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
|
import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
|
||||||
import { setupTestServer } from './shared/utils';
|
import { setupTestServer } from './shared/utils';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
import config from '@/config';
|
||||||
|
import { EventService } from '@/events/event.service';
|
||||||
|
|
||||||
jest.unmock('@/eventbus/message-event-bus/message-event-bus');
|
jest.unmock('@/eventbus/message-event-bus/message-event-bus');
|
||||||
|
|
||||||
const toLines = (response: Response) => response.text.trim().split('\n');
|
const toLines = (response: Response) => response.text.trim().split('\n');
|
||||||
|
|
||||||
|
const eventService = Container.get(EventService);
|
||||||
const globalConfig = Container.get(GlobalConfig);
|
const globalConfig = Container.get(GlobalConfig);
|
||||||
globalConfig.endpoints.metrics = {
|
globalConfig.endpoints.metrics = {
|
||||||
enable: true,
|
enable: true,
|
||||||
|
@ -25,6 +28,8 @@ globalConfig.endpoints.metrics = {
|
||||||
includeApiPathLabel: true,
|
includeApiPathLabel: true,
|
||||||
includeApiMethodLabel: true,
|
includeApiMethodLabel: true,
|
||||||
includeApiStatusCodeLabel: true,
|
includeApiStatusCodeLabel: true,
|
||||||
|
includeQueueMetrics: true,
|
||||||
|
queueMetricsInterval: 20,
|
||||||
};
|
};
|
||||||
|
|
||||||
const server = setupTestServer({ endpointGroups: ['metrics'] });
|
const server = setupTestServer({ endpointGroups: ['metrics'] });
|
||||||
|
@ -32,7 +37,7 @@ const agent = request.agent(server.app);
|
||||||
|
|
||||||
let prometheusService: PrometheusMetricsService;
|
let prometheusService: PrometheusMetricsService;
|
||||||
|
|
||||||
describe('Metrics', () => {
|
describe('PrometheusMetricsService', () => {
|
||||||
beforeAll(() => {
|
beforeAll(() => {
|
||||||
prometheusService = Container.get(PrometheusMetricsService);
|
prometheusService = Container.get(PrometheusMetricsService);
|
||||||
});
|
});
|
||||||
|
@ -222,4 +227,60 @@ describe('Metrics', () => {
|
||||||
expect(lines).toContainEqual(expect.stringContaining('path="/webhook-test/some-uuid"'));
|
expect(lines).toContainEqual(expect.stringContaining('path="/webhook-test/some-uuid"'));
|
||||||
expect(lines).toContainEqual(expect.stringContaining('status_code="404"'));
|
expect(lines).toContainEqual(expect.stringContaining('status_code="404"'));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should return queue metrics if enabled', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
prometheusService.enableMetric('queue');
|
||||||
|
config.set('executions.mode', 'queue');
|
||||||
|
await prometheusService.init(server.app);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const response = await agent.get('/metrics');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(response.status).toEqual(200);
|
||||||
|
expect(response.type).toEqual('text/plain');
|
||||||
|
|
||||||
|
const lines = toLines(response);
|
||||||
|
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_waiting 0');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_active 0');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set queue metrics in response to `job-counts-updated` event', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
prometheusService.enableMetric('queue');
|
||||||
|
config.set('executions.mode', 'queue');
|
||||||
|
await prometheusService.init(server.app);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
eventService.emit('job-counts-updated', { waiting: 1, active: 2, completed: 0, failed: 0 });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
const response = await agent.get('/metrics');
|
||||||
|
|
||||||
|
expect(response.status).toEqual(200);
|
||||||
|
expect(response.type).toEqual('text/plain');
|
||||||
|
|
||||||
|
const lines = toLines(response);
|
||||||
|
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_waiting 1');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_active 2');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_completed 0');
|
||||||
|
expect(lines).toContain('n8n_test_scaling_mode_queue_jobs_failed 0');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue