2023-01-04 00:47:48 -08:00
|
|
|
import { v4 as uuid } from 'uuid';
|
2023-10-25 07:35:22 -07:00
|
|
|
import { Container } from 'typedi';
|
2023-11-27 00:11:52 -08:00
|
|
|
import type { INodeCredentials, MessageEventBusDestinationOptions } from 'n8n-workflow';
|
|
|
|
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
|
2024-08-22 02:10:37 -07:00
|
|
|
import { Logger } from '@/logger';
|
2024-08-26 02:10:06 -07:00
|
|
|
import type { AbstractEventMessage } from '../event-message-classes/abstract-event-message';
|
|
|
|
import type { EventMessageTypes } from '../event-message-classes';
|
|
|
|
import type { EventMessageConfirmSource } from '../event-message-classes/event-message-confirm';
|
|
|
|
import type { MessageEventBus, MessageWithCallback } from '../message-event-bus/message-event-bus';
|
2024-08-27 07:44:32 -07:00
|
|
|
import { EventDestinationsRepository } from '@/databases/repositories/event-destinations.repository';
|
2024-08-22 02:10:37 -07:00
|
|
|
import { License } from '@/license';
|
2023-01-04 00:47:48 -08:00
|
|
|
|
|
|
|
export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions {
|
|
|
|
// Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please.
|
|
|
|
// static abstract deserialize(): MessageEventBusDestination | null;
|
|
|
|
readonly id: string;
|
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
readonly eventBusInstance: MessageEventBus;
|
|
|
|
|
2023-10-25 07:35:22 -07:00
|
|
|
protected readonly logger: Logger;
|
|
|
|
|
2024-06-11 00:11:39 -07:00
|
|
|
protected readonly license: License;
|
|
|
|
|
2023-01-04 00:47:48 -08:00
|
|
|
__type: MessageEventBusDestinationTypeNames;
|
|
|
|
|
|
|
|
label: string;
|
|
|
|
|
|
|
|
enabled: boolean;
|
|
|
|
|
|
|
|
subscribedEvents: string[];
|
|
|
|
|
|
|
|
credentials: INodeCredentials = {};
|
|
|
|
|
|
|
|
anonymizeAuditMessages: boolean;
|
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) {
|
2024-06-11 00:11:39 -07:00
|
|
|
// @TODO: Use DI
|
2023-10-25 07:35:22 -07:00
|
|
|
this.logger = Container.get(Logger);
|
2024-06-11 00:11:39 -07:00
|
|
|
this.license = Container.get(License);
|
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
this.eventBusInstance = eventBusInstance;
|
2023-01-04 00:47:48 -08:00
|
|
|
this.id = !options.id || options.id.length !== 36 ? uuid() : options.id;
|
|
|
|
this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract;
|
|
|
|
this.label = options.label ?? 'Log Destination';
|
|
|
|
this.enabled = options.enabled ?? false;
|
|
|
|
this.subscribedEvents = options.subscribedEvents ?? [];
|
|
|
|
this.anonymizeAuditMessages = options.anonymizeAuditMessages ?? false;
|
|
|
|
if (options.credentials) this.credentials = options.credentials;
|
2023-10-25 07:35:22 -07:00
|
|
|
this.logger.debug(`${this.__type}(${this.id}) event destination constructed`);
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
startListening() {
|
|
|
|
if (this.enabled) {
|
2023-02-17 01:54:07 -08:00
|
|
|
this.eventBusInstance.on(
|
|
|
|
this.getId(),
|
|
|
|
async (
|
|
|
|
msg: EventMessageTypes,
|
|
|
|
confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void,
|
|
|
|
) => {
|
|
|
|
await this.receiveFromEventBus({ msg, confirmCallback });
|
|
|
|
},
|
|
|
|
);
|
2023-10-25 07:35:22 -07:00
|
|
|
this.logger.debug(`${this.id} listener started`);
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
stopListening() {
|
2023-02-17 01:54:07 -08:00
|
|
|
this.eventBusInstance.removeAllListeners(this.getId());
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
enable() {
|
|
|
|
this.enabled = true;
|
|
|
|
this.startListening();
|
|
|
|
}
|
|
|
|
|
|
|
|
disable() {
|
|
|
|
this.enabled = false;
|
|
|
|
this.stopListening();
|
|
|
|
}
|
|
|
|
|
|
|
|
getId() {
|
|
|
|
return this.id;
|
|
|
|
}
|
|
|
|
|
|
|
|
hasSubscribedToEvent(msg: AbstractEventMessage) {
|
|
|
|
if (!this.enabled) return false;
|
|
|
|
for (const eventName of this.subscribedEvents) {
|
|
|
|
if (eventName === '*' || msg.eventName.startsWith(eventName)) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
async saveToDb() {
|
|
|
|
const data = {
|
|
|
|
id: this.getId(),
|
|
|
|
destination: this.serialize(),
|
|
|
|
};
|
2024-01-02 08:53:24 -08:00
|
|
|
const dbResult = await Container.get(EventDestinationsRepository).upsert(data, {
|
2023-01-04 00:47:48 -08:00
|
|
|
skipUpdateIfNoValuesChanged: true,
|
|
|
|
conflictPaths: ['id'],
|
|
|
|
});
|
|
|
|
return dbResult;
|
|
|
|
}
|
|
|
|
|
|
|
|
async deleteFromDb() {
|
2024-01-17 07:08:50 -08:00
|
|
|
return await MessageEventBusDestination.deleteFromDb(this.getId());
|
2023-01-04 00:47:48 -08:00
|
|
|
}
|
|
|
|
|
2024-01-02 08:53:24 -08:00
|
|
|
static async deleteFromDb(id: string) {
|
2023-11-10 06:04:26 -08:00
|
|
|
const dbResult = await Container.get(EventDestinationsRepository).delete({ id });
|
2023-01-04 00:47:48 -08:00
|
|
|
return dbResult;
|
|
|
|
}
|
|
|
|
|
|
|
|
serialize(): MessageEventBusDestinationOptions {
|
|
|
|
return {
|
|
|
|
__type: this.__type,
|
|
|
|
id: this.getId(),
|
|
|
|
label: this.label,
|
|
|
|
enabled: this.enabled,
|
|
|
|
subscribedEvents: this.subscribedEvents,
|
|
|
|
anonymizeAuditMessages: this.anonymizeAuditMessages,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
2023-02-17 01:54:07 -08:00
|
|
|
abstract receiveFromEventBus(emitterPayload: MessageWithCallback): Promise<boolean>;
|
2023-01-04 00:47:48 -08:00
|
|
|
|
|
|
|
toString() {
|
|
|
|
return JSON.stringify(this.serialize());
|
|
|
|
}
|
|
|
|
|
|
|
|
close(): void | Promise<void> {
|
|
|
|
this.stopListening();
|
|
|
|
}
|
|
|
|
}
|