2023-03-29 11:38:47 -07:00
|
|
|
import { Service } from 'typedi';
|
2023-01-27 05:56:56 -08:00
|
|
|
import { LoggerProxy } from 'n8n-workflow';
|
2023-02-17 01:54:07 -08:00
|
|
|
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
|
2023-01-13 09:12:22 -08:00
|
|
|
import type { DeleteResult } from 'typeorm';
|
2023-03-27 03:30:03 -07:00
|
|
|
import type {
|
|
|
|
EventMessageTypes,
|
|
|
|
EventNamesTypes,
|
|
|
|
FailedEventSummary,
|
|
|
|
} from '../EventMessageClasses/';
|
2023-01-04 00:47:48 -08:00
|
|
|
import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee';
|
|
|
|
import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter';
|
|
|
|
import EventEmitter from 'events';
|
|
|
|
import config from '@/config';
|
|
|
|
import * as Db from '@/Db';
|
2023-01-19 03:11:31 -08:00
|
|
|
import {
|
|
|
|
messageEventBusDestinationFromDb,
|
|
|
|
incrementPrometheusMetric,
|
|
|
|
} from '../MessageEventBusDestination/Helpers.ee';
|
2023-01-04 00:47:48 -08:00
|
|
|
import uniqby from 'lodash.uniqby';
|
2023-01-27 05:56:56 -08:00
|
|
|
import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm';
|
|
|
|
import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessageAudit';
|
|
|
|
import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit';
|
|
|
|
import type { EventMessageWorkflowOptions } from '../EventMessageClasses/EventMessageWorkflow';
|
|
|
|
import { EventMessageWorkflow } from '../EventMessageClasses/EventMessageWorkflow';
|
2023-01-04 00:47:48 -08:00
|
|
|
import { isLogStreamingEnabled } from './MessageEventBusHelper';
|
2023-01-27 05:56:56 -08:00
|
|
|
import type { EventMessageNodeOptions } from '../EventMessageClasses/EventMessageNode';
|
|
|
|
import { EventMessageNode } from '../EventMessageClasses/EventMessageNode';
|
2023-01-04 00:47:48 -08:00
|
|
|
import {
|
|
|
|
EventMessageGeneric,
|
|
|
|
eventMessageGenericDestinationTestEvent,
|
|
|
|
} from '../EventMessageClasses/EventMessageGeneric';
|
2023-02-17 01:54:07 -08:00
|
|
|
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
|
2023-01-04 00:47:48 -08:00
|
|
|
|
2023-01-11 05:09:09 -08:00
|
|
|
/**
 * Selects which subset of logged event messages a query should return.
 */
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
2023-01-04 00:47:48 -08:00
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
/**
 * An event message bundled with the callback used to confirm its delivery.
 */
export interface MessageWithCallback {
	/** The event message being handed over. */
	msg: EventMessageTypes;

	/**
	 * Invoked to confirm that `message` has been handled; `src` identifies
	 * who is confirming it.
	 */
	confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void;
}
|
|
|
|
|
2023-03-29 11:38:47 -07:00
|
|
|
/**
 * Event bus that records every event message through the injected log writer
 * and forwards it to the registered destinations.
 *
 * Listeners receive each message together with a confirmation callback, once
 * under the generic `'message'` event name and, when a destination is
 * subscribed to the event, once under each destination's id.
 */
@Service()
export class MessageEventBus extends EventEmitter {
	/** Set to `true` once `initialize()` has run to completion. */
	private isInitialized = false;

	/** Registered destinations, keyed by destination id. */
	destinations: {
		[key: string]: MessageEventBusDestination;
	} = {};

	/** Handle of the periodic unsent-message check; `undefined` while none is scheduled. */
	private pushIntervalTimer: ReturnType<typeof setInterval> | undefined;

	constructor(private logWriter: MessageEventBusLogWriter) {
		super();
	}

	/** Turns an arbitrary thrown value into a loggable string. */
	private static describeError(error: unknown): string {
		return error instanceof Error ? error.message : String(error);
	}

	/** Whether a destination with the given id is currently registered. */
	private hasDestination(id: string): boolean {
		return Object.prototype.hasOwnProperty.call(this.destinations, id);
	}

