refactor(core): Use type-safe event emitters (no-changelog) (#10234)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-07-30 13:23:01 +02:00 committed by GitHub
parent 99dc56c7a1
commit 1fca3af335
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 124 additions and 89 deletions

View file

@ -0,0 +1,48 @@
import { EventEmitter } from 'node:events';
import debounce from 'lodash/debounce';
type Payloads<ListenerMap> = {
[E in keyof ListenerMap]: unknown;
};
type Listener<Payload> = (payload: Payload) => void;
export class TypedEmitter<ListenerMap extends Payloads<ListenerMap>> extends EventEmitter {
private debounceWait = 300; // milliseconds
override on<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.on(eventName, listener);
}
override once<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.once(eventName, listener);
}
override off<EventName extends keyof ListenerMap & string>(
eventName: EventName,
listener: Listener<ListenerMap[EventName]>,
) {
return super.off(eventName, listener);
}
override emit<EventName extends keyof ListenerMap & string>(
eventName: EventName,
payload?: ListenerMap[EventName],
): boolean {
return super.emit(eventName, payload);
}
protected debouncedEmit = debounce(
<EventName extends keyof ListenerMap & string>(
eventName: EventName,
payload?: ListenerMap[EventName],
) => super.emit(eventName, payload),
this.debounceWait,
);
}

View file

@ -360,11 +360,10 @@ export async function executeWebhook(
NodeExecuteFunctions, NodeExecuteFunctions,
executionMode, executionMode,
); );
Container.get(WorkflowStatisticsService).emit( Container.get(WorkflowStatisticsService).emit('nodeFetchedData', {
'nodeFetchedData', workflowId: workflow.id,
workflow.id, node: workflowStartNode,
workflowStartNode, });
);
} catch (err) { } catch (err) {
// Send error response to webhook caller // Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';

View file

@ -525,17 +525,16 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
); );
} }
} finally { } finally {
workflowStatisticsService.emit( workflowStatisticsService.emit('workflowExecutionCompleted', {
'workflowExecutionCompleted', workflowData: this.workflowData,
this.workflowData,
fullRunData, fullRunData,
); });
} }
}, },
], ],
nodeFetchedData: [ nodeFetchedData: [
async (workflowId: string, node: INode) => { async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', workflowId, node); workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
}, },
], ],
}; };
@ -636,11 +635,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
this.retryOf, this.retryOf,
); );
} finally { } finally {
workflowStatisticsService.emit( workflowStatisticsService.emit('workflowExecutionCompleted', {
'workflowExecutionCompleted', workflowData: this.workflowData,
this.workflowData,
fullRunData, fullRunData,
); });
} }
}, },
async function (this: WorkflowHooks, runData: IRun): Promise<void> { async function (this: WorkflowHooks, runData: IRun): Promise<void> {
@ -676,7 +674,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
], ],
nodeFetchedData: [ nodeFetchedData: [
async (workflowId: string, node: INode) => { async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', workflowId, node); workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
}, },
], ],
}; };

View file

@ -53,7 +53,7 @@ export class ConcurrencyControlService {
this.isEnabled = true; this.isEnabled = true;
this.productionQueue.on('concurrency-check', ({ capacity }: { capacity: number }) => { this.productionQueue.on('concurrency-check', ({ capacity }) => {
if (this.shouldReport(capacity)) { if (this.shouldReport(capacity)) {
void this.telemetry.track('User hit concurrency limit', { void this.telemetry.track('User hit concurrency limit', {
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
@ -61,12 +61,12 @@ export class ConcurrencyControlService {
} }
}); });
this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => { this.productionQueue.on('execution-throttled', ({ executionId }) => {
this.log('Execution throttled', { executionId }); this.log('Execution throttled', { executionId });
this.eventService.emit('execution-throttled', { executionId }); this.eventService.emit('execution-throttled', { executionId });
}); });
this.productionQueue.on('execution-released', async (executionId: string) => { this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId }); this.log('Execution released', { executionId });
await this.executionRepository.resetStartedAt(executionId); await this.executionRepository.resetStartedAt(executionId);
}); });

View file

@ -1,9 +1,14 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { EventEmitter } from 'node:events'; import { TypedEmitter } from '@/TypedEmitter';
import debounce from 'lodash/debounce';
type ConcurrencyEvents = {
'execution-throttled': { executionId: string };
'execution-released': string;
'concurrency-check': { capacity: number };
};
@Service() @Service()
export class ConcurrencyQueue extends EventEmitter { export class ConcurrencyQueue extends TypedEmitter<ConcurrencyEvents> {
private readonly queue: Array<{ private readonly queue: Array<{
executionId: string; executionId: string;
resolve: () => void; resolve: () => void;
@ -63,9 +68,4 @@ export class ConcurrencyQueue extends EventEmitter {
resolve(); resolve();
} }
private debouncedEmit = debounce(
(event: string, payload: object) => this.emit(event, payload),
300,
);
} }

View file

@ -1,16 +1,6 @@
import { EventEmitter } from 'node:events';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { TypedEmitter } from '@/TypedEmitter';
import type { Event } from './event.types'; import type { Event } from './event.types';
@Service() @Service()
export class EventService extends EventEmitter { export class EventService extends TypedEmitter<Event> {}
emit<K extends keyof Event>(eventName: K, arg?: Event[K]) {
super.emit(eventName, arg);
return true;
}
on<K extends keyof Event>(eventName: K, handler: (arg: Event[K]) => void) {
super.on(eventName, handler);
return this;
}
}

View file

@ -1,4 +1,3 @@
import { EventEmitter } from 'events';
import { ServerResponse } from 'http'; import { ServerResponse } from 'http';
import type { Server } from 'http'; import type { Server } from 'http';
import type { Socket } from 'net'; import type { Socket } from 'net';
@ -17,6 +16,11 @@ import { OrchestrationService } from '@/services/orchestration.service';
import { SSEPush } from './sse.push'; import { SSEPush } from './sse.push';
import { WebSocketPush } from './websocket.push'; import { WebSocketPush } from './websocket.push';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import { TypedEmitter } from '@/TypedEmitter';
type PushEvents = {
editorUiConnected: string;
};
const useWebSockets = config.getEnv('push.backend') === 'websocket'; const useWebSockets = config.getEnv('push.backend') === 'websocket';
@ -28,7 +32,7 @@ const useWebSockets = config.getEnv('push.backend') === 'websocket';
* @emits message when a message is received from a client * @emits message when a message is received from a client
*/ */
@Service() @Service()
export class Push extends EventEmitter { export class Push extends TypedEmitter<PushEvents> {
private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);
constructor(private readonly orchestrationService: OrchestrationService) { constructor(private readonly orchestrationService: OrchestrationService) {
@ -37,7 +41,6 @@ export class Push extends EventEmitter {
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const { const {
user,
ws, ws,
query: { pushRef }, query: { pushRef },
} = req; } = req;

View file

@ -1,5 +1,3 @@
import EventEmitter from 'node:events';
import Container, { Service } from 'typedi'; import Container, { Service } from 'typedi';
import { caching } from 'cache-manager'; import { caching } from 'cache-manager';
import { ApplicationError, jsonStringify } from 'n8n-workflow'; import { ApplicationError, jsonStringify } from 'n8n-workflow';
@ -10,14 +8,20 @@ import { MalformedRefreshValueError } from '@/errors/cache-errors/malformed-refr
import type { import type {
TaggedRedisCache, TaggedRedisCache,
TaggedMemoryCache, TaggedMemoryCache,
CacheEvent,
MaybeHash, MaybeHash,
Hash, Hash,
} from '@/services/cache/cache.types'; } from '@/services/cache/cache.types';
import { TIME } from '@/constants'; import { TIME } from '@/constants';
import { TypedEmitter } from '@/TypedEmitter';
type CacheEvents = {
'metrics.cache.hit': never;
'metrics.cache.miss': never;
'metrics.cache.update': never;
};
@Service() @Service()
export class CacheService extends EventEmitter { export class CacheService extends TypedEmitter<CacheEvents> {
private cache: TaggedRedisCache | TaggedMemoryCache; private cache: TaggedRedisCache | TaggedMemoryCache;
async init() { async init() {
@ -66,10 +70,6 @@ export class CacheService extends EventEmitter {
await this.cache.store.reset(); await this.cache.store.reset();
} }
emit(event: CacheEvent, ...args: unknown[]) {
return super.emit(event, ...args);
}
isRedis() { isRedis() {
return this.cache.kind === 'redis'; return this.cache.kind === 'redis';
} }

View file

@ -8,5 +8,3 @@ export type TaggedMemoryCache = MemoryCache & { kind: 'memory' };
export type Hash<T = unknown> = Record<string, T>; export type Hash<T = unknown> = Record<string, T>;
export type MaybeHash<T> = Hash<T> | undefined; export type MaybeHash<T> = Hash<T> | undefined;
export type CacheEvent = `metrics.cache.${'hit' | 'miss' | 'update'}`;

View file

@ -1,4 +1,3 @@
import { EventEmitter } from 'node:events';
import config from '@/config'; import config from '@/config';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { TIME } from '@/constants'; import { TIME } from '@/constants';
@ -6,9 +5,15 @@ import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import { RedisClientService } from '@/services/redis/redis-client.service'; import { RedisClientService } from '@/services/redis/redis-client.service';
import { TypedEmitter } from '@/TypedEmitter';
type MultiMainEvents = {
'leader-stepdown': never;
'leader-takeover': never;
};
@Service() @Service()
export class MultiMainSetup extends EventEmitter { export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly redisPublisher: RedisServicePubSubPublisher, private readonly redisPublisher: RedisServicePubSubPublisher,

View file

@ -1,29 +1,48 @@
import { EventEmitter } from 'events'; import { Service } from 'typedi';
import { Container, Service } from 'typedi';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { StatisticsNames } from '@db/entities/WorkflowStatistics'; import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository'; import { WorkflowStatisticsRepository } from '@db/repositories/workflowStatistics.repository';
import { UserService } from '@/services/user.service'; import { UserService } from '@/services/user.service';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { OwnershipService } from './ownership.service'; import { OwnershipService } from './ownership.service';
import { TypedEmitter } from '@/TypedEmitter';
type WorkflowStatisticsEvents = {
nodeFetchedData: { workflowId: string; node: INode };
workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun };
'telemetry.onFirstProductionWorkflowSuccess': {
project_id: string;
workflow_id: string;
user_id: string;
};
'telemetry.onFirstWorkflowDataLoad': {
user_id: string;
project_id: string;
workflow_id: string;
node_type: string;
node_id: string;
};
};
@Service() @Service()
export class WorkflowStatisticsService extends EventEmitter { export class WorkflowStatisticsService extends TypedEmitter<WorkflowStatisticsEvents> {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly repository: WorkflowStatisticsRepository, private readonly repository: WorkflowStatisticsRepository,
private readonly ownershipService: OwnershipService, private readonly ownershipService: OwnershipService,
private readonly userService: UserService,
) { ) {
super({ captureRejections: true }); super({ captureRejections: true });
if ('SKIP_STATISTICS_EVENTS' in process.env) return; if ('SKIP_STATISTICS_EVENTS' in process.env) return;
this.on( this.on(
'nodeFetchedData', 'nodeFetchedData',
async (workflowId, node) => await this.nodeFetchedData(workflowId, node), async ({ workflowId, node }) => await this.nodeFetchedData(workflowId, node),
); );
this.on( this.on(
'workflowExecutionCompleted', 'workflowExecutionCompleted',
async (workflowData, runData) => await this.workflowExecutionCompleted(workflowData, runData), async ({ workflowData, fullRunData }) =>
await this.workflowExecutionCompleted(workflowData, fullRunData),
); );
} }
@ -49,18 +68,18 @@ export class WorkflowStatisticsService extends EventEmitter {
const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId); const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId);
if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') { if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') {
const project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowId); const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
if (project.type === 'personal') { if (project.type === 'personal') {
const owner = await Container.get(OwnershipService).getProjectOwnerCached(project.id); const owner = await this.ownershipService.getProjectOwnerCached(project.id);
const metrics = { const metrics = {
project_id: project.id, project_id: project.id,
workflow_id: workflowId, workflow_id: workflowId,
user_id: owner?.id, user_id: owner!.id,
}; };
if (owner && !owner.settings?.userActivated) { if (owner && !owner.settings?.userActivated) {
await Container.get(UserService).updateSettings(owner.id, { await this.userService.updateSettings(owner.id, {
firstSuccessfulWorkflowId: workflowId, firstSuccessfulWorkflowId: workflowId,
userActivated: true, userActivated: true,
userActivatedAt: runData.startedAt.getTime(), userActivatedAt: runData.startedAt.getTime(),
@ -90,7 +109,7 @@ export class WorkflowStatisticsService extends EventEmitter {
const owner = await this.ownershipService.getProjectOwnerCached(project.id); const owner = await this.ownershipService.getProjectOwnerCached(project.id);
let metrics = { let metrics = {
user_id: owner?.id, user_id: owner!.id,
project_id: project.id, project_id: project.id,
workflow_id: workflowId, workflow_id: workflowId,
node_type: node.type, node_type: node.type,
@ -111,29 +130,3 @@ export class WorkflowStatisticsService extends EventEmitter {
this.emit('telemetry.onFirstWorkflowDataLoad', metrics); this.emit('telemetry.onFirstWorkflowDataLoad', metrics);
} }
} }
export declare interface WorkflowStatisticsService {
on(
event: 'nodeFetchedData',
listener: (workflowId: string | undefined | null, node: INode) => void,
): this;
on(
event: 'workflowExecutionCompleted',
listener: (workflowData: IWorkflowBase, runData: IRun) => void,
): this;
on(
event: 'telemetry.onFirstProductionWorkflowSuccess',
listener: (metrics: { user_id: string; workflow_id: string }) => void,
): this;
on(
event: 'telemetry.onFirstWorkflowDataLoad',
listener: (metrics: {
user_id: string;
workflow_id: string;
node_type: string;
node_id: string;
credential_type?: string;
credential_id?: string;
}) => void,
): this;
}

View file

@ -48,6 +48,7 @@ describe('WorkflowStatisticsService', () => {
mock(), mock(),
new WorkflowStatisticsRepository(dataSource, globalConfig), new WorkflowStatisticsRepository(dataSource, globalConfig),
ownershipService, ownershipService,
userService,
); );
const onFirstProductionWorkflowSuccess = jest.fn(); const onFirstProductionWorkflowSuccess = jest.fn();