diff --git a/packages/@n8n/config/src/configs/event-bus.ts b/packages/@n8n/config/src/configs/event-bus.ts new file mode 100644 index 0000000000..ed1226fa92 --- /dev/null +++ b/packages/@n8n/config/src/configs/event-bus.ts @@ -0,0 +1,31 @@ +import { Config, Env, Nested } from '../decorators'; + +@Config +class LogWriterConfig { + /** Number of event log files to keep */ + @Env('N8N_EVENTBUS_LOGWRITER_KEEPLOGCOUNT') + readonly keepLogCount: number = 3; + + /** Max size (in KB) of an event log file before a new one is started */ + @Env('N8N_EVENTBUS_LOGWRITER_MAXFILESIZEINKB') + readonly maxFileSizeInKB: number = 10240; // 10 MB + + /** Basename of event log file */ + @Env('N8N_EVENTBUS_LOGWRITER_LOGBASENAME') + readonly logBaseName: string = 'n8nEventLog'; +} + +@Config +export class EventBusConfig { + /** How often (in ms) to check for unsent event messages. Can in rare cases cause a message to be sent twice. `0` to disable */ + @Env('N8N_EVENTBUS_CHECKUNSENTINTERVAL') + readonly checkUnsentInterval: number = 0; + + /** Endpoint to retrieve n8n version information from */ + @Nested + readonly logWriter: LogWriterConfig; + + /** Whether to recover execution details after a crash or only mark status executions as crashed. */ + @Env('N8N_EVENTBUS_RECOVERY_MODE') + readonly crashRecoveryMode: 'simple' | 'extensive' = 'extensive'; +} diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index d74b942b80..11440c70bf 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -6,6 +6,7 @@ import { VersionNotificationsConfig } from './configs/version-notifications'; import { PublicApiConfig } from './configs/public-api'; import { ExternalSecretsConfig } from './configs/external-secrets'; import { TemplatesConfig } from './configs/templates'; +import { EventBusConfig } from './configs/event-bus'; @Config class UserManagementConfig { @@ -35,4 +36,7 @@ export class GlobalConfig { @Nested templates: TemplatesConfig; + + @Nested + eventBus: EventBusConfig; } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 3079db91a6..d99b51e190 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -679,21 +679,6 @@ export const schema = { }, }, - templates: { - enabled: { - doc: 'Whether templates feature is enabled to load workflow templates.', - format: Boolean, - default: true, - env: 'N8N_TEMPLATES_ENABLED', - }, - host: { - doc: 'Endpoint host to retrieve workflow templates from endpoints.', - format: String, - default: 'https://api.n8n.io/api/', - env: 'N8N_TEMPLATES_HOST', - }, - }, - push: { backend: { format: ['sse', 'websocket'] as const, @@ -930,41 +915,6 @@ export const schema = { doc: 'Hide or show the usage page', }, - eventBus: { - checkUnsentInterval: { - doc: 'How often (in ms) to check for unsent event messages. Can in rare cases cause a message to be sent twice. 0=disabled', - format: Number, - default: 0, - env: 'N8N_EVENTBUS_CHECKUNSENTINTERVAL', - }, - logWriter: { - keepLogCount: { - doc: 'How many event log files to keep.', - format: Number, - default: 3, - env: 'N8N_EVENTBUS_LOGWRITER_KEEPLOGCOUNT', - }, - maxFileSizeInKB: { - doc: 'Maximum size of an event log file before a new one is started.', - format: Number, - default: 10240, // 10MB - env: 'N8N_EVENTBUS_LOGWRITER_MAXFILESIZEINKB', - }, - logBaseName: { - doc: 'Basename of the event log file.', - format: String, - default: 'n8nEventLog', - env: 'N8N_EVENTBUS_LOGWRITER_LOGBASENAME', - }, - }, - crashRecoveryMode: { - doc: 'Should n8n try to recover execution details after a crash, or just mark pending executions as crashed', - format: ['simple', 'extensive'] as const, - default: 'extensive', - env: 'N8N_EVENTBUS_RECOVERY_MODE', - }, - }, - redis: { prefix: { doc: 'Prefix for all n8n related keys', diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 46a353e0c7..758eeb5ae5 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -37,6 +37,7 @@ import { import { License } from '@/License'; import type { EventMessageExecutionOptions } from '../EventMessageClasses/EventMessageExecution'; import { EventMessageExecution } from '../EventMessageClasses/EventMessageExecution'; +import { GlobalConfig } from '@n8n/config'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -70,6 +71,7 @@ export class MessageEventBus extends EventEmitter { private readonly orchestrationService: OrchestrationService, private readonly recoveryService: ExecutionRecoveryService, private readonly license: License, + private readonly globalConfig: GlobalConfig, ) { super(); } @@ -109,7 +111,7 @@ export class MessageEventBus extends EventEmitter { if (options?.workerId) { // only add 'worker' to log file name since the ID changes on every start and we // would not be able to recover the log files from the previous run not knowing it - const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker'; + const logBaseName = this.globalConfig.eventBus.logWriter.logBaseName + '-worker'; this.logWriter = await MessageEventBusLogWriter.getInstance({ logBaseName, }); @@ -168,7 +170,7 @@ export class MessageEventBus extends EventEmitter { } } const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); - if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { + if (recoveryAlreadyAttempted || this.globalConfig.eventBus.crashRecoveryMode === 'simple') { await this.executionRepository.markAsCrashed(unfinishedExecutionIds); // if we end up here, it means that the previous recovery process did not finish // a possible reason would be that recreating the workflow data itself caused e.g an OOM error @@ -188,13 +190,13 @@ export class MessageEventBus extends EventEmitter { } } // if configured, run this test every n ms - if (config.getEnv('eventBus.checkUnsentInterval') > 0) { + if (this.globalConfig.eventBus.checkUnsentInterval > 0) { if (this.pushIntervalTimer) { clearInterval(this.pushIntervalTimer); } this.pushIntervalTimer = setInterval(async () => { await this.trySendingUnsent(); - }, config.getEnv('eventBus.checkUnsentInterval')); + }, this.globalConfig.eventBus.checkUnsentInterval); } this.logger.debug('MessageEventBus initialized'); diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 87a4326d26..08899c0e09 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -7,7 +7,6 @@ import { Worker } from 'worker_threads'; import { createReadStream, existsSync, rmSync } from 'fs'; import readline from 'readline'; import remove from 'lodash/remove'; -import config from '@/config'; import type { EventMessageGenericOptions } from '../EventMessageClasses/EventMessageGeneric'; import { EventMessageGeneric } from '../EventMessageClasses/EventMessageGeneric'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; @@ -29,6 +28,7 @@ import { once as eventOnce } from 'events'; import { inTest } from '@/constants'; import { Logger } from '@/Logger'; import Container from 'typedi'; +import { GlobalConfig } from '@n8n/config'; interface MessageEventBusLogWriterConstructorOptions { logBaseName?: string; @@ -59,10 +59,13 @@ export class MessageEventBusLogWriter { private readonly logger: Logger; + private readonly globalConfig: GlobalConfig; + private _worker: Worker | undefined; constructor() { this.logger = Container.get(Logger); + this.globalConfig = Container.get(GlobalConfig); } public get worker(): Worker | undefined { @@ -83,12 +86,13 @@ export class MessageEventBusLogWriter { MessageEventBusLogWriter.options = { logFullBasePath: path.join( options?.logBasePath ?? Container.get(InstanceSettings).n8nFolder, - options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'), + options?.logBaseName ?? Container.get(GlobalConfig).eventBus.logWriter.logBaseName, ), keepNumberOfFiles: - options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'), + options?.keepNumberOfFiles ?? Container.get(GlobalConfig).eventBus.logWriter.keepLogCount, maxFileSizeInKB: - options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'), + options?.maxFileSizeInKB ?? + Container.get(GlobalConfig).eventBus.logWriter.maxFileSizeInKB, }; await MessageEventBusLogWriter.instance.startThread(); } @@ -181,7 +185,7 @@ export class MessageEventBusLogWriter { sentMessages: [], unfinishedExecutions: {}, }; - const configLogCount = config.get('eventBus.logWriter.keepLogCount'); + const configLogCount = this.globalConfig.eventBus.logWriter.keepLogCount; const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount; for (let i = logCount; i >= 0; i--) { const logFileName = this.getLogFileName(i); @@ -282,7 +286,7 @@ export class MessageEventBusLogWriter { logHistory?: number, ): Promise { const result: EventMessageTypes[] = []; - const configLogCount = config.get('eventBus.logWriter.keepLogCount'); + const configLogCount = this.globalConfig.eventBus.logWriter.keepLogCount; const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount; for (let i = 0; i < logCount; i++) { const logFileName = this.getLogFileName(i); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index b19f024f52..a79c1388b3 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -1,5 +1,4 @@ import { Container } from 'typedi'; -import config from '@/config'; import axios from 'axios'; import syslog from 'syslog-client'; import { v4 as uuid } from 'uuid'; @@ -92,9 +91,6 @@ beforeAll(async () => { mockedSyslog.createClient.mockImplementation(() => new syslog.Client()); - config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); - config.set('eventBus.logWriter.keepLogCount', 1); - eventBus = Container.get(MessageEventBus); await eventBus.initialize(); });