diff --git a/packages/cli/package.json b/packages/cli/package.json index 0512cd4e52..fcafc65a01 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -188,7 +188,6 @@ "sse-channel": "^4.0.0", "swagger-ui-express": "^4.3.0", "syslog-client": "^1.1.1", - "threads": "^1.7.0", "tslib": "1.14.1", "typeorm": "0.2.45", "uuid": "^8.3.2", diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index cd43c3a170..ef55c7d64a 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -1065,12 +1065,6 @@ export const schema = { env: 'N8N_EVENTBUS_CHECKUNSENTINTERVAL', }, logWriter: { - syncFileAccess: { - doc: 'Whether all file access happens synchronously within the thread.', - format: Boolean, - default: false, - env: 'N8N_EVENTBUS_LOGWRITER_SYNCFILEACCESS', - }, keepLogCount: { doc: 'How many event log files to keep.', format: Number, diff --git a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts index 425af23ab5..0b89bebc05 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts @@ -1,5 +1,4 @@ /* eslint-disable @typescript-eslint/no-unsafe-argument */ -/* eslint-disable @typescript-eslint/no-explicit-any */ import { DateTime } from 'luxon'; import type { EventMessageTypeNames, JsonObject } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts index a296bbdb14..5c59095f8c 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts @@ -44,7 +44,6 @@ export interface EventMessageAuditOptions extends AbstractEventMessageOptions { } export class EventMessageAudit extends AbstractEventMessage { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access readonly __type = EventMessageTypeNames.audit; eventName: EventNamesAuditType; diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index c21c0e2d3e..2211f58464 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -66,7 +66,6 @@ class MessageEventBus extends EventEmitter { LoggerProxy.debug('Initializing event bus...'); - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call const savedEventDestinations = await Db.collections.EventDestinations.find({}); if (savedEventDestinations.length > 0) { for (const destinationData of savedEventDestinations) { @@ -91,11 +90,9 @@ class MessageEventBus extends EventEmitter { LoggerProxy.debug('Checking for unsent event messages'); const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); LoggerProxy.debug( - `Start logging into ${ - (await this.logWriter?.getThread()?.getLogFileName()) ?? 'unknown filename' - } `, + `Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, ); - await this.logWriter?.startLogging(); + this.logWriter?.startLogging(); await this.send(unsentAndUnfinished.unsentMessages); if (unsentAndUnfinished.unfinishedExecutions.size > 0) { @@ -130,10 +127,8 @@ class MessageEventBus extends EventEmitter { if (id && Object.keys(this.destinations).includes(id)) { result = [this.destinations[id].serialize()]; } else { - // eslint-disable-next-line @typescript-eslint/no-unsafe-return result = Object.keys(this.destinations).map((e) => this.destinations[e].serialize()); } - // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); } @@ -175,7 +170,11 @@ class MessageEventBus extends EventEmitter { msgs = [msgs]; } for (const msg of msgs) { - await this.logWriter?.putMessage(msg); + this.logWriter?.putMessage(msg); + // if there are no set up destinations, immediately mark the event as sent + if (!this.shouldSendMsg(msg)) { + this.confirmSent(msg, { id: '0', name: 'eventBus' }); + } await this.emitMessage(msg); } } @@ -192,8 +191,8 @@ class MessageEventBus extends EventEmitter { return false; } - async confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) { - await this.logWriter?.confirmMessageSent(msg.id, source); + confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) { + this.logWriter?.confirmMessageSent(msg.id, source); } private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean { @@ -210,22 +209,23 @@ class MessageEventBus extends EventEmitter { // this is for internal use ONLY and not for use with custom destinations! this.emit('message', msg); - LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`); + // LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`); - // if there are no set up destinations, immediately mark the event as sent - if ( - !isLogStreamingEnabled() || - Object.keys(this.destinations).length === 0 || - !this.hasAnyDestinationSubscribedToEvent(msg) - ) { - await this.confirmSent(msg, { id: '0', name: 'eventBus' }); - } else { + if (this.shouldSendMsg(msg)) { for (const destinationName of Object.keys(this.destinations)) { this.emit(this.destinations[destinationName].getId(), msg); } } } + shouldSendMsg(msg: EventMessageTypes): boolean { + return ( + isLogStreamingEnabled() && + Object.keys(this.destinations).length > 0 && + this.hasAnyDestinationSubscribedToEvent(msg) + ); + } + async getEventsAll(): Promise { const queryResult = await this.logWriter?.getMessagesAll(); const filtered = uniqby(queryResult, 'id'); diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts index 1e74884b3d..abc08a87e1 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts @@ -1,5 +1,4 @@ /* eslint-disable import/no-cycle */ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import type { EventDestinations } from '@/databases/entities/MessageEventBusDestinationEntity'; import type { MessageEventBusDestination } from './MessageEventBusDestination.ee'; @@ -10,7 +9,6 @@ import { MessageEventBusDestinationWebhook } from './MessageEventBusDestinationW export function messageEventBusDestinationFromDb( dbData: EventDestinations, ): MessageEventBusDestination | null { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-assignment const destinationData = dbData.destination; if ('__type' in destinationData) { switch (destinationData.__type) { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts index 689680730d..5384075b3a 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts @@ -1,5 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ -/* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { v4 as uuid } from 'uuid'; import { INodeCredentials, @@ -83,7 +81,6 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti id: this.getId(), destination: this.serialize(), }; - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call const dbResult: InsertResult = await Db.collections.EventDestinations.upsert(data, { skipUpdateIfNoValuesChanged: true, conflictPaths: ['id'], @@ -97,7 +94,6 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti } static async deleteFromDb(id: string): Promise { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access const dbResult = await Db.collections.EventDestinations.delete({ id }); return dbResult; } diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts index 3a18f5219f..232ae982f7 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts @@ -36,7 +36,6 @@ export class MessageEventBusDestinationSentry constructor(options: MessageEventBusDestinationSentryOptions) { super(options); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access this.label = options.label ?? 'Sentry DSN'; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.sentry; this.dsn = options.dsn; @@ -85,7 +84,7 @@ export class MessageEventBusDestinationSentry ); if (sentryResult) { - await eventBus.confirmSent(msg, { id: this.id, name: this.label }); + eventBus.confirmSent(msg, { id: this.id, name: this.label }); sendResult = true; } } catch (error) { @@ -109,7 +108,6 @@ export class MessageEventBusDestinationSentry ): MessageEventBusDestinationSentry | null { if ( '__type' in data && - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access data.__type === MessageEventBusDestinationTypeNames.sentry && isMessageEventBusDestinationSentryOptions(data) ) { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts index 382f5aa950..287b95ea5a 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts @@ -96,7 +96,7 @@ export class MessageEventBusDestinationSyslog if (error) { console.log(error); } else { - await eventBus.confirmSent(msg, { id: this.id, name: this.label }); + eventBus.confirmSent(msg, { id: this.id, name: this.label }); sendResult = true; } }, @@ -112,7 +112,6 @@ export class MessageEventBusDestinationSyslog serialize(): MessageEventBusDestinationSyslogOptions { const abstractSerialized = super.serialize(); - // eslint-disable-next-line @typescript-eslint/no-unsafe-return return { ...abstractSerialized, expectedStatusCode: this.expectedStatusCode, diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts index f99573e935..81aa75f2c4 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts @@ -192,8 +192,6 @@ export class MessageEventBusDestinationWebhook } catch (_) { console.log('JSON parameter need to be an valid JSON'); } - - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment this.axiosRequestOptions.params = jsonParse(this.jsonQuery); } } @@ -212,8 +210,6 @@ export class MessageEventBusDestinationWebhook } catch (_) { console.log('JSON parameter need to be an valid JSON'); } - - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment this.axiosRequestOptions.headers = jsonParse(this.jsonHeaders); } } @@ -222,7 +218,6 @@ export class MessageEventBusDestinationWebhook if (this.axiosRequestOptions.headers === undefined) { this.axiosRequestOptions.headers = {}; } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access this.axiosRequestOptions.headers['Content-Type'] = 'application/json'; } @@ -336,10 +331,8 @@ export class MessageEventBusDestinationWebhook password: httpBasicAuth.password as string, }; } else if (httpHeaderAuth) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access this.axiosRequestOptions.headers[httpHeaderAuth.name as string] = httpHeaderAuth.value; } else if (httpQueryAuth) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access this.axiosRequestOptions.params[httpQueryAuth.name as string] = httpQueryAuth.value; } else if (httpDigestAuth) { this.axiosRequestOptions.auth = { @@ -353,13 +346,13 @@ export class MessageEventBusDestinationWebhook if (requestResponse) { if (this.responseCodeMustMatch) { if (requestResponse.status === this.expectedStatusCode) { - await eventBus.confirmSent(msg, { id: this.id, name: this.label }); + eventBus.confirmSent(msg, { id: this.id, name: this.label }); sendResult = true; } else { sendResult = false; } } else { - await eventBus.confirmSent(msg, { id: this.id, name: this.label }); + eventBus.confirmSent(msg, { id: this.id, name: this.label }); sendResult = true; } } diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 1a4632eed6..31dc42c40e 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -3,9 +3,8 @@ import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage'; import { UserSettings } from 'n8n-core'; import path, { parse } from 'path'; -import { ModuleThread, spawn, Thread, Worker } from 'threads'; -import { MessageEventBusLogWriterWorker } from './MessageEventBusLogWriterWorker'; -import { createReadStream, existsSync } from 'fs'; +import { Worker } from 'worker_threads'; +import { createReadStream, existsSync, rmSync } from 'fs'; import readline from 'readline'; import { jsonParse, LoggerProxy } from 'n8n-workflow'; import remove from 'lodash.remove'; @@ -19,15 +18,21 @@ import { isEventMessageConfirm, } from '../EventMessageClasses/EventMessageConfirm'; import { once as eventOnce } from 'events'; +import { inTest } from '../../constants'; -interface MessageEventBusLogWriterOptions { - syncFileAccess?: boolean; +interface MessageEventBusLogWriterConstructorOptions { logBaseName?: string; logBasePath?: string; - keepLogCount?: number; + keepNumberOfFiles?: number; maxFileSizeInKB?: number; } +export interface MessageEventBusLogWriterOptions { + logFullBasePath: string; + keepNumberOfFiles: number; + maxFileSizeInKB: number; +} + interface ReadMessagesFromLogFileResult { loggedMessages: EventMessageTypes[]; sentMessages: EventMessageTypes[]; @@ -42,7 +47,11 @@ export class MessageEventBusLogWriter { static options: Required; - private worker: ModuleThread | null; + private _worker: Worker | undefined; + + public get worker(): Worker | undefined { + return this._worker; + } /** * Instantiates the Writer and the corresponding worker thread. @@ -51,16 +60,17 @@ export class MessageEventBusLogWriter { * **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging() */ static async getInstance( - options?: MessageEventBusLogWriterOptions, + options?: MessageEventBusLogWriterConstructorOptions, ): Promise { if (!MessageEventBusLogWriter.instance) { MessageEventBusLogWriter.instance = new MessageEventBusLogWriter(); MessageEventBusLogWriter.options = { - logBaseName: options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'), - logBasePath: options?.logBasePath ?? UserSettings.getUserN8nFolderPath(), - syncFileAccess: - options?.syncFileAccess ?? config.getEnv('eventBus.logWriter.syncFileAccess'), - keepLogCount: options?.keepLogCount ?? config.getEnv('eventBus.logWriter.keepLogCount'), + logFullBasePath: path.join( + options?.logBasePath ?? UserSettings.getUserN8nFolderPath(), + options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'), + ), + keepNumberOfFiles: + options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'), maxFileSizeInKB: options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'), }; @@ -73,15 +83,19 @@ export class MessageEventBusLogWriter { * First archives existing log files one history level upwards, * then starts logging events into a fresh event log */ - async startLogging() { - await MessageEventBusLogWriter.instance.getThread()?.startLogging(); + startLogging() { + if (this.worker) { + this.worker.postMessage({ command: 'startLogging', data: {} }); + } } /** * Pauses all logging. Events are still received by the worker, they just are not logged any more */ async pauseLogging() { - await MessageEventBusLogWriter.instance.getThread()?.pauseLogging(); + if (this.worker) { + this.worker.postMessage({ command: 'pauseLogging', data: {} }); + } } private async startThread() { @@ -89,26 +103,23 @@ export class MessageEventBusLogWriter { await this.close(); } await MessageEventBusLogWriter.instance.spawnThread(); - await MessageEventBusLogWriter.instance - .getThread() - ?.initialize( - path.join( - MessageEventBusLogWriter.options.logBasePath, - MessageEventBusLogWriter.options.logBaseName, - ), - MessageEventBusLogWriter.options.syncFileAccess, - MessageEventBusLogWriter.options.keepLogCount, - MessageEventBusLogWriter.options.maxFileSizeInKB, - ); + if (this.worker) { + this.worker.postMessage({ command: 'initialize', data: MessageEventBusLogWriter.options }); + } } private async spawnThread(): Promise { - this.worker = await spawn( - new Worker(`${parse(__filename).name}Worker`), - ); + const parsedName = parse(__filename); + let workerFileName; + if (inTest) { + workerFileName = './dist/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.js'; + } else { + workerFileName = path.join(parsedName.dir, `${parsedName.name}Worker${parsedName.ext}`); + } + this._worker = new Worker(workerFileName); if (this.worker) { - Thread.errors(this.worker).subscribe(async (error) => { - LoggerProxy.error('Event Bus Log Writer thread error', error); + this.worker.on('messageerror', async (error) => { + LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error); await MessageEventBusLogWriter.instance.startThread(); }); return true; @@ -116,29 +127,25 @@ export class MessageEventBusLogWriter { return false; } - getThread(): ModuleThread | undefined { - if (this.worker) { - return this.worker; - } - return; - } - async close(): Promise { if (this.worker) { - await Thread.terminate(this.worker); - this.worker = null; + await this.worker.terminate(); + this._worker = undefined; } } - async putMessage(msg: EventMessageTypes): Promise { + putMessage(msg: EventMessageTypes): void { if (this.worker) { - await this.worker.appendMessageToLog(msg.serialize()); + this.worker.postMessage({ command: 'appendMessageToLog', data: msg.serialize() }); } } - async confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): Promise { + confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): void { if (this.worker) { - await this.worker.confirmMessageSent(new EventMessageConfirm(msgId, source).serialize()); + this.worker.postMessage({ + command: 'confirmMessageSent', + data: new EventMessageConfirm(msgId, source).serialize(), + }); } } @@ -155,7 +162,7 @@ export class MessageEventBusLogWriter { ? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory) : (config.get('eventBus.logWriter.keepLogCount') as number); for (let i = logCount; i >= 0; i--) { - const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i); + const logFileName = this.getLogFileName(i); if (logFileName) { await this.readLoggedMessagesFromFile(results, mode, logFileName); } @@ -212,6 +219,22 @@ export class MessageEventBusLogWriter { return results; } + getLogFileName(counter?: number): string { + if (counter) { + return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`; + } else { + return `${MessageEventBusLogWriter.options.logFullBasePath}.log`; + } + } + + cleanAllLogs() { + for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) { + if (existsSync(this.getLogFileName(i))) { + rmSync(this.getLogFileName(i)); + } + } + } + async getMessagesByExecutionId( executionId: string, logHistory?: number, @@ -221,7 +244,7 @@ export class MessageEventBusLogWriter { ? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory) : (config.get('eventBus.logWriter.keepLogCount') as number); for (let i = 0; i < logCount; i++) { - const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i); + const logFileName = this.getLogFileName(i); if (logFileName) { result.push(...(await this.readFromFileByExecutionId(executionId, logFileName))); } diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts index 19aeacd097..2672a3b987 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts @@ -1,17 +1,11 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { appendFileSync, existsSync, rmSync, renameSync, openSync, closeSync } from 'fs'; -import { appendFile, stat } from 'fs/promises'; -import { expose, isWorkerRuntime } from 'threads/worker'; - -// ----------------------------------------- -// * This part runs in the Worker Thread ! * -// ----------------------------------------- - -// all references to and imports from classes have been remove to keep memory usage low +import { stat } from 'fs/promises'; +import { isMainThread, parentPort } from 'worker_threads'; +import type { MessageEventBusLogWriterOptions } from './MessageEventBusLogWriter'; let logFileBasePath = ''; let loggingPaused = true; -let syncFileAccess = false; let keepFiles = 10; let fileStatTimer: NodeJS.Timer; let maxLogFileSizeInKB = 102400; @@ -20,12 +14,8 @@ function setLogFileBasePath(basePath: string) { logFileBasePath = basePath; } -function setUseSyncFileAccess(useSync: boolean) { - syncFileAccess = useSync; -} - -function setMaxLogFileSizeInKB(maxSizeInKB: number) { - maxLogFileSizeInKB = maxSizeInKB; +function setMaxLogFileSizeInKB(maxFileSizeInKB: number) { + maxLogFileSizeInKB = maxFileSizeInKB; } function setKeepFiles(keepNumberOfFiles: number) { @@ -81,65 +71,53 @@ function appendMessageSync(msg: any) { appendFileSync(buildLogFileNameWithCounter(), JSON.stringify(msg) + '\n'); } -async function appendMessage(msg: any) { - if (loggingPaused) { - return; - } - await appendFile(buildLogFileNameWithCounter(), JSON.stringify(msg) + '\n'); +if (!isMainThread) { + // ----------------------------------------- + // * This part runs in the Worker Thread ! * + // ----------------------------------------- + parentPort?.on('message', async (msg: { command: string; data: any }) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const { command, data } = msg; + try { + switch (command) { + case 'appendMessageToLog': + case 'confirmMessageSent': + appendMessageSync(data); + parentPort?.postMessage({ command, data: true }); + break; + case 'pauseLogging': + loggingPaused = true; + clearInterval(fileStatTimer); + break; + case 'initialize': + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const settings: MessageEventBusLogWriterOptions = { + logFullBasePath: (data as MessageEventBusLogWriterOptions).logFullBasePath ?? '', + keepNumberOfFiles: (data as MessageEventBusLogWriterOptions).keepNumberOfFiles ?? 10, + maxFileSizeInKB: (data as MessageEventBusLogWriterOptions).maxFileSizeInKB ?? 102400, + }; + setLogFileBasePath(settings.logFullBasePath); + setKeepFiles(settings.keepNumberOfFiles); + setMaxLogFileSizeInKB(settings.maxFileSizeInKB); + break; + case 'startLogging': + if (logFileBasePath) { + renameAndCreateLogs(); + loggingPaused = false; + fileStatTimer = setInterval(async () => { + await checkFileSize(buildLogFileNameWithCounter()); + }, 5000); + } + break; + case 'cleanLogs': + cleanAllLogs(); + parentPort?.postMessage('cleanedAllLogs'); + break; + default: + break; + } + } catch (error) { + parentPort?.postMessage(error); + } + }); } - -const messageEventBusLogWriterWorker = { - async appendMessageToLog(msg: any) { - if (syncFileAccess) { - appendMessageSync(msg); - } else { - await appendMessage(msg); - } - }, - async confirmMessageSent(confirm: unknown) { - if (syncFileAccess) { - appendMessageSync(confirm); - } else { - await appendMessage(confirm); - } - }, - pauseLogging() { - loggingPaused = true; - clearInterval(fileStatTimer); - }, - initialize( - basePath: string, - useSyncFileAccess = false, - keepNumberOfFiles = 10, - maxSizeInKB = 102400, - ) { - setLogFileBasePath(basePath); - setUseSyncFileAccess(useSyncFileAccess); - setKeepFiles(keepNumberOfFiles); - setMaxLogFileSizeInKB(maxSizeInKB); - }, - startLogging() { - if (logFileBasePath) { - renameAndCreateLogs(); - loggingPaused = false; - fileStatTimer = setInterval(async () => { - await checkFileSize(buildLogFileNameWithCounter()); - }, 5000); - } - }, - getLogFileName(counter?: number) { - if (logFileBasePath) { - return buildLogFileNameWithCounter(counter); - } else { - return undefined; - } - }, - cleanLogs() { - cleanAllLogs(); - }, -}; -if (isWorkerRuntime()) { - // Register the serializer on the worker thread - expose(messageEventBusLogWriterWorker); -} -export type MessageEventBusLogWriterWorker = typeof messageEventBusLogWriterWorker; diff --git a/packages/cli/src/eventbus/eventBusRoutes.ts b/packages/cli/src/eventbus/eventBusRoutes.ts index b9702f9773..d5a30fc9fb 100644 --- a/packages/cli/src/eventbus/eventBusRoutes.ts +++ b/packages/cli/src/eventbus/eventBusRoutes.ts @@ -55,17 +55,14 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } = const isMessageEventBusDestinationWebhookOptions = ( candidate: unknown, ): candidate is MessageEventBusDestinationWebhookOptions => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const o = candidate as MessageEventBusDestinationWebhookOptions; if (!o) return false; - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access return o.url !== undefined; }; const isMessageEventBusDestinationOptions = ( candidate: unknown, ): candidate is MessageEventBusDestinationOptions => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const o = candidate as MessageEventBusDestinationOptions; if (!o) return false; return o.__type !== undefined; @@ -138,23 +135,20 @@ eventBusRouter.post( eventBusRouter.get( '/destination', - // eslint-disable-next-line @typescript-eslint/no-unused-vars - ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + ResponseHelper.send(async (req: express.Request): Promise => { let result = []; if (isWithIdString(req.query)) { result = await eventBus.findDestination(req.query.id); } else { result = await eventBus.findDestination(); } - // eslint-disable-next-line @typescript-eslint/no-unsafe-return return result; }), ); eventBusRouter.post( '/destination', - // eslint-disable-next-line @typescript-eslint/no-unused-vars - ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + ResponseHelper.send(async (req: express.Request): Promise => { if (!req.user || (req.user as User).globalRole.name !== 'owner') { throw new ResponseHelper.UnauthorizedError('Invalid request'); } @@ -195,8 +189,7 @@ eventBusRouter.post( eventBusRouter.get( '/testmessage', - // eslint-disable-next-line @typescript-eslint/no-unused-vars - ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + ResponseHelper.send(async (req: express.Request): Promise => { let result = false; if (isWithIdString(req.query)) { result = await eventBus.testDestination(req.query.id); @@ -207,8 +200,7 @@ eventBusRouter.get( eventBusRouter.delete( '/destination', - // eslint-disable-next-line @typescript-eslint/no-unused-vars - ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + ResponseHelper.send(async (req: express.Request): Promise => { if (!req.user || (req.user as User).globalRole.name !== 'owner') { throw new ResponseHelper.UnauthorizedError('Invalid request'); } diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 65a420be80..dfb8fbaa2e 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -21,6 +21,7 @@ import { MessageEventBusDestinationSyslog } from '../../src/eventbus/MessageEven import { MessageEventBusDestinationWebhook } from '../../src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; import { MessageEventBusDestinationSentry } from '../../src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; import { EventMessageAudit } from '../../src/eventbus/EventMessageClasses/EventMessageAudit'; +import { v4 as uuid } from 'uuid'; jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); jest.mock('axios'); @@ -63,28 +64,22 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = { }; async function cleanLogs() { - await eventBus.logWriter.getThread()?.cleanLogs(); + eventBus.logWriter.cleanAllLogs(); const allMessages = await eventBus.getEventsAll(); expect(allMessages.length).toBe(0); } -async function confirmIdsSentUnsent(id: string) { - const sent = await eventBus.getEventsSent(); - const unsent = await eventBus.getEventsUnsent(); - expect(sent.length).toBe(1); - expect(sent[0].id).toBe(id); - expect(unsent.length).toBe(0); +async function confirmIdInAll(id: string) { + const sent = await eventBus.getEventsAll(); + expect(sent.length).toBeGreaterThan(0); + expect(sent.find((msg) => msg.id === id)).toBeTruthy(); } -const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message' }); -const testMessageUnsubscribed = new EventMessageGeneric({ eventName: 'n8n.test.unsub' }); -const testAuditMessage = new EventMessageAudit({ - eventName: 'n8n.audit.user.updated', - payload: { - _secret: 'secret', - public: 'public', - }, -}); +async function confirmIdSent(id: string) { + const sent = await eventBus.getEventsSent(); + expect(sent.length).toBeGreaterThan(0); + expect(sent.find((msg) => msg.id === id)).toBeTruthy(); +} beforeAll(async () => { const initResult = await testDb.init(); @@ -119,36 +114,33 @@ beforeAll(async () => { }); beforeEach(async () => { - // await testDb.truncate(['EventDestinations'], testDbName); - config.set('userManagement.disabled', false); config.set('userManagement.isInstanceOwnerSetUp', true); config.set('enterprise.features.logStreaming', false); }); afterAll(async () => { + jest.mock('@/eventbus/MessageEventBus/MessageEventBus'); await testDb.terminate(testDbName); await eventBus.close(); }); -test('should have a running logwriter process', async () => { - const thread = eventBus.logWriter.getThread(); +test('should have a running logwriter process', () => { + const thread = eventBus.logWriter.worker; expect(thread).toBeDefined(); }); -test('should have a clean log', async () => { - await eventBus.logWriter.getThread()?.cleanLogs(); - const allMessages = await eventBus.getEventsAll(); - expect(allMessages.length).toBe(0); -}); - test('should have logwriter log messages', async () => { + const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); await eventBus.send(testMessage); - const sent = await eventBus.getEventsSent(); - const unsent = await eventBus.getEventsUnsent(); - expect(sent.length).toBeGreaterThan(0); - expect(unsent.length).toBe(0); - expect(sent.find((e) => e.id === testMessage.id)).toEqual(testMessage); + await new Promise((resolve) => { + eventBus.logWriter.worker?.once('message', async (msg: { command: string; data: any }) => { + expect(msg.command).toBe('appendMessageToLog'); + expect(msg.data).toBe(true); + await confirmIdInAll(testMessage.id); + resolve(true); + }); + }); }); test('GET /eventbus/destination should fail due to missing authentication', async () => { @@ -189,8 +181,9 @@ test('GET /eventbus/destination all returned destinations should exist in eventb }); test('should send message to syslog ', async () => { + const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); config.set('enterprise.features.logStreaming', true); - await cleanLogs(); + // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -208,17 +201,31 @@ test('should send message to syslog ', async () => { }); await eventBus.send(testMessage); - - await new Promise((resolve) => setTimeout(resolve, 100)); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - await confirmIdsSentUnsent(testMessage.id); - - syslogDestination.disable(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler001(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler001); + resolve(true); + } + }, + ); + }); }); test('should confirm send message if there are no subscribers', async () => { + const testMessageUnsubscribed = new EventMessageGeneric({ + eventName: 'n8n.test.unsub', + id: uuid(), + }); config.set('enterprise.features.logStreaming', true); - await cleanLogs(); + // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -228,25 +235,40 @@ test('should confirm send message if there are no subscribers', async () => { const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); mockedSyslogClientLog.mockImplementation((_m, _options, _cb) => { - eventBus.confirmSent(testMessage, { - id: syslogDestination.id, - name: syslogDestination.label, - }); return syslogDestination.client; }); await eventBus.send(testMessageUnsubscribed); - await new Promise((resolve) => setTimeout(resolve, 100)); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - await confirmIdsSentUnsent(testMessageUnsubscribed.id); - - syslogDestination.disable(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler002(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessageUnsubscribed.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessageUnsubscribed.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler002); + resolve(true); + } + }, + ); + }); }); test('should anonymize audit message to syslog ', async () => { + const testAuditMessage = new EventMessageAudit({ + eventName: 'n8n.audit.user.updated', + payload: { + _secret: 'secret', + public: 'public', + }, + id: uuid(), + }); config.set('enterprise.features.logStreaming', true); - await cleanLogs(); + // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -269,18 +291,44 @@ test('should anonymize audit message to syslog ', async () => { syslogDestination.anonymizeAuditMessages = true; await eventBus.send(testAuditMessage); - expect(mockedSyslogClientLog).toHaveBeenCalled(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler005(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + const sent = await eventBus.getEventsAll(); + await confirmIdInAll(testAuditMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + eventBus.logWriter.worker?.removeListener('message', handler005); + resolve(true); + } + }, + ); + }); syslogDestination.anonymizeAuditMessages = false; await eventBus.send(testAuditMessage); - expect(mockedSyslogClientLog).toHaveBeenCalled(); - - syslogDestination.disable(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler006(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + const sent = await eventBus.getEventsAll(); + await confirmIdInAll(testAuditMessage.id); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + syslogDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler006); + resolve(true); + } + }, + ); + }); }); test('should send message to webhook ', async () => { + const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); config.set('enterprise.features.logStreaming', true); - await cleanLogs(); + // await cleanLogs(); const webhookDestination = eventBus.destinations[ testWebhookDestination.id! @@ -292,16 +340,28 @@ test('should send message to webhook ', async () => { mockedAxios.request.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); await eventBus.send(testMessage); - // not elegant, but since communication happens through emitters, we'll wait for a bit - await new Promise((resolve) => setTimeout(resolve, 100)); - await confirmIdsSentUnsent(testMessage.id); - - webhookDestination.disable(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler003(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedAxios.request).toHaveBeenCalled(); + webhookDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler003); + resolve(true); + } + }, + ); + }); }); test('should send message to sentry ', async () => { + const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); config.set('enterprise.features.logStreaming', true); - await cleanLogs(); + // await cleanLogs(); const sentryDestination = eventBus.destinations[ testSentryDestination.id! @@ -319,12 +379,22 @@ test('should send message to sentry ', async () => { }); await eventBus.send(testMessage); - // not elegant, but since communication happens through emitters, we'll wait for a bit - await new Promise((resolve) => setTimeout(resolve, 100)); - expect(mockedSentryCaptureMessage).toHaveBeenCalled(); - await confirmIdsSentUnsent(testMessage.id); - - sentryDestination.disable(); + await new Promise((resolve) => { + eventBus.logWriter.worker?.on( + 'message', + async function handler004(msg: { command: string; data: any }) { + if (msg.command === 'appendMessageToLog') { + await confirmIdInAll(testMessage.id); + } else if (msg.command === 'confirmMessageSent') { + await confirmIdSent(testMessage.id); + expect(mockedSentryCaptureMessage).toHaveBeenCalled(); + sentryDestination.disable(); + eventBus.logWriter.worker?.removeListener('message', handler004); + resolve(true); + } + }, + ); + }); }); test('DEL /eventbus/destination delete all destinations by id', async () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ce3498ae22..4c95b18b66 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -214,7 +214,6 @@ importers: supertest: ^6.2.2 swagger-ui-express: ^4.3.0 syslog-client: ^1.1.1 - threads: ^1.7.0 ts-node: ^9.1.1 tsc-alias: ^1.7.0 tsconfig-paths: ^3.14.1 @@ -304,7 +303,6 @@ importers: sse-channel: 4.0.0 swagger-ui-express: 4.5.0_express@4.18.2 syslog-client: 1.1.1 - threads: 1.7.0 tslib: 1.14.1 typeorm: 0.2.45_b2izk5tn6tm5xb65gvog337urq uuid: 8.3.2 @@ -11304,12 +11302,6 @@ packages: - supports-color dev: true - /esm/3.2.25: - resolution: {integrity: sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==} - engines: {node: '>=6'} - dev: false - optional: true - /espree/6.2.1: resolution: {integrity: sha512-ysCxRQY3WaXJz9tdbWOwuWr5Y/XrPTGX9Kiz3yoUXwW0VZ4w30HTkQLaGx/+ttFjF8i+ACbArnB4ce68a9m5hw==} engines: {node: '>=6.0.0'} @@ -13635,11 +13627,6 @@ packages: resolution: {integrity: sha512-2rRIahhZr2UWb45fIOuvZGpFtz0TyOZLf32KxBbSoUCeZR495zCKlWUKKUByk3geS2eAs7ZAABt0Y/Rx0GiQGA==} dev: true - /is-observable/2.1.0: - resolution: {integrity: sha512-DailKdLb0WU+xX8K5w7VsJhapwHLZ9jjmazqCJq4X12CTgqq73TKnbRcnSLuXYPOoLQgV5IrD7ePiX/h1vnkBw==} - engines: {node: '>=8'} - dev: false - /is-path-inside/3.0.3: resolution: {integrity: sha512-Fd4gABb+ycGAmKou8eMftCupSir5lRxqf4aD/vd0cD2qc4HL07OjCeuHMr8Ro4CoMaeCKDB0/ECBOVWjTwUvPQ==} engines: {node: '>=8'} @@ -16632,10 +16619,6 @@ packages: es-abstract: 1.20.4 dev: true - /observable-fns/0.6.1: - resolution: {integrity: sha512-9gRK4+sRWzeN6AOewNBTLXir7Zl/i3GB6Yl26gK4flxz8BXVpD3kt8amREmWNb0mxYOGDotvE5a4N+PtGGKdkg==} - dev: false - /on-finished/2.4.1: resolution: {integrity: sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg==} engines: {node: '>= 0.8'} @@ -20391,19 +20374,6 @@ packages: any-promise: 1.3.0 dev: false - /threads/1.7.0: - resolution: {integrity: sha512-Mx5NBSHX3sQYR6iI9VYbgHKBLisyB+xROCBGjjWm1O9wb9vfLxdaGtmT/KCjUqMsSNW6nERzCW3T6H43LqjDZQ==} - dependencies: - callsites: 3.1.0 - debug: 4.3.4 - is-observable: 2.1.0 - observable-fns: 0.6.1 - optionalDependencies: - tiny-worker: 2.3.0 - transitivePeerDependencies: - - supports-color - dev: false - /throttle-debounce/1.1.0: resolution: {integrity: sha512-XH8UiPCQcWNuk2LYePibW/4qL97+ZQ1AN3FNXwZRBNPPowo/NRU5fAlDCSNBJIYCKbioZfuYtMhG4quqoJhVzg==} engines: {node: '>=4'} @@ -20455,13 +20425,6 @@ packages: globrex: 0.1.2 dev: true - /tiny-worker/2.3.0: - resolution: {integrity: sha512-pJ70wq5EAqTAEl9IkGzA+fN0836rycEuz2Cn6yeZ6FRzlVS5IDOkFHpIoEsksPRQV34GDqXm65+OlnZqUSyK2g==} - dependencies: - esm: 3.2.25 - dev: false - optional: true - /tinycolor2/1.4.2: resolution: {integrity: sha512-vJhccZPs965sV/L2sU4oRQVAos0pQXwsvTLkWYdqJ+a8Q5kPFzJTuOFwy7UniPli44NKQGAglksjvOcpo95aZA==} dev: false