mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 21:07:28 -08:00
refactor(core): Remove event bus helpers (no-changelog) (#9690)
This commit is contained in:
parent
817167cf4b
commit
cc4e46eae4
|
@ -22,14 +22,12 @@ import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessa
|
|||
import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit';
|
||||
import type { EventMessageWorkflowOptions } from '../EventMessageClasses/EventMessageWorkflow';
|
||||
import { EventMessageWorkflow } from '../EventMessageClasses/EventMessageWorkflow';
|
||||
import { isLogStreamingEnabled } from './MessageEventBusHelper';
|
||||
import type { EventMessageNodeOptions } from '../EventMessageClasses/EventMessageNode';
|
||||
import { EventMessageNode } from '../EventMessageClasses/EventMessageNode';
|
||||
import {
|
||||
EventMessageGeneric,
|
||||
eventMessageGenericDestinationTestEvent,
|
||||
} from '../EventMessageClasses/EventMessageGeneric';
|
||||
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { ExecutionRecoveryService } from '../../executions/execution-recovery.service';
|
||||
|
@ -37,6 +35,7 @@ import {
|
|||
EventMessageAiNode,
|
||||
type EventMessageAiNodeOptions,
|
||||
} from '../EventMessageClasses/EventMessageAiNode';
|
||||
import { License } from '@/License';
|
||||
|
||||
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
|
||||
|
||||
|
@ -69,6 +68,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly orchestrationService: OrchestrationService,
|
||||
private readonly recoveryService: ExecutionRecoveryService,
|
||||
private readonly license: License,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
@ -329,7 +329,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
}
|
||||
|
||||
private async emitMessage(msg: EventMessageTypes) {
|
||||
this.emit(METRICS_EVENT_NAME, msg);
|
||||
this.emit('metrics.messageEventBus.Event', msg);
|
||||
|
||||
// generic emit for external modules to capture events
|
||||
// this is for internal use ONLY and not for use with custom destinations!
|
||||
|
@ -350,7 +350,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
|
||||
shouldSendMsg(msg: EventMessageTypes): boolean {
|
||||
return (
|
||||
isLogStreamingEnabled() &&
|
||||
this.license.isLogStreamingEnabled() &&
|
||||
Object.keys(this.destinations).length > 0 &&
|
||||
this.hasAnyDestinationSubscribedToEvent(msg)
|
||||
);
|
||||
|
|
|
@ -1,7 +0,0 @@
|
|||
import { License } from '@/License';
|
||||
import { Container } from 'typedi';
|
||||
|
||||
export function isLogStreamingEnabled(): boolean {
|
||||
const license = Container.get(License);
|
||||
return license.isLogStreamingEnabled();
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
import { EventMessageTypeNames } from 'n8n-workflow';
|
||||
import config from '@/config';
|
||||
import type { EventMessageTypes } from '../EventMessageClasses';
|
||||
|
||||
export const METRICS_EVENT_NAME = 'metrics.messageEventBus.Event';
|
||||
|
||||
export function getMetricNameForEvent(event: EventMessageTypes): string {
|
||||
const prefix = config.getEnv('endpoints.metrics.prefix');
|
||||
return prefix + event.eventName.replace('n8n.', '').replace(/\./g, '_') + '_total';
|
||||
}
|
||||
|
||||
export function getLabelValueForNode(nodeType: string): string {
|
||||
return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_');
|
||||
}
|
||||
|
||||
export function getLabelValueForCredential(credentialType: string): string {
|
||||
return credentialType.replace(/\./g, '_');
|
||||
}
|
||||
|
||||
export function getLabelsForEvent(event: EventMessageTypes): Record<string, string> {
|
||||
switch (event.__type) {
|
||||
case EventMessageTypeNames.audit:
|
||||
if (event.eventName.startsWith('n8n.audit.user.credentials')) {
|
||||
return config.getEnv('endpoints.metrics.includeCredentialTypeLabel')
|
||||
? {
|
||||
credential_type: getLabelValueForCredential(
|
||||
event.payload.credentialType ?? 'unknown',
|
||||
),
|
||||
}
|
||||
: {};
|
||||
}
|
||||
|
||||
if (event.eventName.startsWith('n8n.audit.workflow')) {
|
||||
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
|
||||
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
|
||||
: {};
|
||||
}
|
||||
break;
|
||||
|
||||
case EventMessageTypeNames.node:
|
||||
return config.getEnv('endpoints.metrics.includeNodeTypeLabel')
|
||||
? { node_type: getLabelValueForNode(event.payload.nodeType ?? 'unknown') }
|
||||
: {};
|
||||
|
||||
case EventMessageTypeNames.workflow:
|
||||
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
|
||||
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
|
||||
: {};
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
|
@ -8,6 +8,7 @@ import type { EventMessageTypes } from '../EventMessageClasses';
|
|||
import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm';
|
||||
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
|
||||
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
||||
import { License } from '@/License';
|
||||
|
||||
export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions {
|
||||
// Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please.
|
||||
|
@ -18,6 +19,8 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti
|
|||
|
||||
protected readonly logger: Logger;
|
||||
|
||||
protected readonly license: License;
|
||||
|
||||
__type: MessageEventBusDestinationTypeNames;
|
||||
|
||||
label: string;
|
||||
|
@ -31,7 +34,10 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti
|
|||
anonymizeAuditMessages: boolean;
|
||||
|
||||
constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) {
|
||||
// @TODO: Use DI
|
||||
this.logger = Container.get(Logger);
|
||||
this.license = Container.get(License);
|
||||
|
||||
this.eventBusInstance = eventBusInstance;
|
||||
this.id = !options.id || options.id.length !== 36 ? uuid() : options.id;
|
||||
this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract;
|
||||
|
|
|
@ -7,7 +7,6 @@ import type {
|
|||
MessageEventBusDestinationOptions,
|
||||
MessageEventBusDestinationSentryOptions,
|
||||
} from 'n8n-workflow';
|
||||
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
|
||||
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
|
||||
import { N8N_VERSION } from '@/constants';
|
||||
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
|
||||
|
@ -57,7 +56,7 @@ export class MessageEventBusDestinationSentry
|
|||
let sendResult = false;
|
||||
if (!this.sentryClient) return sendResult;
|
||||
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
|
||||
if (!isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.license.isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.hasSubscribedToEvent(msg)) return sendResult;
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -7,7 +7,6 @@ import type {
|
|||
} from 'n8n-workflow';
|
||||
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
|
||||
import { MessageEventBusDestination } from './MessageEventBusDestination.ee';
|
||||
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
|
||||
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
|
||||
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
|
||||
import Container from 'typedi';
|
||||
|
@ -73,7 +72,7 @@ export class MessageEventBusDestinationSyslog
|
|||
const { msg, confirmCallback } = emitterPayload;
|
||||
let sendResult = false;
|
||||
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
|
||||
if (!isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.license.isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.hasSubscribedToEvent(msg)) return sendResult;
|
||||
}
|
||||
try {
|
||||
|
|
|
@ -14,7 +14,6 @@ import type {
|
|||
} from 'n8n-workflow';
|
||||
import { CredentialsHelper } from '@/CredentialsHelper';
|
||||
import { Agent as HTTPSAgent } from 'https';
|
||||
import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper';
|
||||
import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric';
|
||||
import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus';
|
||||
import * as SecretsHelpers from '@/ExternalSecrets/externalSecretsHelper.ee';
|
||||
|
@ -255,7 +254,7 @@ export class MessageEventBusDestinationWebhook
|
|||
const { msg, confirmCallback } = emitterPayload;
|
||||
let sendResult = false;
|
||||
if (msg.eventName !== eventMessageGenericDestinationTestEvent) {
|
||||
if (!isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.license.isLogStreamingEnabled()) return sendResult;
|
||||
if (!this.hasSubscribedToEvent(msg)) return sendResult;
|
||||
}
|
||||
// at first run, build this.requestOptions with the destination settings
|
||||
|
|
|
@ -1,3 +1,2 @@
|
|||
export { EventMessageTypes } from './EventMessageClasses';
|
||||
export { EventPayloadWorkflow } from './EventMessageClasses/EventMessageWorkflow';
|
||||
export { METRICS_EVENT_NAME, getLabelsForEvent } from './MessageEventBusDestination/Helpers.ee';
|
||||
|
|
|
@ -8,9 +8,10 @@ import { Service } from 'typedi';
|
|||
import EventEmitter from 'events';
|
||||
|
||||
import { CacheService } from '@/services/cache/cache.service';
|
||||
import { METRICS_EVENT_NAME, getLabelsForEvent, type EventMessageTypes } from '@/eventbus';
|
||||
import { type EventMessageTypes } from '@/eventbus';
|
||||
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||
import { Logger } from '@/Logger';
|
||||
import { EventMessageTypeNames } from 'n8n-workflow';
|
||||
|
||||
@Service()
|
||||
export class MetricsService extends EventEmitter {
|
||||
|
@ -135,7 +136,7 @@ export class MetricsService extends EventEmitter {
|
|||
const counter = new promClient.Counter({
|
||||
name: metricName,
|
||||
help: `Total number of ${event.eventName} events.`,
|
||||
labelNames: Object.keys(getLabelsForEvent(event)),
|
||||
labelNames: Object.keys(this.getLabelsForEvent(event)),
|
||||
});
|
||||
counter.inc(0);
|
||||
this.counters[event.eventName] = counter;
|
||||
|
@ -148,10 +149,52 @@ export class MetricsService extends EventEmitter {
|
|||
if (!config.getEnv('endpoints.metrics.includeMessageEventBusMetrics')) {
|
||||
return;
|
||||
}
|
||||
this.eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => {
|
||||
this.eventBus.on('metrics.messageEventBus.Event', (event: EventMessageTypes) => {
|
||||
const counter = this.getCounterForEvent(event);
|
||||
if (!counter) return;
|
||||
counter.inc(1);
|
||||
});
|
||||
}
|
||||
|
||||
getLabelsForEvent(event: EventMessageTypes): Record<string, string> {
|
||||
switch (event.__type) {
|
||||
case EventMessageTypeNames.audit:
|
||||
if (event.eventName.startsWith('n8n.audit.user.credentials')) {
|
||||
return config.getEnv('endpoints.metrics.includeCredentialTypeLabel')
|
||||
? {
|
||||
credential_type: this.getLabelValueForCredential(
|
||||
event.payload.credentialType ?? 'unknown',
|
||||
),
|
||||
}
|
||||
: {};
|
||||
}
|
||||
|
||||
if (event.eventName.startsWith('n8n.audit.workflow')) {
|
||||
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
|
||||
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
|
||||
: {};
|
||||
}
|
||||
break;
|
||||
|
||||
case EventMessageTypeNames.node:
|
||||
return config.getEnv('endpoints.metrics.includeNodeTypeLabel')
|
||||
? { node_type: this.getLabelValueForNode(event.payload.nodeType ?? 'unknown') }
|
||||
: {};
|
||||
|
||||
case EventMessageTypeNames.workflow:
|
||||
return config.getEnv('endpoints.metrics.includeWorkflowIdLabel')
|
||||
? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' }
|
||||
: {};
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
getLabelValueForNode(nodeType: string) {
|
||||
return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_');
|
||||
}
|
||||
|
||||
getLabelValueForCredential(credentialType: string) {
|
||||
return credentialType.replace(/\./g, '_');
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue