diff --git a/packages/cli/src/abstract-server.ts b/packages/cli/src/abstract-server.ts index 95ecaccdc5..4456470b86 100644 --- a/packages/cli/src/abstract-server.ts +++ b/packages/cli/src/abstract-server.ts @@ -5,7 +5,6 @@ import { engine as expressHandlebars } from 'express-handlebars'; import { readFile } from 'fs/promises'; import type { Server } from 'http'; import isbot from 'isbot'; -import type { InstanceType } from 'n8n-core'; import { Container, Service } from 'typedi'; import config from '@/config'; @@ -22,7 +21,6 @@ import { TestWebhooks } from '@/webhooks/test-webhooks'; import { WaitingWebhooks } from '@/webhooks/waiting-webhooks'; import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler'; -import { generateHostInstanceId } from './databases/utils/generators'; import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error'; @Service() @@ -61,7 +59,7 @@ export abstract class AbstractServer { readonly uniqueInstanceId: string; - constructor(instanceType: Exclude) { + constructor() { this.app = express(); this.app.disable('x-powered-by'); @@ -85,8 +83,6 @@ export abstract class AbstractServer { this.endpointWebhookTest = this.globalConfig.endpoints.webhookTest; this.endpointWebhookWaiting = this.globalConfig.endpoints.webhookWaiting; - this.uniqueInstanceId = generateHostInstanceId(instanceType); - this.logger = Container.get(Logger); } diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index f4d97a6a05..303b3cae3e 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -19,7 +19,6 @@ import type { AbstractServer } from '@/abstract-server'; import config from '@/config'; import { LICENSE_FEATURES, inDevelopment, inTest } from '@/constants'; import * as CrashJournal from '@/crash-journal'; -import { generateHostInstanceId } from '@/databases/utils/generators'; import * as Db from '@/db'; import { getDataDeduplicationService } from '@/deduplication'; import { initErrorHandling } from '@/error-reporting'; @@ -45,8 +44,6 @@ export abstract class BaseCommand extends Command { protected instanceSettings: InstanceSettings = Container.get(InstanceSettings); - queueModeId: string; - protected server?: AbstractServer; protected shutdownService: ShutdownService = Container.get(ShutdownService); @@ -133,16 +130,6 @@ export abstract class BaseCommand extends Command { await Container.get(TelemetryEventRelay).init(); } - protected setInstanceQueueModeId() { - if (config.get('redis.queueModeId')) { - this.queueModeId = config.get('redis.queueModeId'); - return; - } - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion - this.queueModeId = generateHostInstanceId(this.instanceSettings.instanceType!); - config.set('redis.queueModeId', this.queueModeId); - } - protected async stopProcess() { // This needs to be overridden } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index a437414469..b46ef52ea4 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -import { Flags, type Config } from '@oclif/core'; +import { Flags } from '@oclif/core'; import glob from 'fast-glob'; import { createReadStream, createWriteStream, existsSync } from 'fs'; import { mkdir } from 'fs/promises'; @@ -70,11 +70,6 @@ export class Start extends BaseCommand { override needsCommunityPackages = true; - constructor(argv: string[], cmdConfig: Config) { - super(argv, cmdConfig); - this.setInstanceQueueModeId(); - } - /** * Opens the UI in browser */ @@ -176,7 +171,7 @@ export class Start extends BaseCommand { if (config.getEnv('executions.mode') === 'queue') { const scopedLogger = this.logger.withScope('scaling'); scopedLogger.debug('Starting main instance in scaling mode'); - scopedLogger.debug(`Host ID: ${this.queueModeId}`); + scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`); } const { flags } = await this.parse(Start); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 8c601c7ebc..43a9703087 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -1,4 +1,4 @@ -import { Flags, type Config } from '@oclif/core'; +import { Flags } from '@oclif/core'; import { ApplicationError } from 'n8n-workflow'; import { Container } from 'typedi'; @@ -24,14 +24,6 @@ export class Webhook extends BaseCommand { override needsCommunityPackages = true; - constructor(argv: string[], cmdConfig: Config) { - super(argv, cmdConfig); - if (this.queueModeId) { - this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`); - } - this.setInstanceQueueModeId(); - } - /** * Stops n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -71,8 +63,8 @@ export class Webhook extends BaseCommand { await this.initCrashJournal(); this.logger.debug('Crash journal initialized'); - this.logger.info('Initializing n8n webhook process'); - this.logger.debug(`Queue mode id: ${this.queueModeId}`); + this.logger.info('Starting n8n webhook process...'); + this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`); await super.init(); @@ -100,7 +92,6 @@ export class Webhook extends BaseCommand { const { ScalingService } = await import('@/scaling/scaling.service'); await Container.get(ScalingService).setupQueue(); await this.server.start(); - this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); this.logger.info('Webhook listener waiting for requests.'); // Make sure that the process does not close diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 438ed9c8c8..5dfb5c210b 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -70,8 +70,6 @@ export class Worker extends BaseCommand { super(argv, cmdConfig); this.logger = Container.get(Logger).withScope('scaling'); - - this.setInstanceQueueModeId(); } async init() { @@ -86,7 +84,7 @@ export class Worker extends BaseCommand { await this.initCrashJournal(); this.logger.debug('Starting n8n worker...'); - this.logger.debug(`Host ID: ${this.queueModeId}`); + this.logger.debug(`Host ID: ${this.instanceSettings.hostId}`); await this.setConcurrency(); await super.init(); @@ -111,7 +109,7 @@ export class Worker extends BaseCommand { new EventMessageGeneric({ eventName: 'n8n.worker.started', payload: { - workerId: this.queueModeId, + workerId: this.instanceSettings.hostId, }, }), ); @@ -130,7 +128,7 @@ export class Worker extends BaseCommand { async initEventBus() { await Container.get(MessageEventBus).initialize({ - workerId: this.queueModeId, + workerId: this.instanceSettings.hostId, }); Container.get(LogStreamingEventRelay).init(); } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 047df9341e..d2bb5297d4 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -491,11 +491,6 @@ export const schema = { default: 'n8n', env: 'N8N_REDIS_KEY_PREFIX', }, - queueModeId: { - doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup', - format: String, - default: '', - }, }, /** diff --git a/packages/cli/src/errors/worker-missing-encryption-key.error.ts b/packages/cli/src/errors/worker-missing-encryption-key.error.ts index 88ec11877a..29b8dad929 100644 --- a/packages/cli/src/errors/worker-missing-encryption-key.error.ts +++ b/packages/cli/src/errors/worker-missing-encryption-key.error.ts @@ -7,7 +7,7 @@ export class WorkerMissingEncryptionKey extends ApplicationError { 'Failed to start worker because of missing encryption key.', 'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.', 'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/', - ].join(''), + ].join(' '), { level: 'warning' }, ); } diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index af8ff9f0c1..f77b6b5d5a 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -1,8 +1,8 @@ import type { Redis as SingleNodeClient } from 'ioredis'; import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; import config from '@/config'; -import { generateNanoId } from '@/databases/utils/generators'; import type { RedisClientService } from '@/services/redis-client.service'; import { mockLogger } from '@test/mocking'; @@ -10,28 +10,26 @@ import { Publisher } from '../pubsub/publisher.service'; import type { PubSub } from '../pubsub/pubsub.types'; describe('Publisher', () => { - let queueModeId: string; - beforeEach(() => { config.set('executions.mode', 'queue'); - queueModeId = generateNanoId(); - config.set('redis.queueModeId', queueModeId); }); const client = mock(); const logger = mockLogger(); + const hostId = 'main-bnxa1riryKUNHtln'; + const instanceSettings = mock({ hostId }); const redisClientService = mock({ createClient: () => client }); describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); expect(publisher.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); expect(publisher.getClient()).toBeUndefined(); }); @@ -39,7 +37,7 @@ describe('Publisher', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); publisher.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -47,21 +45,21 @@ describe('Publisher', () => { describe('publishCommand', () => { it('should publish command into `n8n.commands` pubsub channel', async () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); const msg = mock({ command: 'reload-license' }); await publisher.publishCommand(msg); expect(client.publish).toHaveBeenCalledWith( 'n8n.commands', - JSON.stringify({ ...msg, senderId: queueModeId, selfSend: false, debounce: true }), + JSON.stringify({ ...msg, senderId: hostId, selfSend: false, debounce: true }), ); }); }); describe('publishWorkerResponse', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { - const publisher = new Publisher(logger, redisClientService); + const publisher = new Publisher(logger, redisClientService, instanceSettings); const msg = mock({ response: 'response-to-get-worker-status', }); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts index 62834dba33..4f97208b99 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/__tests__/subscriber.service.test.ts @@ -17,14 +17,14 @@ describe('Subscriber', () => { describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); expect(subscriber.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); expect(subscriber.getClient()).toBeUndefined(); }); @@ -32,7 +32,7 @@ describe('Subscriber', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); subscriber.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -40,7 +40,7 @@ describe('Subscriber', () => { describe('subscribe', () => { it('should subscribe to pubsub channel', async () => { - const subscriber = new Subscriber(mock(), redisClientService, mock()); + const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); await subscriber.subscribe('n8n.commands'); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index e11395002b..1322beac27 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -1,5 +1,5 @@ import type { RunningJobSummary } from '@n8n/api-types'; -import { WorkflowExecute } from 'n8n-core'; +import { InstanceSettings, WorkflowExecute } from 'n8n-core'; import { BINARY_ENCODING, ApplicationError, Workflow } from 'n8n-workflow'; import type { ExecutionStatus, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; import type PCancelable from 'p-cancelable'; @@ -33,6 +33,7 @@ export class JobProcessor { private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, + private readonly instanceSettings: InstanceSettings, ) { this.logger = this.logger.withScope('scaling'); } @@ -120,7 +121,7 @@ export class JobProcessor { kind: 'respond-to-webhook', executionId, response: this.encodeWebhookResponse(response), - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, }; await job.progress(msg); @@ -173,7 +174,7 @@ export class JobProcessor { const msg: JobFinishedMessage = { kind: 'job-finished', executionId, - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, }; await job.progress(msg); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index cc25304e2c..06a876accf 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -1,4 +1,5 @@ import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; +import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; import config from '@/config'; @@ -20,6 +21,7 @@ export class Publisher { constructor( private readonly logger: Logger, private readonly redisClientService: RedisClientService, + private readonly instanceSettings: InstanceSettings, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -48,7 +50,7 @@ export class Publisher { 'n8n.commands', JSON.stringify({ ...msg, - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, selfSend: SELF_SEND_COMMANDS.has(msg.command), debounce: !IMMEDIATE_COMMANDS.has(msg.command), }), diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index ca590dd2c2..deeed5b584 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -3,7 +3,6 @@ import { ensureError } from 'n8n-workflow'; import { Service } from 'typedi'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; -import config from '@/config'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; @@ -49,7 +48,7 @@ export class PubSubHandler { ...this.commonHandlers, 'get-worker-status': async () => await this.publisher.publishWorkerResponse({ - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, response: 'response-to-get-worker-status', payload: this.workerStatusService.generateStatus(), }), diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 207c726370..c2045215e0 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -1,5 +1,6 @@ import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis'; import debounce from 'lodash/debounce'; +import { InstanceSettings } from 'n8n-core'; import { jsonParse } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -21,6 +22,7 @@ export class Subscriber { private readonly logger: Logger, private readonly redisClientService: RedisClientService, private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -77,12 +79,12 @@ export class Subscriber { return null; } - const queueModeId = config.getEnv('redis.queueModeId'); + const { hostId } = this.instanceSettings; if ( 'command' in msg && !msg.selfSend && - (msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId))) + (msg.senderId === hostId || (msg.targets && !msg.targets.includes(hostId))) ) { return null; } diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 5edf43eeac..f965587263 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -112,7 +112,7 @@ export class ScalingService { const msg: JobFailedMessage = { kind: 'job-failed', executionId, - workerId: config.getEnv('redis.queueModeId'), + workerId: this.instanceSettings.hostId, errorMsg: error.message, }; diff --git a/packages/cli/src/scaling/worker-status.service.ts b/packages/cli/src/scaling/worker-status.service.ts index 725cbb0ca7..a50a1b8d2e 100644 --- a/packages/cli/src/scaling/worker-status.service.ts +++ b/packages/cli/src/scaling/worker-status.service.ts @@ -1,19 +1,22 @@ import type { WorkerStatus } from '@n8n/api-types'; +import { InstanceSettings } from 'n8n-core'; import os from 'node:os'; import { Service } from 'typedi'; -import config from '@/config'; import { N8N_VERSION } from '@/constants'; import { JobProcessor } from './job-processor'; @Service() export class WorkerStatusService { - constructor(private readonly jobProcessor: JobProcessor) {} + constructor( + private readonly jobProcessor: JobProcessor, + private readonly instanceSettings: InstanceSettings, + ) {} generateStatus(): WorkerStatus { return { - senderId: config.getEnv('redis.queueModeId'), + senderId: this.instanceSettings.hostId, runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), freeMem: os.freemem(), totalMem: os.totalmem(), diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 00971d71a5..3cfd93054b 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -79,8 +79,9 @@ export class Server extends AbstractServer { private readonly orchestrationService: OrchestrationService, private readonly postHogClient: PostHogClient, private readonly eventService: EventService, + private readonly instanceSettings: InstanceSettings, ) { - super('main'); + super(); this.testWebhooksEnabled = true; this.webhooksEnabled = !this.globalConfig.endpoints.disableProductionWebhooksOnMainProcess; @@ -97,7 +98,7 @@ export class Server extends AbstractServer { this.endpointPresetCredentials = this.globalConfig.credentials.overwrite.endpoint; await super.start(); - this.logger.debug(`Server ID: ${this.uniqueInstanceId}`); + this.logger.debug(`Server ID: ${this.instanceSettings.hostId}`); if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { void this.loadNodesAndCredentials.setupHotReload(); diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 6c66573047..0169462891 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -23,15 +23,11 @@ redisClientService.createClient.mockReturnValue(mockRedisClient); const os = Container.get(OrchestrationService); mockInstance(ActiveWorkflowManager); -let queueModeId: string; - describe('Orchestration Service', () => { mockInstance(Push); mockInstance(ExternalSecretsManager); beforeAll(async () => { - queueModeId = config.get('redis.queueModeId'); - // @ts-expect-error readonly property instanceSettings.instanceType = 'main'; }); @@ -48,7 +44,6 @@ describe('Orchestration Service', () => { await os.init(); // @ts-expect-error Private field expect(os.publisher).toBeDefined(); - expect(queueModeId).toBeDefined(); }); describe('shouldAddWebhooks', () => { diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index a248a144ec..64dbd0ddae 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -43,10 +43,6 @@ export class OrchestrationService { return !this.isMultiMainSetupEnabled; } - get instanceId() { - return config.getEnv('redis.queueModeId'); - } - sanityCheck() { return this.isInitialized && config.get('executions.mode') === 'queue'; } @@ -94,7 +90,7 @@ export class OrchestrationService { if (!this.sanityCheck()) return; this.logger.debug( - `[Instance ID ${this.instanceId}] Publishing command "${commandKey}"`, + `[Instance ID ${this.instanceSettings.hostId}] Publishing command "${commandKey}"`, payload, ); diff --git a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts index 3b104cc1e3..034a214765 100644 --- a/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts +++ b/packages/cli/src/services/orchestration/main/multi-main-setup.ee.ts @@ -24,10 +24,6 @@ export class MultiMainSetup extends TypedEmitter { super(); } - get instanceId() { - return config.getEnv('redis.queueModeId'); - } - private leaderKey: string; private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl'); @@ -57,16 +53,18 @@ export class MultiMainSetup extends TypedEmitter { private async checkLeader() { const leaderId = await this.publisher.get(this.leaderKey); - if (leaderId === this.instanceId) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`); + const { hostId } = this.instanceSettings; + + if (leaderId === hostId) { + this.logger.debug(`[Instance ID ${hostId}] Leader is this instance`); await this.publisher.setExpiration(this.leaderKey, this.leaderKeyTtl); return; } - if (leaderId && leaderId !== this.instanceId) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`); + if (leaderId && leaderId !== hostId) { + this.logger.debug(`[Instance ID ${hostId}] Leader is other instance "${leaderId}"`); if (this.instanceSettings.isLeader) { this.instanceSettings.markAsFollower(); @@ -81,7 +79,7 @@ export class MultiMainSetup extends TypedEmitter { if (!leaderId) { this.logger.debug( - `[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`, + `[Instance ID ${hostId}] Leadership vacant, attempting to become leader...`, ); this.instanceSettings.markAsFollower(); @@ -96,11 +94,13 @@ export class MultiMainSetup extends TypedEmitter { } private async tryBecomeLeader() { + const { hostId } = this.instanceSettings; + // this can only succeed if leadership is currently vacant - const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, this.instanceId); + const keySetSuccessfully = await this.publisher.setIfNotExists(this.leaderKey, hostId); if (keySetSuccessfully) { - this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); + this.logger.debug(`[Instance ID ${hostId}] Leader is now this instance`); this.instanceSettings.markAsLeader(); diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts index 7388a55032..461630d396 100644 --- a/packages/cli/src/services/orchestration/main/types.ts +++ b/packages/cli/src/services/orchestration/main/types.ts @@ -1,6 +1,6 @@ import type { Publisher } from '@/scaling/pubsub/publisher.service'; export type MainResponseReceivedHandlerOptions = { - queueModeId: string; + hostId: string; publisher: Publisher; }; diff --git a/packages/cli/src/services/orchestration/worker/types.ts b/packages/cli/src/services/orchestration/worker/types.ts index d821a194b2..afe7362210 100644 --- a/packages/cli/src/services/orchestration/worker/types.ts +++ b/packages/cli/src/services/orchestration/worker/types.ts @@ -3,7 +3,7 @@ import type { RunningJobSummary } from '@n8n/api-types'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; export interface WorkerCommandReceivedHandlerOptions { - queueModeId: string; + hostId: string; publisher: Publisher; getRunningJobIds: () => Array; getRunningJobsSummary: () => RunningJobSummary[]; diff --git a/packages/cli/src/webhooks/webhook-server.ts b/packages/cli/src/webhooks/webhook-server.ts index d54f39f2cf..263375325b 100644 --- a/packages/cli/src/webhooks/webhook-server.ts +++ b/packages/cli/src/webhooks/webhook-server.ts @@ -3,8 +3,4 @@ import { Service } from 'typedi'; import { AbstractServer } from '@/abstract-server'; @Service() -export class WebhookServer extends AbstractServer { - constructor() { - super('webhook'); - } -} +export class WebhookServer extends AbstractServer {} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 8c33755fd7..fad6eb3fd7 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -48,10 +48,8 @@ const command = setupTestCommand(Worker); test('worker initializes all its components', async () => { config.set('executions.mode', 'regular'); // should be overridden - const worker = await command.run(); - expect(worker.queueModeId).toBeDefined(); - expect(worker.queueModeId).toContain('worker'); - expect(worker.queueModeId.length).toBeGreaterThan(15); + await command.run(); + expect(license.init).toHaveBeenCalledTimes(1); expect(binaryDataService.init).toHaveBeenCalledTimes(1); expect(externalHooks.init).toHaveBeenCalledTimes(1); diff --git a/packages/core/package.json b/packages/core/package.json index 95cf23efa6..aec9b34891 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -36,6 +36,7 @@ "@types/xml2js": "catalog:" }, "dependencies": { + "@langchain/core": "catalog:", "@n8n/client-oauth2": "workspace:*", "aws4": "1.11.0", "axios": "catalog:", @@ -45,10 +46,10 @@ "file-type": "16.5.4", "form-data": "catalog:", "lodash": "catalog:", - "@langchain/core": "catalog:", "luxon": "catalog:", "mime-types": "2.1.35", "n8n-workflow": "workspace:*", + "nanoid": "catalog:", "oauth-1.0a": "2.2.6", "p-cancelable": "2.1.1", "pretty-bytes": "5.6.0", diff --git a/packages/core/src/InstanceSettings.ts b/packages/core/src/InstanceSettings.ts index 17ccf15def..4a050db121 100644 --- a/packages/core/src/InstanceSettings.ts +++ b/packages/core/src/InstanceSettings.ts @@ -1,9 +1,12 @@ import { createHash, randomBytes } from 'crypto'; import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs'; -import { ApplicationError, jsonParse } from 'n8n-workflow'; +import { ApplicationError, jsonParse, ALPHABET } from 'n8n-workflow'; +import { customAlphabet } from 'nanoid'; import path from 'path'; import { Service } from 'typedi'; +const nanoid = customAlphabet(ALPHABET, 16); + interface ReadOnlySettings { encryptionKey: string; } @@ -40,6 +43,12 @@ export class InstanceSettings { private settings = this.loadOrCreate(); + /** + * Fixed ID of this n8n instance, for telemetry. + * Derived from encryption key. Do not confuse with `hostId`. + * + * @example '258fce876abf5ea60eb86a2e777e5e190ff8f3e36b5b37aafec6636c31d4d1f9' + */ readonly instanceId = this.generateInstanceId(); readonly instanceType: InstanceType; @@ -49,6 +58,8 @@ export class InstanceSettings { this.instanceType = ['webhook', 'worker'].includes(command) ? (command as InstanceType) : 'main'; + + this.hostId = `${this.instanceType}-${nanoid()}`; } /** @@ -61,6 +72,16 @@ export class InstanceSettings { */ instanceRole: InstanceRole = 'unset'; + /** + * Transient ID of this n8n instance, for scaling mode. + * Reset on restart. Do not confuse with `instanceId`. + * + * @example 'main-bnxa1riryKUNHtln' + * @example 'worker-nDJR0FnSd2Vf6DB5' + * @example 'webhook-jxQ7AO8IzxEtfW1F' + */ + readonly hostId: string; + get isLeader() { return this.instanceRole === 'leader'; } diff --git a/packages/core/test/InstanceSettings.test.ts b/packages/core/test/InstanceSettings.test.ts index 64b6840f2f..7bc572b168 100644 --- a/packages/core/test/InstanceSettings.test.ts +++ b/packages/core/test/InstanceSettings.test.ts @@ -69,4 +69,19 @@ describe('InstanceSettings', () => { ); }); }); + + describe('constructor', () => { + it('should generate a `hostId`', () => { + const encryptionKey = 'test_key'; + process.env.N8N_ENCRYPTION_KEY = encryptionKey; + jest.spyOn(fs, 'existsSync').mockReturnValueOnce(true); + jest.spyOn(fs, 'readFileSync').mockReturnValueOnce(JSON.stringify({ encryptionKey })); + + const settings = new InstanceSettings(); + + const [instanceType, nanoid] = settings.hostId.split('-'); + expect(instanceType).toEqual('main'); + expect(nanoid).toHaveLength(16); // e.g. sDX6ZPc0bozv66zM + }); + }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6748091148..10528f347b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1112,6 +1112,9 @@ importers: n8n-workflow: specifier: workspace:* version: link:../workflow + nanoid: + specifier: 'catalog:' + version: 3.3.6 oauth-1.0a: specifier: 2.2.6 version: 2.2.6 @@ -2206,7 +2209,7 @@ packages: '@azure/core-http@3.0.4': resolution: {integrity: sha512-Fok9VVhMdxAFOtqiiAtg74fL0UJkt0z3D+ouUUxcRLzZNBioPRAMJFVxiWoJljYpXsRi4GDQHzQHDc9AiYaIUQ==} engines: {node: '>=14.0.0'} - deprecated: This package is no longer supported. Please migrate to use @azure/core-rest-pipeline + deprecated: deprecating as we migrated to core v2 '@azure/core-lro@2.4.0': resolution: {integrity: sha512-F65+rYkll1dpw3RGm8/SSiSj+/QkMeYDanzS/QKlM1dmuneVyXbO46C88V1MRHluLGdMP6qfD3vDRYALn0z0tQ==} @@ -5769,6 +5772,9 @@ packages: axios-retry@3.7.0: resolution: {integrity: sha512-ZTnCkJbRtfScvwiRnoVskFAfvU0UG3xNcsjwTR0mawSbIJoothxn67gKsMaNAFHRXJ1RmuLhmZBzvyXi3+9WyQ==} + axios@1.7.3: + resolution: {integrity: sha512-Ar7ND9pU99eJ9GpoGQKhKf58GpUOgnzuaB7ueNQ5BMi0p+LZ5oaEnfF999fAArcTIBwXTCHAmGcHOZJaWPq9Nw==} + axios@1.7.4: resolution: {integrity: sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==} @@ -14925,7 +14931,7 @@ snapshots: '@n8n/localtunnel@3.0.0': dependencies: - axios: 1.7.7(debug@4.3.6) + axios: 1.7.3(debug@4.3.6) debug: 4.3.6(supports-color@8.1.1) transitivePeerDependencies: - supports-color @@ -17630,6 +17636,14 @@ snapshots: '@babel/runtime': 7.24.7 is-retry-allowed: 2.2.0 + axios@1.7.3(debug@4.3.6): + dependencies: + follow-redirects: 1.15.6(debug@4.3.6) + form-data: 4.0.0 + proxy-from-env: 1.1.0 + transitivePeerDependencies: + - debug + axios@1.7.4: dependencies: follow-redirects: 1.15.6(debug@4.3.6) @@ -17646,14 +17660,6 @@ snapshots: transitivePeerDependencies: - debug - axios@1.7.7(debug@4.3.6): - dependencies: - follow-redirects: 1.15.6(debug@4.3.6) - form-data: 4.0.0 - proxy-from-env: 1.1.0 - transitivePeerDependencies: - - debug - axios@1.7.7(debug@4.3.7): dependencies: follow-redirects: 1.15.6(debug@4.3.7) @@ -19293,7 +19299,7 @@ snapshots: eslint-import-resolver-node@0.3.9: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) is-core-module: 2.13.1 resolve: 1.22.8 transitivePeerDependencies: @@ -19318,7 +19324,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0): dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) optionalDependencies: '@typescript-eslint/parser': 7.2.0(eslint@8.57.0)(typescript@5.6.2) eslint: 8.57.0 @@ -19338,7 +19344,7 @@ snapshots: array.prototype.findlastindex: 1.2.3 array.prototype.flat: 1.3.2 array.prototype.flatmap: 1.3.2 - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) doctrine: 2.1.0 eslint: 8.57.0 eslint-import-resolver-node: 0.3.9 @@ -20136,7 +20142,7 @@ snapshots: array-parallel: 0.1.3 array-series: 0.1.5 cross-spawn: 4.0.2 - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) transitivePeerDependencies: - supports-color @@ -23039,7 +23045,7 @@ snapshots: pdf-parse@1.1.1: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) node-ensure: 0.0.0 transitivePeerDependencies: - supports-color @@ -23868,7 +23874,7 @@ snapshots: rhea@1.0.24: dependencies: - debug: 3.2.7(supports-color@8.1.1) + debug: 3.2.7(supports-color@5.5.0) transitivePeerDependencies: - supports-color