refactor(core): Port event bus config (no-changelog) (#10111)

This commit is contained in:
Iván Ovejero 2024-07-19 13:25:44 +02:00 committed by GitHub
parent aba1c64500
commit 9ab29f2181
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 51 additions and 64 deletions

View file

@ -0,0 +1,31 @@
import { Config, Env, Nested } from '../decorators';
@Config
class LogWriterConfig {
/** Number of event log files to keep */
@Env('N8N_EVENTBUS_LOGWRITER_KEEPLOGCOUNT')
readonly keepLogCount: number = 3;
/** Max size (in KB) of an event log file before a new one is started */
@Env('N8N_EVENTBUS_LOGWRITER_MAXFILESIZEINKB')
readonly maxFileSizeInKB: number = 10240; // 10 MB
/** Basename of event log file */
@Env('N8N_EVENTBUS_LOGWRITER_LOGBASENAME')
readonly logBaseName: string = 'n8nEventLog';
}
@Config
export class EventBusConfig {
/** How often (in ms) to check for unsent event messages. Can in rare cases cause a message to be sent twice. `0` to disable */
@Env('N8N_EVENTBUS_CHECKUNSENTINTERVAL')
readonly checkUnsentInterval: number = 0;
/** Endpoint to retrieve n8n version information from */
@Nested
readonly logWriter: LogWriterConfig;
/** Whether to recover execution details after a crash or only mark status executions as crashed. */
@Env('N8N_EVENTBUS_RECOVERY_MODE')
readonly crashRecoveryMode: 'simple' | 'extensive' = 'extensive';
}

View file

@ -6,6 +6,7 @@ import { VersionNotificationsConfig } from './configs/version-notifications';
import { PublicApiConfig } from './configs/public-api'; import { PublicApiConfig } from './configs/public-api';
import { ExternalSecretsConfig } from './configs/external-secrets'; import { ExternalSecretsConfig } from './configs/external-secrets';
import { TemplatesConfig } from './configs/templates'; import { TemplatesConfig } from './configs/templates';
import { EventBusConfig } from './configs/event-bus';
@Config @Config
class UserManagementConfig { class UserManagementConfig {
@ -35,4 +36,7 @@ export class GlobalConfig {
@Nested @Nested
templates: TemplatesConfig; templates: TemplatesConfig;
@Nested
eventBus: EventBusConfig;
} }

View file

@ -679,21 +679,6 @@ export const schema = {
}, },
}, },
templates: {
enabled: {
doc: 'Whether templates feature is enabled to load workflow templates.',
format: Boolean,
default: true,
env: 'N8N_TEMPLATES_ENABLED',
},
host: {
doc: 'Endpoint host to retrieve workflow templates from endpoints.',
format: String,
default: 'https://api.n8n.io/api/',
env: 'N8N_TEMPLATES_HOST',
},
},
push: { push: {
backend: { backend: {
format: ['sse', 'websocket'] as const, format: ['sse', 'websocket'] as const,
@ -930,41 +915,6 @@ export const schema = {
doc: 'Hide or show the usage page', doc: 'Hide or show the usage page',
}, },
eventBus: {
checkUnsentInterval: {
doc: 'How often (in ms) to check for unsent event messages. Can in rare cases cause a message to be sent twice. 0=disabled',
format: Number,
default: 0,
env: 'N8N_EVENTBUS_CHECKUNSENTINTERVAL',
},
logWriter: {
keepLogCount: {
doc: 'How many event log files to keep.',
format: Number,
default: 3,
env: 'N8N_EVENTBUS_LOGWRITER_KEEPLOGCOUNT',
},
maxFileSizeInKB: {
doc: 'Maximum size of an event log file before a new one is started.',
format: Number,
default: 10240, // 10MB
env: 'N8N_EVENTBUS_LOGWRITER_MAXFILESIZEINKB',
},
logBaseName: {
doc: 'Basename of the event log file.',
format: String,
default: 'n8nEventLog',
env: 'N8N_EVENTBUS_LOGWRITER_LOGBASENAME',
},
},
crashRecoveryMode: {
doc: 'Should n8n try to recover execution details after a crash, or just mark pending executions as crashed',
format: ['simple', 'extensive'] as const,
default: 'extensive',
env: 'N8N_EVENTBUS_RECOVERY_MODE',
},
},
redis: { redis: {
prefix: { prefix: {
doc: 'Prefix for all n8n related keys', doc: 'Prefix for all n8n related keys',

View file

@ -37,6 +37,7 @@ import {
import { License } from '@/License'; import { License } from '@/License';
import type { EventMessageExecutionOptions } from '../EventMessageClasses/EventMessageExecution'; import type { EventMessageExecutionOptions } from '../EventMessageClasses/EventMessageExecution';
import { EventMessageExecution } from '../EventMessageClasses/EventMessageExecution'; import { EventMessageExecution } from '../EventMessageClasses/EventMessageExecution';
import { GlobalConfig } from '@n8n/config';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -70,6 +71,7 @@ export class MessageEventBus extends EventEmitter {
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
private readonly recoveryService: ExecutionRecoveryService, private readonly recoveryService: ExecutionRecoveryService,
private readonly license: License, private readonly license: License,
private readonly globalConfig: GlobalConfig,
) { ) {
super(); super();
} }
@ -109,7 +111,7 @@ export class MessageEventBus extends EventEmitter {
if (options?.workerId) { if (options?.workerId) {
// only add 'worker' to log file name since the ID changes on every start and we // only add 'worker' to log file name since the ID changes on every start and we
// would not be able to recover the log files from the previous run not knowing it // would not be able to recover the log files from the previous run not knowing it
const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker'; const logBaseName = this.globalConfig.eventBus.logWriter.logBaseName + '-worker';
this.logWriter = await MessageEventBusLogWriter.getInstance({ this.logWriter = await MessageEventBusLogWriter.getInstance({
logBaseName, logBaseName,
}); });
@ -168,7 +170,7 @@ export class MessageEventBus extends EventEmitter {
} }
} }
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { if (recoveryAlreadyAttempted || this.globalConfig.eventBus.crashRecoveryMode === 'simple') {
await this.executionRepository.markAsCrashed(unfinishedExecutionIds); await this.executionRepository.markAsCrashed(unfinishedExecutionIds);
// if we end up here, it means that the previous recovery process did not finish // if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error // a possible reason would be that recreating the workflow data itself caused e.g an OOM error
@ -188,13 +190,13 @@ export class MessageEventBus extends EventEmitter {
} }
} }
// if configured, run this test every n ms // if configured, run this test every n ms
if (config.getEnv('eventBus.checkUnsentInterval') > 0) { if (this.globalConfig.eventBus.checkUnsentInterval > 0) {
if (this.pushIntervalTimer) { if (this.pushIntervalTimer) {
clearInterval(this.pushIntervalTimer); clearInterval(this.pushIntervalTimer);
} }
this.pushIntervalTimer = setInterval(async () => { this.pushIntervalTimer = setInterval(async () => {
await this.trySendingUnsent(); await this.trySendingUnsent();
}, config.getEnv('eventBus.checkUnsentInterval')); }, this.globalConfig.eventBus.checkUnsentInterval);
} }
this.logger.debug('MessageEventBus initialized'); this.logger.debug('MessageEventBus initialized');

View file

@ -7,7 +7,6 @@ import { Worker } from 'worker_threads';
import { createReadStream, existsSync, rmSync } from 'fs'; import { createReadStream, existsSync, rmSync } from 'fs';
import readline from 'readline'; import readline from 'readline';
import remove from 'lodash/remove'; import remove from 'lodash/remove';
import config from '@/config';
import type { EventMessageGenericOptions } from '../EventMessageClasses/EventMessageGeneric'; import type { EventMessageGenericOptions } from '../EventMessageClasses/EventMessageGeneric';
import { EventMessageGeneric } from '../EventMessageClasses/EventMessageGeneric'; import { EventMessageGeneric } from '../EventMessageClasses/EventMessageGeneric';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
@ -29,6 +28,7 @@ import { once as eventOnce } from 'events';
import { inTest } from '@/constants'; import { inTest } from '@/constants';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import Container from 'typedi'; import Container from 'typedi';
import { GlobalConfig } from '@n8n/config';
interface MessageEventBusLogWriterConstructorOptions { interface MessageEventBusLogWriterConstructorOptions {
logBaseName?: string; logBaseName?: string;
@ -59,10 +59,13 @@ export class MessageEventBusLogWriter {
private readonly logger: Logger; private readonly logger: Logger;
private readonly globalConfig: GlobalConfig;
private _worker: Worker | undefined; private _worker: Worker | undefined;
constructor() { constructor() {
this.logger = Container.get(Logger); this.logger = Container.get(Logger);
this.globalConfig = Container.get(GlobalConfig);
} }
public get worker(): Worker | undefined { public get worker(): Worker | undefined {
@ -83,12 +86,13 @@ export class MessageEventBusLogWriter {
MessageEventBusLogWriter.options = { MessageEventBusLogWriter.options = {
logFullBasePath: path.join( logFullBasePath: path.join(
options?.logBasePath ?? Container.get(InstanceSettings).n8nFolder, options?.logBasePath ?? Container.get(InstanceSettings).n8nFolder,
options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'), options?.logBaseName ?? Container.get(GlobalConfig).eventBus.logWriter.logBaseName,
), ),
keepNumberOfFiles: keepNumberOfFiles:
options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'), options?.keepNumberOfFiles ?? Container.get(GlobalConfig).eventBus.logWriter.keepLogCount,
maxFileSizeInKB: maxFileSizeInKB:
options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'), options?.maxFileSizeInKB ??
Container.get(GlobalConfig).eventBus.logWriter.maxFileSizeInKB,
}; };
await MessageEventBusLogWriter.instance.startThread(); await MessageEventBusLogWriter.instance.startThread();
} }
@ -181,7 +185,7 @@ export class MessageEventBusLogWriter {
sentMessages: [], sentMessages: [],
unfinishedExecutions: {}, unfinishedExecutions: {},
}; };
const configLogCount = config.get('eventBus.logWriter.keepLogCount'); const configLogCount = this.globalConfig.eventBus.logWriter.keepLogCount;
const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount; const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount;
for (let i = logCount; i >= 0; i--) { for (let i = logCount; i >= 0; i--) {
const logFileName = this.getLogFileName(i); const logFileName = this.getLogFileName(i);
@ -282,7 +286,7 @@ export class MessageEventBusLogWriter {
logHistory?: number, logHistory?: number,
): Promise<EventMessageTypes[]> { ): Promise<EventMessageTypes[]> {
const result: EventMessageTypes[] = []; const result: EventMessageTypes[] = [];
const configLogCount = config.get('eventBus.logWriter.keepLogCount'); const configLogCount = this.globalConfig.eventBus.logWriter.keepLogCount;
const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount; const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount;
for (let i = 0; i < logCount; i++) { for (let i = 0; i < logCount; i++) {
const logFileName = this.getLogFileName(i); const logFileName = this.getLogFileName(i);

View file

@ -1,5 +1,4 @@
import { Container } from 'typedi'; import { Container } from 'typedi';
import config from '@/config';
import axios from 'axios'; import axios from 'axios';
import syslog from 'syslog-client'; import syslog from 'syslog-client';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
@ -92,9 +91,6 @@ beforeAll(async () => {
mockedSyslog.createClient.mockImplementation(() => new syslog.Client()); mockedSyslog.createClient.mockImplementation(() => new syslog.Client());
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', 1);
eventBus = Container.get(MessageEventBus); eventBus = Container.get(MessageEventBus);
await eventBus.initialize(); await eventBus.initialize();
}); });