n8n/packages/cli/src/eventbus/message-event-bus-destination/message-event-bus-destination.ee.ts
Iván Ovejero 3a9c65e1cb
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Waiting to run
refactor(core): Modernize logger service (#11031)
2024-10-01 12:16:09 +02:00

139 lines
3.9 KiB
TypeScript

import type { INodeCredentials, MessageEventBusDestinationOptions } from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import { Container } from 'typedi';
import { v4 as uuid } from 'uuid';
import { EventDestinationsRepository } from '@/databases/repositories/event-destinations.repository';
import { License } from '@/license';
import { Logger } from '@/logging/logger.service';
import type { EventMessageTypes } from '../event-message-classes';
import type { AbstractEventMessage } from '../event-message-classes/abstract-event-message';
import type { EventMessageConfirmSource } from '../event-message-classes/event-message-confirm';
import type { MessageEventBus, MessageWithCallback } from '../message-event-bus/message-event-bus';
export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions {
// Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please.
// static abstract deserialize(): MessageEventBusDestination | null;
readonly id: string;
readonly eventBusInstance: MessageEventBus;
protected readonly logger: Logger;
protected readonly license: License;
__type: MessageEventBusDestinationTypeNames;
label: string;
enabled: boolean;
subscribedEvents: string[];
credentials: INodeCredentials = {};
anonymizeAuditMessages: boolean;
constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) {
// @TODO: Use DI
this.logger = Container.get(Logger);
this.license = Container.get(License);
this.eventBusInstance = eventBusInstance;
this.id = !options.id || options.id.length !== 36 ? uuid() : options.id;
this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract;
this.label = options.label ?? 'Log Destination';
this.enabled = options.enabled ?? false;
this.subscribedEvents = options.subscribedEvents ?? [];
this.anonymizeAuditMessages = options.anonymizeAuditMessages ?? false;
if (options.credentials) this.credentials = options.credentials;
this.logger.debug(`${this.__type}(${this.id}) event destination constructed`);
}
startListening() {
if (this.enabled) {
this.eventBusInstance.on(
this.getId(),
async (
msg: EventMessageTypes,
confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void,
) => {
await this.receiveFromEventBus({ msg, confirmCallback });
},
);
this.logger.debug(`${this.id} listener started`);
}
}
stopListening() {
this.eventBusInstance.removeAllListeners(this.getId());
}
enable() {
this.enabled = true;
this.startListening();
}
disable() {
this.enabled = false;
this.stopListening();
}
getId() {
return this.id;
}
hasSubscribedToEvent(msg: AbstractEventMessage) {
if (!this.enabled) return false;
for (const eventName of this.subscribedEvents) {
if (eventName === '*' || msg.eventName.startsWith(eventName)) {
return true;
}
}
return false;
}
async saveToDb() {
const data = {
id: this.getId(),
destination: this.serialize(),
};
const dbResult = await Container.get(EventDestinationsRepository).upsert(data, {
skipUpdateIfNoValuesChanged: true,
conflictPaths: ['id'],
});
return dbResult;
}
async deleteFromDb() {
return await MessageEventBusDestination.deleteFromDb(this.getId());
}
static async deleteFromDb(id: string) {
const dbResult = await Container.get(EventDestinationsRepository).delete({ id });
return dbResult;
}
serialize(): MessageEventBusDestinationOptions {
return {
__type: this.__type,
id: this.getId(),
label: this.label,
enabled: this.enabled,
subscribedEvents: this.subscribedEvents,
anonymizeAuditMessages: this.anonymizeAuditMessages,
};
}
abstract receiveFromEventBus(emitterPayload: MessageWithCallback): Promise<boolean>;
toString() {
return JSON.stringify(this.serialize());
}
close(): void | Promise<void> {
this.stopListening();
}
}