diff --git a/.vscode/launch.json b/.vscode/launch.json index 255c1f8a9d..9eca76b830 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -35,6 +35,23 @@ "outputCapture": "std", "killBehavior": "polite" }, + { + "name": "Launch n8n CLI dev with debug", + "runtimeExecutable": "pnpm", + "cwd": "${workspaceFolder}/packages/cli", + "runtimeArgs": ["run", "dev", "--", "--inspect-brk"], + "console": "integratedTerminal", + "restart": true, + "autoAttachChildProcesses": true, + "request": "launch", + "skipFiles": ["/**"], + "type": "node", + "env": { + // "N8N_PORT": "5679", + }, + "outputCapture": "std", + "killBehavior": "polite" + }, { "name": "Debug CLI tests", "cwd": "${workspaceFolder}/packages/cli", diff --git a/packages/cli/src/eventbus/EventMessageClasses/Helpers.ts b/packages/cli/src/eventbus/EventMessageClasses/Helpers.ts index 1475b74229..c482a0edc4 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/Helpers.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/Helpers.ts @@ -3,6 +3,8 @@ import { EventMessageGeneric, EventMessageGenericOptions } from './EventMessageG import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; import { EventMessageWorkflow, EventMessageWorkflowOptions } from './EventMessageWorkflow'; import { EventMessageTypeNames } from 'n8n-workflow'; +import { EventMessageAudit, EventMessageAuditOptions } from './EventMessageAudit'; +import { EventMessageNode, EventMessageNodeOptions } from './EventMessageNode'; export const getEventMessageObjectByType = ( message: AbstractEventMessageOptions, @@ -12,6 +14,10 @@ export const getEventMessageObjectByType = ( return new EventMessageGeneric(message as EventMessageGenericOptions); case EventMessageTypeNames.workflow: return new EventMessageWorkflow(message as EventMessageWorkflowOptions); + case EventMessageTypeNames.audit: + return new EventMessageAudit(message as EventMessageAuditOptions); + case EventMessageTypeNames.node: + return new EventMessageNode(message as EventMessageNodeOptions); default: return null; } diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index b28a7d2bd1..c21c0e2d3e 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -24,7 +24,7 @@ import { eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; -export type EventMessageReturnMode = 'sent' | 'unsent' | 'all'; +export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; @@ -89,14 +89,20 @@ class MessageEventBus extends EventEmitter { // - cycle event logs and start the logging to a fresh file // - retry sending events LoggerProxy.debug('Checking for unsent event messages'); - const unsentMessages = await this.getEventsUnsent(); + const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); LoggerProxy.debug( `Start logging into ${ (await this.logWriter?.getThread()?.getLogFileName()) ?? 'unknown filename' } `, ); await this.logWriter?.startLogging(); - await this.send(unsentMessages); + await this.send(unsentAndUnfinished.unsentMessages); + + if (unsentAndUnfinished.unfinishedExecutions.size > 0) { + for (const executionId of unsentAndUnfinished.unfinishedExecutions) { + LoggerProxy.debug(`Found unfinished execution ${executionId} in event log(s)`); + } + } // if configured, run this test every n ms if (config.getEnv('eventBus.checkUnsentInterval') > 0) { @@ -190,6 +196,15 @@ class MessageEventBus extends EventEmitter { await this.logWriter?.confirmMessageSent(msg.id, source); } + private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean { + for (const destinationName of Object.keys(this.destinations)) { + if (this.destinations[destinationName].hasSubscribedToEvent(msg)) { + return true; + } + } + return false; + } + private async emitMessage(msg: EventMessageTypes) { // generic emit for external modules to capture events // this is for internal use ONLY and not for use with custom destinations! @@ -198,7 +213,11 @@ class MessageEventBus extends EventEmitter { LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`); // if there are no set up destinations, immediately mark the event as sent - if (!isLogStreamingEnabled() || Object.keys(this.destinations).length === 0) { + if ( + !isLogStreamingEnabled() || + Object.keys(this.destinations).length === 0 || + !this.hasAnyDestinationSubscribedToEvent(msg) + ) { await this.confirmSent(msg, { id: '0', name: 'eventBus' }); } else { for (const destinationName of Object.keys(this.destinations)) { @@ -207,32 +226,50 @@ class MessageEventBus extends EventEmitter { } } - async getEvents(mode: EventMessageReturnMode = 'all'): Promise { - let queryResult: EventMessageTypes[]; - switch (mode) { - case 'all': - queryResult = await this.logWriter?.getMessages(); - break; - case 'sent': - queryResult = await this.logWriter?.getMessagesSent(); - break; - case 'unsent': - queryResult = await this.logWriter?.getMessagesUnsent(); - } + async getEventsAll(): Promise { + const queryResult = await this.logWriter?.getMessagesAll(); const filtered = uniqby(queryResult, 'id'); return filtered; } async getEventsSent(): Promise { - const sentMessages = await this.getEvents('sent'); - return sentMessages; + const queryResult = await this.logWriter?.getMessagesSent(); + const filtered = uniqby(queryResult, 'id'); + return filtered; } async getEventsUnsent(): Promise { - const unSentMessages = await this.getEvents('unsent'); - return unSentMessages; + const queryResult = await this.logWriter?.getMessagesUnsent(); + const filtered = uniqby(queryResult, 'id'); + return filtered; } + async getUnfinishedExecutions(): Promise> { + const queryResult = await this.logWriter?.getUnfinishedExecutions(); + return queryResult; + } + + async getUnsentAndUnfinishedExecutions(): Promise<{ + unsentMessages: EventMessageTypes[]; + unfinishedExecutions: Set; + }> { + const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); + return queryResult; + } + + /** + * This will pull all events for a given execution id from the event log files. Note that this can be a very expensive operation, depending on the number of events and the size of the log files. + * @param executionId id to look for + * @param logHistory defaults to 1, which means it will look at the current log file AND the previous one. + * @returns Array of EventMessageTypes + */ + async getEventsByExecutionId( + executionId: string, + logHistory?: number, + ): Promise { + const result = await this.logWriter?.getMessagesByExecutionId(executionId, logHistory); + return result; + } /** * Convenience Methods */ diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index b8c675ac23..1a4632eed6 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -28,6 +28,12 @@ interface MessageEventBusLogWriterOptions { maxFileSizeInKB?: number; } +interface ReadMessagesFromLogFileResult { + loggedMessages: EventMessageTypes[]; + sentMessages: EventMessageTypes[]; + unfinishedExecutions: Set; +} + /** * MessageEventBusWriter for Files */ @@ -138,45 +144,31 @@ export class MessageEventBusLogWriter { async getMessages( mode: EventMessageReturnMode = 'all', - includePreviousLog = true, - ): Promise { - const logFileName0 = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(); - const logFileName1 = includePreviousLog - ? await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(1) - : undefined; - const results: { - loggedMessages: EventMessageTypes[]; - sentMessages: EventMessageTypes[]; - } = { + logHistory = 1, + ): Promise { + const results: ReadMessagesFromLogFileResult = { loggedMessages: [], sentMessages: [], + unfinishedExecutions: new Set(), }; - if (logFileName0) { - await this.readLoggedMessagesFromFile(results, mode, logFileName0); - } - if (logFileName1) { - await this.readLoggedMessagesFromFile(results, mode, logFileName1); - } - switch (mode) { - case 'all': - case 'unsent': - return results.loggedMessages; - case 'sent': - return results.sentMessages; + const logCount = logHistory + ? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory) + : (config.get('eventBus.logWriter.keepLogCount') as number); + for (let i = logCount; i >= 0; i--) { + const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i); + if (logFileName) { + await this.readLoggedMessagesFromFile(results, mode, logFileName); + } } + + return results; } async readLoggedMessagesFromFile( - results: { - loggedMessages: EventMessageTypes[]; - sentMessages: EventMessageTypes[]; - }, + results: ReadMessagesFromLogFileResult, mode: EventMessageReturnMode, logFileName: string, - ): Promise<{ - loggedMessages: EventMessageTypes[]; - sentMessages: EventMessageTypes[]; - }> { + ): Promise { if (logFileName && existsSync(logFileName)) { try { const rl = readline.createInterface({ @@ -189,6 +181,15 @@ export class MessageEventBusLogWriter { if (isEventMessageOptions(json) && json.__type !== undefined) { const msg = getEventMessageObjectByType(json); if (msg !== null) results.loggedMessages.push(msg); + if (msg?.eventName === 'n8n.workflow.started' && msg?.payload?.executionId) { + results.unfinishedExecutions.add(msg?.payload?.executionId as string); + } else if ( + (msg?.eventName === 'n8n.workflow.success' || + msg?.eventName === 'n8n.workflow.failed') && + msg?.payload?.executionId + ) { + results.unfinishedExecutions.delete(msg?.payload?.executionId as string); + } } if (isEventMessageConfirm(json) && mode !== 'all') { const removedMessage = remove(results.loggedMessages, (e) => e.id === json.confirm); @@ -211,11 +212,84 @@ export class MessageEventBusLogWriter { return results; } + async getMessagesByExecutionId( + executionId: string, + logHistory?: number, + ): Promise { + const result: EventMessageTypes[] = []; + const logCount = logHistory + ? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory) + : (config.get('eventBus.logWriter.keepLogCount') as number); + for (let i = 0; i < logCount; i++) { + const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i); + if (logFileName) { + result.push(...(await this.readFromFileByExecutionId(executionId, logFileName))); + } + } + return result.sort((a, b) => a.ts.diff(b.ts).toMillis()); + } + + async readFromFileByExecutionId( + executionId: string, + logFileName: string, + ): Promise { + const messages: EventMessageTypes[] = []; + if (logFileName && existsSync(logFileName)) { + try { + const rl = readline.createInterface({ + input: createReadStream(logFileName), + crlfDelay: Infinity, + }); + rl.on('line', (line) => { + try { + const json = jsonParse(line); + if ( + isEventMessageOptions(json) && + json.__type !== undefined && + json.payload?.executionId === executionId + ) { + const msg = getEventMessageObjectByType(json); + if (msg !== null) messages.push(msg); + } + } catch { + LoggerProxy.error( + `Error reading line messages from file: ${logFileName}, line: ${line}`, + ); + } + }); + // wait for stream to finish before continue + await eventOnce(rl, 'close'); + } catch { + LoggerProxy.error(`Error reading logged messages from file: ${logFileName}`); + } + } + return messages; + } + + async getMessagesAll(): Promise { + return (await this.getMessages('all')).loggedMessages; + } + async getMessagesSent(): Promise { - return this.getMessages('sent'); + return (await this.getMessages('sent')).sentMessages; } async getMessagesUnsent(): Promise { - return this.getMessages('unsent'); + return (await this.getMessages('unsent')).loggedMessages; + } + + async getUnfinishedExecutions(): Promise> { + return (await this.getMessages('unfinished')).unfinishedExecutions; + } + + async getUnsentAndUnfinishedExecutions(): Promise<{ + unsentMessages: EventMessageTypes[]; + unfinishedExecutions: Set; + }> { + const result = await this.getMessages('unsent'); + return { + unsentMessages: result.loggedMessages, + unfinishedExecutions: result.unfinishedExecutions, + }; } } diff --git a/packages/cli/src/eventbus/eventBusRoutes.ts b/packages/cli/src/eventbus/eventBusRoutes.ts index 42ec2911ae..b9702f9773 100644 --- a/packages/cli/src/eventbus/eventBusRoutes.ts +++ b/packages/cli/src/eventbus/eventBusRoutes.ts @@ -2,7 +2,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-explicit-any */ import express from 'express'; -import { ResponseHelper } from '..'; import { isEventMessageOptions } from './EventMessageClasses/AbstractEventMessage'; import { EventMessageGeneric } from './EventMessageClasses/EventMessageGeneric'; import { @@ -32,6 +31,7 @@ import { MessageEventBusDestinationOptions, } from 'n8n-workflow'; import { User } from '../databases/entities/User'; +import * as ResponseHelper from '@/ResponseHelper'; export const eventBusRouter = express.Router(); @@ -83,11 +83,27 @@ eventBusRouter.get( return eventBus.getEventsSent(); case 'unsent': return eventBus.getEventsUnsent(); + case 'unfinished': + return eventBus.getUnfinishedExecutions(); case 'all': default: + return eventBus.getEventsAll(); } } - return eventBus.getEvents(); + return eventBus.getEventsAll(); + }), +); + +eventBusRouter.get( + '/execution/:id', + ResponseHelper.send(async (req: express.Request): Promise => { + if (req.params?.id) { + let logHistory; + if (req.query?.logHistory) { + logHistory = parseInt(req.query.logHistory as string, 10); + } + return eventBus.getEventsByExecutionId(req.params.id, logHistory); + } }), ); diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index ea5801b29d..65a420be80 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -64,17 +64,16 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = { async function cleanLogs() { await eventBus.logWriter.getThread()?.cleanLogs(); - const allMessages = await eventBus.getEvents('all'); + const allMessages = await eventBus.getEventsAll(); expect(allMessages.length).toBe(0); } -async function confirmIdsSentUnsent() { - const sent = await eventBus.getEvents('sent'); - const unsent = await eventBus.getEvents('unsent'); +async function confirmIdsSentUnsent(id: string) { + const sent = await eventBus.getEventsSent(); + const unsent = await eventBus.getEventsUnsent(); expect(sent.length).toBe(1); - expect(sent[0].id).toBe(testMessage.id); - expect(unsent.length).toBe(1); - expect(unsent[0].id).toBe(testMessageUnsubscribed.id); + expect(sent[0].id).toBe(id); + expect(unsent.length).toBe(0); } const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message' }); @@ -139,14 +138,14 @@ test('should have a running logwriter process', async () => { test('should have a clean log', async () => { await eventBus.logWriter.getThread()?.cleanLogs(); - const allMessages = await eventBus.getEvents('all'); + const allMessages = await eventBus.getEventsAll(); expect(allMessages.length).toBe(0); }); test('should have logwriter log messages', async () => { await eventBus.send(testMessage); - const sent = await eventBus.getEvents('sent'); - const unsent = await eventBus.getEvents('unsent'); + const sent = await eventBus.getEventsSent(); + const unsent = await eventBus.getEventsUnsent(); expect(sent.length).toBeGreaterThan(0); expect(unsent.length).toBe(0); expect(sent.find((e) => e.id === testMessage.id)).toEqual(testMessage); @@ -209,11 +208,38 @@ test('should send message to syslog ', async () => { }); await eventBus.send(testMessage); + + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(mockedSyslogClientLog).toHaveBeenCalled(); + await confirmIdsSentUnsent(testMessage.id); + + syslogDestination.disable(); +}); + +test('should confirm send message if there are no subscribers', async () => { + config.set('enterprise.features.logStreaming', true); + await cleanLogs(); + + const syslogDestination = eventBus.destinations[ + testSyslogDestination.id! + ] as MessageEventBusDestinationSyslog; + + syslogDestination.enable(); + + const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); + mockedSyslogClientLog.mockImplementation((_m, _options, _cb) => { + eventBus.confirmSent(testMessage, { + id: syslogDestination.id, + name: syslogDestination.label, + }); + return syslogDestination.client; + }); + await eventBus.send(testMessageUnsubscribed); await new Promise((resolve) => setTimeout(resolve, 100)); expect(mockedSyslogClientLog).toHaveBeenCalled(); - await confirmIdsSentUnsent(); + await confirmIdsSentUnsent(testMessageUnsubscribed.id); syslogDestination.disable(); }); @@ -266,10 +292,9 @@ test('should send message to webhook ', async () => { mockedAxios.request.mockResolvedValue({ status: 200, data: { msg: 'OK' } }); await eventBus.send(testMessage); - await eventBus.send(testMessageUnsubscribed); // not elegant, but since communication happens through emitters, we'll wait for a bit await new Promise((resolve) => setTimeout(resolve, 100)); - await confirmIdsSentUnsent(); + await confirmIdsSentUnsent(testMessage.id); webhookDestination.disable(); }); @@ -294,11 +319,10 @@ test('should send message to sentry ', async () => { }); await eventBus.send(testMessage); - await eventBus.send(testMessageUnsubscribed); // not elegant, but since communication happens through emitters, we'll wait for a bit await new Promise((resolve) => setTimeout(resolve, 100)); expect(mockedSentryCaptureMessage).toHaveBeenCalled(); - await confirmIdsSentUnsent(); + await confirmIdsSentUnsent(testMessage.id); sentryDestination.disable(); }); diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index f8828dd670..a36fda56b8 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -37,6 +37,7 @@ import { INodeListSearchItems, NodeParameterValueType, INodeActionTypeDescription, + IAbstractEventMessage, } from 'n8n-workflow'; import { FAKE_DOOR_FEATURES } from './constants'; import { BulkCommand, Undoable } from '@/models/history'; @@ -236,6 +237,7 @@ export interface IRestApi { retryExecution(id: string, loadWorkflow?: boolean): Promise; getTimezones(): Promise; getBinaryUrl(dataPath: string, mode: 'view' | 'download'): string; + getExecutionEvents(id: string): Promise; } export interface INodeTranslationHeaders { diff --git a/packages/editor-ui/src/mixins/restApi.ts b/packages/editor-ui/src/mixins/restApi.ts index 7998f46776..ab90327c21 100644 --- a/packages/editor-ui/src/mixins/restApi.ts +++ b/packages/editor-ui/src/mixins/restApi.ts @@ -19,6 +19,7 @@ import { INodeTranslationHeaders, } from '@/Interface'; import { + IAbstractEventMessage, IDataObject, ILoadOptions, INodeCredentials, @@ -203,6 +204,11 @@ export const restApi = Vue.extend({ // Binary data getBinaryUrl: (dataPath, mode): string => self.rootStore.getRestApiContext.baseUrl + `/data/${dataPath}?mode=${mode}`, + + // Returns all the available timezones + getExecutionEvents: (id: string): Promise => { + return self.restApi().makeRestApiRequest('GET', '/eventbus/execution/' + id); + }, }; }, }, diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index d91a644b3a..ceb65c478e 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -459,6 +459,7 @@ "executionsList.showError.refreshData.title": "Problem loading data", "executionsList.showError.retryExecution.title": "Problem with retry", "executionsList.showError.stopExecution.title": "Problem stopping execution", + "executionsList.showError.getExecutionEvents.title": "Problem fetching execution events", "executionsList.showMessage.handleDeleteSelected.title": "Execution deleted", "executionsList.showMessage.retrySuccessfulFalse.title": "Retry unsuccessful", "executionsList.showMessage.retrySuccessfulTrue.title": "Retry successful", diff --git a/packages/workflow/src/MessageEventBus.ts b/packages/workflow/src/MessageEventBus.ts index 5b96be2b44..e9bf3e8dfa 100644 --- a/packages/workflow/src/MessageEventBus.ts +++ b/packages/workflow/src/MessageEventBus.ts @@ -1,3 +1,4 @@ +import { DateTime } from 'luxon'; import { INodeCredentials } from './Interfaces'; // =============================== @@ -7,7 +8,6 @@ import { INodeCredentials } from './Interfaces'; export enum EventMessageTypeNames { generic = '$$EventMessage', audit = '$$EventMessageAudit', - user = '$$EventMessageUser', confirm = '$$EventMessageConfirm', workflow = '$$EventMessageWorkflow', node = '$$EventMessageNode', @@ -20,6 +20,25 @@ export enum MessageEventBusDestinationTypeNames { syslog = '$$MessageEventBusDestinationSyslog', } +// =============================== +// Event Message Interfaces +// =============================== + +export interface IAbstractEventMessage { + __type: EventMessageTypeNames; + + id: string; + + ts: DateTime; + + eventName: string; + + message: string; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + payload: any; +} + // =============================== // Event Destination Interfaces // ===============================