2023-01-04 00:47:48 -08:00
|
|
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
|
|
|
/* eslint-disable @typescript-eslint/no-explicit-any */
|
|
|
|
import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage';
|
|
|
|
import { UserSettings } from 'n8n-core';
|
|
|
|
import path, { parse } from 'path';
|
2023-01-13 06:39:25 -08:00
|
|
|
import { Worker } from 'worker_threads';
|
|
|
|
import { createReadStream, existsSync, rmSync } from 'fs';
|
2023-01-04 00:47:48 -08:00
|
|
|
import readline from 'readline';
|
|
|
|
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
|
|
|
import remove from 'lodash.remove';
|
|
|
|
import config from '@/config';
|
|
|
|
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
|
|
|
import type { EventMessageReturnMode } from '../MessageEventBus/MessageEventBus';
|
|
|
|
import type { EventMessageTypes } from '../EventMessageClasses';
|
2023-01-27 05:56:56 -08:00
|
|
|
import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm';
|
2023-01-04 00:47:48 -08:00
|
|
|
import {
|
|
|
|
EventMessageConfirm,
|
|
|
|
isEventMessageConfirm,
|
|
|
|
} from '../EventMessageClasses/EventMessageConfirm';
|
|
|
|
import { once as eventOnce } from 'events';
|
2023-01-13 06:39:25 -08:00
|
|
|
import { inTest } from '../../constants';
|
2023-01-04 00:47:48 -08:00
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
interface MessageEventBusLogWriterConstructorOptions {
|
2023-01-04 00:47:48 -08:00
|
|
|
logBaseName?: string;
|
|
|
|
logBasePath?: string;
|
2023-01-13 06:39:25 -08:00
|
|
|
keepNumberOfFiles?: number;
|
2023-01-04 00:47:48 -08:00
|
|
|
maxFileSizeInKB?: number;
|
|
|
|
}
|
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
export interface MessageEventBusLogWriterOptions {
|
|
|
|
logFullBasePath: string;
|
|
|
|
keepNumberOfFiles: number;
|
|
|
|
maxFileSizeInKB: number;
|
|
|
|
}
|
|
|
|
|
2023-01-11 05:09:09 -08:00
|
|
|
interface ReadMessagesFromLogFileResult {
|
|
|
|
loggedMessages: EventMessageTypes[];
|
|
|
|
sentMessages: EventMessageTypes[];
|
2023-02-17 01:54:07 -08:00
|
|
|
unfinishedExecutions: Record<string, EventMessageTypes[]>;
|
2023-01-11 05:09:09 -08:00
|
|
|
}
|
|
|
|
|
2023-01-04 00:47:48 -08:00
|
|
|
/**
|
|
|
|
* MessageEventBusWriter for Files
|
|
|
|
*/
|
|
|
|
export class MessageEventBusLogWriter {
|
|
|
|
private static instance: MessageEventBusLogWriter;
|
|
|
|
|
|
|
|
static options: Required<MessageEventBusLogWriterOptions>;
|
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
private _worker: Worker | undefined;
|
|
|
|
|
|
|
|
public get worker(): Worker | undefined {
|
|
|
|
return this._worker;
|
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
|
|
|
|
/**
|
|
|
|
* Instantiates the Writer and the corresponding worker thread.
|
|
|
|
* To actually start logging, call startLogging() function on the instance.
|
|
|
|
*
|
|
|
|
* **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging()
|
|
|
|
*/
|
|
|
|
static async getInstance(
|
2023-01-13 06:39:25 -08:00
|
|
|
options?: MessageEventBusLogWriterConstructorOptions,
|
2023-01-04 00:47:48 -08:00
|
|
|
): Promise<MessageEventBusLogWriter> {
|
|
|
|
if (!MessageEventBusLogWriter.instance) {
|
|
|
|
MessageEventBusLogWriter.instance = new MessageEventBusLogWriter();
|
|
|
|
MessageEventBusLogWriter.options = {
|
2023-01-13 06:39:25 -08:00
|
|
|
logFullBasePath: path.join(
|
|
|
|
options?.logBasePath ?? UserSettings.getUserN8nFolderPath(),
|
|
|
|
options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'),
|
|
|
|
),
|
|
|
|
keepNumberOfFiles:
|
|
|
|
options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'),
|
2023-01-04 00:47:48 -08:00
|
|
|
maxFileSizeInKB:
|
|
|
|
options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'),
|
|
|
|
};
|
|
|
|
await MessageEventBusLogWriter.instance.startThread();
|
|
|
|
}
|
|
|
|
return MessageEventBusLogWriter.instance;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* First archives existing log files one history level upwards,
|
|
|
|
* then starts logging events into a fresh event log
|
|
|
|
*/
|
2023-01-13 06:39:25 -08:00
|
|
|
startLogging() {
|
|
|
|
if (this.worker) {
|
|
|
|
this.worker.postMessage({ command: 'startLogging', data: {} });
|
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Pauses all logging. Events are still received by the worker, they just are not logged any more
|
|
|
|
*/
|
|
|
|
async pauseLogging() {
|
2023-01-13 06:39:25 -08:00
|
|
|
if (this.worker) {
|
|
|
|
this.worker.postMessage({ command: 'pauseLogging', data: {} });
|
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
private async startThread() {
|
|
|
|
if (this.worker) {
|
|
|
|
await this.close();
|
|
|
|
}
|
|
|
|
await MessageEventBusLogWriter.instance.spawnThread();
|
2023-01-13 06:39:25 -08:00
|
|
|
if (this.worker) {
|
|
|
|
this.worker.postMessage({ command: 'initialize', data: MessageEventBusLogWriter.options });
|
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
private async spawnThread(): Promise<boolean> {
|
2023-01-13 06:39:25 -08:00
|
|
|
const parsedName = parse(__filename);
|
|
|
|
let workerFileName;
|
|
|
|
if (inTest) {
|
|
|
|
workerFileName = './dist/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.js';
|
|
|
|
} else {
|
|
|
|
workerFileName = path.join(parsedName.dir, `${parsedName.name}Worker${parsedName.ext}`);
|
|
|
|
}
|
|
|
|
this._worker = new Worker(workerFileName);
|
2023-01-04 00:47:48 -08:00
|
|
|
if (this.worker) {
|
2023-01-13 06:39:25 -08:00
|
|
|
this.worker.on('messageerror', async (error) => {
|
|
|
|
LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error);
|
2023-01-04 00:47:48 -08:00
|
|
|
await MessageEventBusLogWriter.instance.startThread();
|
|
|
|
});
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
async close(): Promise<void> {
|
|
|
|
if (this.worker) {
|
2023-01-13 06:39:25 -08:00
|
|
|
await this.worker.terminate();
|
|
|
|
this._worker = undefined;
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
putMessage(msg: EventMessageTypes): void {
|
2023-01-04 00:47:48 -08:00
|
|
|
if (this.worker) {
|
2023-01-13 06:39:25 -08:00
|
|
|
this.worker.postMessage({ command: 'appendMessageToLog', data: msg.serialize() });
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): void {
|
2023-01-04 00:47:48 -08:00
|
|
|
if (this.worker) {
|
2023-01-13 06:39:25 -08:00
|
|
|
this.worker.postMessage({
|
|
|
|
command: 'confirmMessageSent',
|
|
|
|
data: new EventMessageConfirm(msgId, source).serialize(),
|
|
|
|
});
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async getMessages(
|
|
|
|
mode: EventMessageReturnMode = 'all',
|
2023-01-11 05:09:09 -08:00
|
|
|
logHistory = 1,
|
|
|
|
): Promise<ReadMessagesFromLogFileResult> {
|
|
|
|
const results: ReadMessagesFromLogFileResult = {
|
2023-01-04 00:47:48 -08:00
|
|
|
loggedMessages: [],
|
|
|
|
sentMessages: [],
|
2023-02-17 01:54:07 -08:00
|
|
|
unfinishedExecutions: {},
|
2023-01-04 00:47:48 -08:00
|
|
|
};
|
2023-03-21 04:11:59 -07:00
|
|
|
const configLogCount = config.get('eventBus.logWriter.keepLogCount');
|
|
|
|
const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount;
|
2023-01-11 05:09:09 -08:00
|
|
|
for (let i = logCount; i >= 0; i--) {
|
2023-01-13 06:39:25 -08:00
|
|
|
const logFileName = this.getLogFileName(i);
|
2023-01-11 05:09:09 -08:00
|
|
|
if (logFileName) {
|
|
|
|
await this.readLoggedMessagesFromFile(results, mode, logFileName);
|
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
2023-01-11 05:09:09 -08:00
|
|
|
|
|
|
|
return results;
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
async readLoggedMessagesFromFile(
|
2023-01-11 05:09:09 -08:00
|
|
|
results: ReadMessagesFromLogFileResult,
|
2023-01-04 00:47:48 -08:00
|
|
|
mode: EventMessageReturnMode,
|
|
|
|
logFileName: string,
|
2023-01-11 05:09:09 -08:00
|
|
|
): Promise<ReadMessagesFromLogFileResult> {
|
2023-01-04 00:47:48 -08:00
|
|
|
if (logFileName && existsSync(logFileName)) {
|
|
|
|
try {
|
|
|
|
const rl = readline.createInterface({
|
|
|
|
input: createReadStream(logFileName),
|
|
|
|
crlfDelay: Infinity,
|
|
|
|
});
|
|
|
|
rl.on('line', (line) => {
|
|
|
|
try {
|
|
|
|
const json = jsonParse(line);
|
|
|
|
if (isEventMessageOptions(json) && json.__type !== undefined) {
|
|
|
|
const msg = getEventMessageObjectByType(json);
|
|
|
|
if (msg !== null) results.loggedMessages.push(msg);
|
2023-02-17 01:54:07 -08:00
|
|
|
if (msg?.eventName && msg.payload?.executionId) {
|
|
|
|
const executionId = msg.payload.executionId as string;
|
|
|
|
switch (msg.eventName) {
|
|
|
|
case 'n8n.workflow.started':
|
|
|
|
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
|
|
|
|
results.unfinishedExecutions[executionId] = [];
|
|
|
|
}
|
|
|
|
results.unfinishedExecutions[executionId] = [msg];
|
|
|
|
break;
|
|
|
|
case 'n8n.workflow.success':
|
|
|
|
case 'n8n.workflow.failed':
|
|
|
|
case 'n8n.workflow.crashed':
|
|
|
|
delete results.unfinishedExecutions[executionId];
|
|
|
|
break;
|
|
|
|
case 'n8n.node.started':
|
|
|
|
case 'n8n.node.finished':
|
|
|
|
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
|
|
|
|
results.unfinishedExecutions[executionId] = [];
|
|
|
|
}
|
|
|
|
results.unfinishedExecutions[executionId].push(msg);
|
|
|
|
break;
|
|
|
|
}
|
2023-01-11 05:09:09 -08:00
|
|
|
}
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
if (isEventMessageConfirm(json) && mode !== 'all') {
|
|
|
|
const removedMessage = remove(results.loggedMessages, (e) => e.id === json.confirm);
|
|
|
|
if (mode === 'sent') {
|
|
|
|
results.sentMessages.push(...removedMessage);
|
|
|
|
}
|
|
|
|
}
|
2023-02-17 01:54:07 -08:00
|
|
|
} catch (error) {
|
2023-01-04 00:47:48 -08:00
|
|
|
LoggerProxy.error(
|
2023-02-17 01:54:07 -08:00
|
|
|
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
|
|
|
`Error reading line messages from file: ${logFileName}, line: ${line}, ${error.message}}`,
|
2023-01-04 00:47:48 -08:00
|
|
|
);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
// wait for stream to finish before continue
|
|
|
|
await eventOnce(rl, 'close');
|
|
|
|
} catch {
|
|
|
|
LoggerProxy.error(`Error reading logged messages from file: ${logFileName}`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return results;
|
|
|
|
}
|
|
|
|
|
2023-01-13 06:39:25 -08:00
|
|
|
getLogFileName(counter?: number): string {
|
|
|
|
if (counter) {
|
|
|
|
return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`;
|
|
|
|
} else {
|
|
|
|
return `${MessageEventBusLogWriter.options.logFullBasePath}.log`;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
cleanAllLogs() {
|
|
|
|
for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) {
|
|
|
|
if (existsSync(this.getLogFileName(i))) {
|
|
|
|
rmSync(this.getLogFileName(i));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-11 05:09:09 -08:00
|
|
|
async getMessagesByExecutionId(
|
|
|
|
executionId: string,
|
|
|
|
logHistory?: number,
|
|
|
|
): Promise<EventMessageTypes[]> {
|
|
|
|
const result: EventMessageTypes[] = [];
|
2023-03-21 04:11:59 -07:00
|
|
|
const configLogCount = config.get('eventBus.logWriter.keepLogCount');
|
|
|
|
const logCount = logHistory ? Math.min(configLogCount, logHistory) : configLogCount;
|
2023-01-11 05:09:09 -08:00
|
|
|
for (let i = 0; i < logCount; i++) {
|
2023-01-13 06:39:25 -08:00
|
|
|
const logFileName = this.getLogFileName(i);
|
2023-01-11 05:09:09 -08:00
|
|
|
if (logFileName) {
|
|
|
|
result.push(...(await this.readFromFileByExecutionId(executionId, logFileName)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return result.sort((a, b) => a.ts.diff(b.ts).toMillis());
|
|
|
|
}
|
|
|
|
|
|
|
|
async readFromFileByExecutionId(
|
|
|
|
executionId: string,
|
|
|
|
logFileName: string,
|
|
|
|
): Promise<EventMessageTypes[]> {
|
|
|
|
const messages: EventMessageTypes[] = [];
|
|
|
|
if (logFileName && existsSync(logFileName)) {
|
|
|
|
try {
|
|
|
|
const rl = readline.createInterface({
|
|
|
|
input: createReadStream(logFileName),
|
|
|
|
crlfDelay: Infinity,
|
|
|
|
});
|
|
|
|
rl.on('line', (line) => {
|
|
|
|
try {
|
|
|
|
const json = jsonParse(line);
|
|
|
|
if (
|
|
|
|
isEventMessageOptions(json) &&
|
|
|
|
json.__type !== undefined &&
|
|
|
|
json.payload?.executionId === executionId
|
|
|
|
) {
|
|
|
|
const msg = getEventMessageObjectByType(json);
|
|
|
|
if (msg !== null) messages.push(msg);
|
|
|
|
}
|
|
|
|
} catch {
|
|
|
|
LoggerProxy.error(
|
|
|
|
`Error reading line messages from file: ${logFileName}, line: ${line}`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
// wait for stream to finish before continue
|
|
|
|
await eventOnce(rl, 'close');
|
|
|
|
} catch {
|
|
|
|
LoggerProxy.error(`Error reading logged messages from file: ${logFileName}`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return messages;
|
|
|
|
}
|
|
|
|
|
|
|
|
async getMessagesAll(): Promise<EventMessageTypes[]> {
|
|
|
|
return (await this.getMessages('all')).loggedMessages;
|
|
|
|
}
|
|
|
|
|
2023-01-04 00:47:48 -08:00
|
|
|
async getMessagesSent(): Promise<EventMessageTypes[]> {
|
2023-01-11 05:09:09 -08:00
|
|
|
return (await this.getMessages('sent')).sentMessages;
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
async getMessagesUnsent(): Promise<EventMessageTypes[]> {
|
2023-01-11 05:09:09 -08:00
|
|
|
return (await this.getMessages('unsent')).loggedMessages;
|
|
|
|
}
|
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
async getUnfinishedExecutions(): Promise<Record<string, EventMessageTypes[]>> {
|
2023-01-11 05:09:09 -08:00
|
|
|
return (await this.getMessages('unfinished')).unfinishedExecutions;
|
|
|
|
}
|
|
|
|
|
|
|
|
async getUnsentAndUnfinishedExecutions(): Promise<{
|
|
|
|
unsentMessages: EventMessageTypes[];
|
2023-02-17 01:54:07 -08:00
|
|
|
unfinishedExecutions: Record<string, EventMessageTypes[]>;
|
2023-01-11 05:09:09 -08:00
|
|
|
}> {
|
|
|
|
const result = await this.getMessages('unsent');
|
|
|
|
return {
|
|
|
|
unsentMessages: result.loggedMessages,
|
|
|
|
unfinishedExecutions: result.unfinishedExecutions,
|
|
|
|
};
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
}
|