	/**
	 * Needs to be called once at startup to set the event bus instance up. Will launch the event log writer and,
	 * if configured to do so, the previously stored event destinations.
	 *
	 * Will check for unsent event messages in the previous log files once at startup and try to re-send them.
	 *
	 * Sets `isInitialized` to `true` once finished. Calling it again afterwards is a no-op.
	 */
	async initialize() {
		if (this.isInitialized) {
			return;
		}

		LoggerProxy.debug('Initializing event bus...');

		const savedEventDestinations = await Db.collections.EventDestinations.find({});
		for (const destinationData of savedEventDestinations) {
			try {
				const destination = messageEventBusDestinationFromDb(this, destinationData);
				if (destination) {
					await this.addDestination(destination);
				}
			} catch (error) {
				// A stored destination that cannot be restored is logged and skipped
				// so the remaining destinations and the bus itself still start.
				const message = MessageEventBus.describeError(error);
				if (message) LoggerProxy.debug(message);
			}
		}

		LoggerProxy.debug('Initializing event writer');
		await this.logWriter.startThread();

		// unsent event check:
		// - find unsent messages in current event log(s)
		// - cycle event logs and start the logging to a fresh file
		// - retry sending events
		LoggerProxy.debug('Checking for unsent event messages');
		const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
		LoggerProxy.debug(
			`Start logging into ${this.logWriter.getLogFileName() ?? 'unknown filename'} `,
		);
		this.logWriter.startLogging();
		await this.send(unsentAndUnfinished.unsentMessages);

		for (const [executionId, eventMessages] of Object.entries(
			unsentAndUnfinished.unfinishedExecutions,
		)) {
			await recoverExecutionDataFromEventLogMessages(executionId, eventMessages, true);
		}

		// if configured, run this test every n ms
		const checkUnsentInterval = config.getEnv('eventBus.checkUnsentInterval');
		if (checkUnsentInterval > 0) {
			if (this.pushIntervalTimer) {
				clearInterval(this.pushIntervalTimer);
			}
			this.pushIntervalTimer = setInterval(async () => {
				try {
					await this.trySendingUnsent();
				} catch (error) {
					// Nothing awaits the timer callback, so a rejection escaping here
					// would surface as an unhandled promise rejection.
					LoggerProxy.debug(
						`Retrying unsent event messages failed: ${MessageEventBus.describeError(error)}`,
					);
				}
			}, checkUnsentInterval);
		}

		LoggerProxy.debug('MessageEventBus initialized');
		this.isInitialized = true;
	}

	/**
	 * Registers a destination and lets it start listening. A destination already
	 * registered under the same id is removed first via `removeDestination()`.
	 *
	 * @returns the destination that was passed in
	 */
	async addDestination(destination: MessageEventBusDestination) {
		const id = destination.getId();
		await this.removeDestination(id);
		this.destinations[id] = destination;
		destination.startListening();
		return destination;
	}

	/**
	 * Returns the serialized options of the destination with the given id.
	 * When no id is given, or the id is not registered, ALL destinations are
	 * returned instead. The result is sorted by `__type`.
	 */
	async findDestination(id?: string): Promise<MessageEventBusDestinationOptions[]> {
		const matches =
			id && this.hasDestination(id) ? [this.destinations[id]] : Object.values(this.destinations);
		return matches
			.map((destination) => destination.serialize())
			.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
	}

	/**
	 * Closes the destination with the given id, deletes it from the database and
	 * unregisters it.
	 *
	 * @returns the delete result, or `undefined` when no such destination is registered
	 */
	async removeDestination(id: string): Promise<DeleteResult | undefined> {
		if (!this.hasDestination(id)) {
			return undefined;
		}
		const destination = this.destinations[id];
		await destination.close();
		const result = await destination.deleteFromDb();
		delete this.destinations[id];
		return result;
	}

	/**
	 * Re-emits the given messages, or all currently unsent messages from the
	 * event log when none are given.
	 */
	private async trySendingUnsent(msgs?: EventMessageTypes[]) {
		const unsentMessages = msgs ?? (await this.getEventsUnsent());
		if (unsentMessages.length > 0) {
			LoggerProxy.debug(`Found unsent event messages: ${unsentMessages.length}`);
			for (const unsentMsg of unsentMessages) {
				LoggerProxy.debug(`Retrying: ${unsentMsg.id} ${unsentMsg.__type}`);
				await this.emitMessage(unsentMsg);
			}
		}
	}

