diff --git a/packages/cli/src/TypedEmitter.ts b/packages/cli/src/TypedEmitter.ts new file mode 100644 index 0000000000..176aa9584c --- /dev/null +++ b/packages/cli/src/TypedEmitter.ts @@ -0,0 +1,48 @@ +import { EventEmitter } from 'node:events'; +import debounce from 'lodash/debounce'; + +type Payloads = { + [E in keyof ListenerMap]: unknown; +}; + +type Listener = (payload: Payload) => void; + +export class TypedEmitter> extends EventEmitter { + private debounceWait = 300; // milliseconds + + override on( + eventName: EventName, + listener: Listener, + ) { + return super.on(eventName, listener); + } + + override once( + eventName: EventName, + listener: Listener, + ) { + return super.once(eventName, listener); + } + + override off( + eventName: EventName, + listener: Listener, + ) { + return super.off(eventName, listener); + } + + override emit( + eventName: EventName, + payload?: ListenerMap[EventName], + ): boolean { + return super.emit(eventName, payload); + } + + protected debouncedEmit = debounce( + ( + eventName: EventName, + payload?: ListenerMap[EventName], + ) => super.emit(eventName, payload), + this.debounceWait, + ); +} diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 95e8825e1a..9342a97cda 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -360,11 +360,10 @@ export async function executeWebhook( NodeExecuteFunctions, executionMode, ); - Container.get(WorkflowStatisticsService).emit( - 'nodeFetchedData', - workflow.id, - workflowStartNode, - ); + Container.get(WorkflowStatisticsService).emit('nodeFetchedData', { + workflowId: workflow.id, + node: workflowStartNode, + }); } catch (err) { // Send error response to webhook caller const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index e5a17de7ec..754cfea693 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -525,17 +525,16 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { ); } } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; @@ -636,11 +635,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { this.retryOf, ); } finally { - workflowStatisticsService.emit( - 'workflowExecutionCompleted', - this.workflowData, + workflowStatisticsService.emit('workflowExecutionCompleted', { + workflowData: this.workflowData, fullRunData, - ); + }); } }, async function (this: WorkflowHooks, runData: IRun): Promise { @@ -676,7 +674,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - workflowStatisticsService.emit('nodeFetchedData', workflowId, node); + workflowStatisticsService.emit('nodeFetchedData', { workflowId, node }); }, ], }; diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index cf249c5172..c562448379 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -53,7 +53,7 @@ export class ConcurrencyControlService { this.isEnabled = true; - this.productionQueue.on('concurrency-check', ({ capacity }: { capacity: number }) => { + this.productionQueue.on('concurrency-check', ({ capacity }) => { if (this.shouldReport(capacity)) { void this.telemetry.track('User hit concurrency limit', { threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, @@ -61,12 +61,12 @@ export class ConcurrencyControlService { } }); - this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => { + this.productionQueue.on('execution-throttled', ({ executionId }) => { this.log('Execution throttled', { executionId }); this.eventService.emit('execution-throttled', { executionId }); }); - this.productionQueue.on('execution-released', async (executionId: string) => { + this.productionQueue.on('execution-released', async (executionId) => { this.log('Execution released', { executionId }); await this.executionRepository.resetStartedAt(executionId); }); diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 90c62d2efc..1b578b5a8e 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -1,9 +1,14 @@ import { Service } from 'typedi'; -import { EventEmitter } from 'node:events'; -import debounce from 'lodash/debounce'; +import { TypedEmitter } from '@/TypedEmitter'; + +type ConcurrencyEvents = { + 'execution-throttled': { executionId: string }; + 'execution-released': string; + 'concurrency-check': { capacity: number }; +}; @Service() -export class ConcurrencyQueue extends EventEmitter { +export class ConcurrencyQueue extends TypedEmitter { private readonly queue: Array<{ executionId: string; resolve: () => void; @@ -63,9 +68,4 @@ export class ConcurrencyQueue extends EventEmitter { resolve(); } - - private debouncedEmit = debounce( - (event: string, payload: object) => this.emit(event, payload), - 300, - ); } diff --git a/packages/cli/src/eventbus/event.service.ts b/packages/cli/src/eventbus/event.service.ts index 2df51af22c..2b16ff06ab 100644 --- a/packages/cli/src/eventbus/event.service.ts +++ b/packages/cli/src/eventbus/event.service.ts @@ -1,16 +1,6 @@ -import { EventEmitter } from 'node:events'; import { Service } from 'typedi'; +import { TypedEmitter } from '@/TypedEmitter'; import type { Event } from './event.types'; @Service() -export class EventService extends EventEmitter { - emit(eventName: K, arg?: Event[K]) { - super.emit(eventName, arg); - return true; - } - - on(eventName: K, handler: (arg: Event[K]) => void) { - super.on(eventName, handler); - return this; - } -} +export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 647ab40acf..a946348430 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'events'; import { ServerResponse } from 'http'; import type { Server } from 'http'; import type { Socket } from 'net'; @@ -17,6 +16,11 @@ import { OrchestrationService } from '@/services/orchestration.service'; import { SSEPush } from './sse.push'; import { WebSocketPush } from './websocket.push'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; +import { TypedEmitter } from '@/TypedEmitter'; + +type PushEvents = { + editorUiConnected: string; +}; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @@ -28,7 +32,7 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket'; * @emits message when a message is received from a client */ @Service() -export class Push extends EventEmitter { +export class Push extends TypedEmitter { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); constructor(private readonly orchestrationService: OrchestrationService) { @@ -37,7 +41,6 @@ export class Push extends EventEmitter { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { const { - user, ws, query: { pushRef }, } = req; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index 8e9a4dc95c..75dad03b49 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -1,5 +1,3 @@ -import EventEmitter from 'node:events'; - import Container, { Service } from 'typedi'; import { caching } from 'cache-manager'; import { ApplicationError, jsonStringify } from 'n8n-workflow'; @@ -10,14 +8,20 @@ import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refr import type { TaggedRedisCache, TaggedMemoryCache, - CacheEvent, MaybeHash, Hash, } from '@/services/cache/cache.types'; import { TIME } from '@/constants'; +import { TypedEmitter } from '@/TypedEmitter'; + +type CacheEvents = { + 'metrics.cache.hit': never; + 'metrics.cache.miss': never; + 'metrics.cache.update': never; +}; @Service() -export class CacheService extends EventEmitter { +export class CacheService extends TypedEmitter { private cache: TaggedRedisCache | TaggedMemoryCache; async init() { @@ -66,10 +70,6 @@ export class CacheService extends EventEmitter { await this.cache.store.reset(); } - emit(event: CacheEvent, ...args: unknown[]) { - return super.emit(event, ...args); - } - isRedis() { return this.cache.kind === 'redis'; } diff --git a/packages/cli/src/services/cache/cache.types.ts b/packages/cli/src/services/cache/cache.types.ts index 4e96b8012a..f598e22494 100644 --- a/packages/cli/src/services/cache/cache.types.ts +++ b/packages/cli/src/services/cache/cache.types.ts @@ -8,5 +8,3 @@ export type TaggedMemoryCache = MemoryCache & { kind: 'memory' }; export type Hash = Record; export type MaybeHash = Hash | undefined; - -export type CacheEvent = `metrics.cache.${'hit' | 'miss' | 'update'}`; diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 3c8c052da7..cabcf5996b 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,4 +1,3 @@ -import { EventEmitter } from 'node:events'; import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; @@ -6,9 +5,15 @@ import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; import { Logger } from '@/Logger'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisClientService } from '@/services/redis/redis-client.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +type MultiMainEvents = { + 'leader-stepdown': never; + 'leader-takeover': never; +}; @Service() -export class MultiMainSetup extends EventEmitter { +export class MultiMainSetup extends TypedEmitter { constructor( private readonly logger: Logger, private readonly redisPublisher: RedisServicePubSubPublisher, diff --git a/packages/cli/src/services/workflow-statistics.service.ts b/packages/cli/src/services/workflow-statistics.service.ts index 516732add8..9311ef885d 100644 --- a/packages/cli/src/services/workflow-statistics.service.ts +++ b/packages/cli/src/services/workflow-statistics.service.ts @@ -1,29 +1,48 @@ -import { EventEmitter } from 'events'; -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import { StatisticsNames } from '@db/entities/WorkflowStatistics'; import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository'; import { UserService } from '@/services/user.service'; import { Logger } from '@/Logger'; import { OwnershipService } from './ownership.service'; +import { TypedEmitter } from '@/TypedEmitter'; + +type WorkflowStatisticsEvents = { + nodeFetchedData: { workflowId: string; node: INode }; + workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun }; + 'telemetry.onFirstProductionWorkflowSuccess': { + project_id: string; + workflow_id: string; + user_id: string; + }; + 'telemetry.onFirstWorkflowDataLoad': { + user_id: string; + project_id: string; + workflow_id: string; + node_type: string; + node_id: string; + }; +}; @Service() -export class WorkflowStatisticsService extends EventEmitter { +export class WorkflowStatisticsService extends TypedEmitter { constructor( private readonly logger: Logger, private readonly repository: WorkflowStatisticsRepository, private readonly ownershipService: OwnershipService, + private readonly userService: UserService, ) { super({ captureRejections: true }); if ('SKIP_STATISTICS_EVENTS' in process.env) return; this.on( 'nodeFetchedData', - async (workflowId, node) => await this.nodeFetchedData(workflowId, node), + async ({ workflowId, node }) => await this.nodeFetchedData(workflowId, node), ); this.on( 'workflowExecutionCompleted', - async (workflowData, runData) => await this.workflowExecutionCompleted(workflowData, runData), + async ({ workflowData, fullRunData }) => + await this.workflowExecutionCompleted(workflowData, fullRunData), ); } @@ -49,18 +68,18 @@ export class WorkflowStatisticsService extends EventEmitter { const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId); if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') { - const project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowId); + const project = await this.ownershipService.getWorkflowProjectCached(workflowId); if (project.type === 'personal') { - const owner = await Container.get(OwnershipService).getProjectOwnerCached(project.id); + const owner = await this.ownershipService.getProjectOwnerCached(project.id); const metrics = { project_id: project.id, workflow_id: workflowId, - user_id: owner?.id, + user_id: owner!.id, }; if (owner && !owner.settings?.userActivated) { - await Container.get(UserService).updateSettings(owner.id, { + await this.userService.updateSettings(owner.id, { firstSuccessfulWorkflowId: workflowId, userActivated: true, userActivatedAt: runData.startedAt.getTime(), @@ -90,7 +109,7 @@ export class WorkflowStatisticsService extends EventEmitter { const owner = await this.ownershipService.getProjectOwnerCached(project.id); let metrics = { - user_id: owner?.id, + user_id: owner!.id, project_id: project.id, workflow_id: workflowId, node_type: node.type, @@ -111,29 +130,3 @@ export class WorkflowStatisticsService extends EventEmitter { this.emit('telemetry.onFirstWorkflowDataLoad', metrics); } } - -export declare interface WorkflowStatisticsService { - on( - event: 'nodeFetchedData', - listener: (workflowId: string | undefined | null, node: INode) => void, - ): this; - on( - event: 'workflowExecutionCompleted', - listener: (workflowData: IWorkflowBase, runData: IRun) => void, - ): this; - on( - event: 'telemetry.onFirstProductionWorkflowSuccess', - listener: (metrics: { user_id: string; workflow_id: string }) => void, - ): this; - on( - event: 'telemetry.onFirstWorkflowDataLoad', - listener: (metrics: { - user_id: string; - workflow_id: string; - node_type: string; - node_id: string; - credential_type?: string; - credential_id?: string; - }) => void, - ): this; -} diff --git a/packages/cli/test/unit/services/workflow-statistics.service.test.ts b/packages/cli/test/unit/services/workflow-statistics.service.test.ts index da1338f8eb..9bd9864bb0 100644 --- a/packages/cli/test/unit/services/workflow-statistics.service.test.ts +++ b/packages/cli/test/unit/services/workflow-statistics.service.test.ts @@ -48,6 +48,7 @@ describe('WorkflowStatisticsService', () => { mock(), new WorkflowStatisticsRepository(dataSource, globalConfig), ownershipService, + userService, ); const onFirstProductionWorkflowSuccess = jest.fn();