	/**
	 * Shuts the bus down: cancels the periodic unsent-message check, closes the
	 * log writer and then closes every registered destination.
	 *
	 * `isInitialized` is left untouched, so a later `initialize()` call remains a no-op.
	 */
	async close() {
		// Cancel the retry timer first; otherwise it keeps firing against the
		// closed log writer.
		if (this.pushIntervalTimer) {
			clearInterval(this.pushIntervalTimer);
			this.pushIntervalTimer = undefined;
		}

		LoggerProxy.debug('Shutting down event writer...');
		await this.logWriter.close();
		for (const destination of Object.values(this.destinations)) {
			LoggerProxy.debug(`Shutting down event destination ${destination.getId()}...`);
			await destination.close();
		}
		LoggerProxy.debug('EventBus shut down.');
	}

	/**
	 * Writes one or more messages to the event log and emits them.
	 *
	 * @param msgs a single message or an array of messages
	 */
	async send(msgs: EventMessageTypes | EventMessageTypes[]) {
		const messages = Array.isArray(msgs) ? msgs : [msgs];
		for (const msg of messages) {
			this.logWriter.putMessage(msg);
			// if there are no set up destinations, immediately mark the event as sent
			if (!this.shouldSendMsg(msg)) {
				this.confirmSent(msg, { id: '0', name: 'eventBus' });
			}
			// Emitted in either case: metrics and generic 'message' listeners
			// receive the event even when no destination is subscribed.
			await this.emitMessage(msg);
		}
	}

	/**
	 * Sends a generic test event directly to one destination.
	 *
	 * @returns the destination's send result, or `false` when no destination
	 * with the given id is registered
	 */
	async testDestination(destinationId: string): Promise<boolean> {
		// Look the destination up directly: `findDestination()` falls back to
		// returning every destination for an unknown id, which cannot be used
		// to decide whether this particular id exists.
		if (!this.hasDestination(destinationId)) {
			return false;
		}
		const msg = new EventMessageGeneric({
			eventName: eventMessageGenericDestinationTestEvent,
		});
		const sendResult = await this.destinations[destinationId].receiveFromEventBus({
			msg,
			confirmCallback: () => this.confirmSent(msg, { id: '0', name: 'eventBus' }),
		});
		return sendResult;
	}

	/** Records in the event log that `msg` has been sent, optionally noting who confirmed it. */
	confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) {
		this.logWriter.confirmMessageSent(msg.id, source);
	}

	/** Whether at least one registered destination has subscribed to the message's event. */
	private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean {
		return Object.values(this.destinations).some((destination) =>
			destination.hasSubscribedToEvent(msg),
		);
	}

	/**
	 * Emits a message to listeners: always under `'message'`, and additionally
	 * under every destination id when the message should be sent to destinations.
	 */
	private async emitMessage(msg: EventMessageTypes) {
		if (config.getEnv('endpoints.metrics.enable')) {
			await incrementPrometheusMetric(msg);
		}

		// generic emit for external modules to capture events
		// this is for internal use ONLY and not for use with custom destinations!
		this.emitMessageWithCallback('message', msg);

		if (this.shouldSendMsg(msg)) {
			for (const destination of Object.values(this.destinations)) {
				this.emitMessageWithCallback(destination.getId(), msg);
			}
		}
	}

	/**
	 * Emits `msg` under `eventName` together with a callback that confirms the
	 * message as sent on this bus.
	 *
	 * @returns the result of `EventEmitter.emit`
	 */
	private emitMessageWithCallback(eventName: string, msg: EventMessageTypes): boolean {
		const confirmCallback = (message: EventMessageTypes, src: EventMessageConfirmSource) =>
			this.confirmSent(message, src);
		return this.emit(eventName, msg, confirmCallback);
	}

	/**
	 * Whether a message should be forwarded to destinations: log streaming must
	 * be enabled and at least one registered destination subscribed to the event.
	 */
	shouldSendMsg(msg: EventMessageTypes): boolean {
		// An empty destination map yields `false` from the subscription check,
		// so no separate emptiness test is needed.
		return isLogStreamingEnabled() && this.hasAnyDestinationSubscribedToEvent(msg);
	}

	/**
	 * Summarizes the most recent crashed or failed workflow executions found in
	 * the event log, oldest first.
	 *
	 * This is best effort: if reading or recovering data fails, the failure is
	 * logged and the summaries collected so far are returned.
	 *
	 * @param amount maximum number of executions to report; values that are not
	 * greater than zero yield an empty list
	 */
	async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> {
		const result: FailedEventSummary[] = [];
		// `slice(-0)` is `slice(0)` and would return every entry instead of none.
		if (!(amount > 0)) {
			return result;
		}

		try {
			const queryResult = await this.logWriter.getMessagesAll();
			const failedEventNames = [
				'n8n.workflow.crashed',
				'n8n.workflow.failed',
			] as EventNamesTypes[];

			const latestFailures = uniqby(queryResult, 'id')
				.filter((e) => failedEventNames.includes(e.eventName))
				.map((e) => ({
					executionId: e.payload.executionId as string,
					name: e.payload.workflowName,
					timestamp: e.ts,
					event: e.eventName,
				}))
				// Numeric difference gives a consistent comparator, including 0 for ties.
				.sort((a, b) => a.timestamp.valueOf() - b.timestamp.valueOf())
				.slice(-amount);

			for (const execution of latestFailures) {
				const data = await recoverExecutionDataFromEventLogMessages(
					execution.executionId,
					queryResult,
					false,
				);
				if (data) {
					result.push({
						lastNodeExecuted: data.resultData.lastNodeExecuted ?? '',
						executionId: execution.executionId,
						name: execution.name as string,
						event: execution.event,
						timestamp: execution.timestamp.toISO(),
					});
				}
			}
		} catch (error) {
			LoggerProxy.debug(
				`Failed to collect failed events: ${MessageEventBus.describeError(error)}`,
			);
		}
		return result;
	}

	/** All messages from the event log, de-duplicated by message id. */
	async getEventsAll(): Promise<EventMessageTypes[]> {
		return uniqby(await this.logWriter.getMessagesAll(), 'id');
	}

	/** Messages the log writer reports as sent, de-duplicated by message id. */
	async getEventsSent(): Promise<EventMessageTypes[]> {
		return uniqby(await this.logWriter.getMessagesSent(), 'id');
	}

	/** Messages the log writer reports as unsent, de-duplicated by message id. */
	async getEventsUnsent(): Promise<EventMessageTypes[]> {
		return uniqby(await this.logWriter.getMessagesUnsent(), 'id');
	}

	/** Unfinished executions as reported by the log writer: a record mapping a string key to event messages. */
	async getUnfinishedExecutions(): Promise<Record<string, EventMessageTypes[]>> {
		return this.logWriter.getUnfinishedExecutions();
	}

	/** Unsent messages and unfinished executions, as reported by the log writer in one result. */
	async getUnsentAndUnfinishedExecutions(): Promise<{
		unsentMessages: EventMessageTypes[];
		unfinishedExecutions: Record<string, EventMessageTypes[]>;
	}> {
		return this.logWriter.getUnsentAndUnfinishedExecutions();
	}

	/**
	 * This will pull all events for a given execution id from the event log files. Note that this can be a very expensive operation, depending on the number of events and the size of the log files.
	 * @param executionId id to look for
	 * @param logHistory defaults to 1, which means it will look at the current log file AND the previous one.
	 * @returns Array of EventMessageTypes
	 */
	async getEventsByExecutionId(
		executionId: string,
		logHistory?: number,
	): Promise<EventMessageTypes[]> {
		return this.logWriter.getMessagesByExecutionId(executionId, logHistory);
	}

	/**
	 * Convenience Methods
	 */

	/** Builds an audit event message from `options` and sends it. */
	async sendAuditEvent(options: EventMessageAuditOptions) {
		await this.send(new EventMessageAudit(options));
	}

	/** Builds a workflow event message from `options` and sends it. */
	async sendWorkflowEvent(options: EventMessageWorkflowOptions) {
		await this.send(new EventMessageWorkflow(options));
	}

	/** Builds a node event message from `options` and sends it. */
	async sendNodeEvent(options: EventMessageNodeOptions) {
		await this.send(new EventMessageNode(options));
	}
}
|