From 1a0e28555385f682aa335115c4d72e671c0bdc85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 12 Jan 2024 11:48:58 +0100 Subject: [PATCH] feat(core): Implement inter-main communication for test webhooks in multi-main setup (#8267) --- packages/cli/src/TestWebhooks.ts | 42 ++++++++--- packages/cli/src/constants.ts | 6 +- packages/cli/src/push/abstract.push.ts | 75 +++++++++++-------- packages/cli/src/push/index.ts | 17 ++--- packages/cli/src/push/sse.push.ts | 10 ++- packages/cli/src/push/websocket.push.ts | 7 +- .../cli/src/services/cache/cache.service.ts | 16 ++++ .../src/services/cache/redis.cache-manager.ts | 4 + .../orchestration/main/MultiMainSetup.ee.ts | 25 ++----- .../main/handleCommandMessageMain.ts | 46 +++++++++++- .../worker/handleCommandMessageWorker.ts | 7 ++ .../services/redis/RedisServiceCommands.ts | 34 ++++++--- .../test-webhook-registrations.service.ts | 14 ++++ .../cli/src/workflows/workflow.service.ts | 2 +- .../test/integration/workflow.service.test.ts | 16 +++- packages/cli/test/unit/TestWebhooks.test.ts | 2 +- .../cli/test/unit/push/websocket.push.test.ts | 4 +- 17 files changed, 231 insertions(+), 96 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index df5dbecded..82d2d9e7ef 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -17,13 +17,14 @@ import type { import { Push } from '@/push'; import { NodeTypes } from '@/NodeTypes'; import * as WebhookHelpers from '@/WebhookHelpers'; -import { TIME } from '@/constants'; +import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error'; import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; import * as NodeExecuteFunctions from 'n8n-core'; import { removeTrailingSlash } from './utils'; import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service'; +import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; @Service() @@ -32,6 +33,7 @@ export class TestWebhooks implements IWebhookManager { private readonly push: Push, private readonly nodeTypes: NodeTypes, private readonly registrations: TestWebhookRegistrationsService, + private readonly multiMainSetup: MultiMainSetup, ) {} private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; @@ -89,7 +91,6 @@ export class TestWebhooks implements IWebhookManager { } const { destinationNode, sessionId, workflowEntity } = registration; - const timeout = this.timeouts[key]; const workflow = this.toWorkflow(workflowEntity); @@ -135,15 +136,34 @@ export class TestWebhooks implements IWebhookManager { } } catch {} - // Delete webhook also if an error is thrown - if (timeout) clearTimeout(timeout); + /** + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, after the test webhook has been successfully executed, + * the handler process commands the creator process to clear its test webhooks. + */ + if ( + this.multiMainSetup.isEnabled && + sessionId && + !this.push.getBackend().hasSessionId(sessionId) + ) { + const payload = { webhookKey: key, workflowEntity, sessionId }; + void this.multiMainSetup.publish('clear-test-webhooks', payload); + return; + } - await this.registrations.deregisterAll(); + this.clearTimeout(key); await this.deactivateWebhooks(workflow); }); } + clearTimeout(key: string) { + const timeout = this.timeouts[key]; + + if (timeout) clearTimeout(timeout); + } + async getWebhookMethods(path: string) { const allKeys = await this.registrations.getAllKeys(); @@ -208,7 +228,7 @@ export class TestWebhooks implements IWebhookManager { return false; // no webhooks found to start a workflow } - const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE); + const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), TEST_WEBHOOK_TIMEOUT); for (const webhook of webhooks) { const key = this.registrations.toKey(webhook); @@ -270,13 +290,11 @@ export class TestWebhooks implements IWebhookManager { const { sessionId, workflowEntity } = registration; - const timeout = this.timeouts[key]; - const workflow = this.toWorkflow(workflowEntity); if (workflowEntity.id !== workflowId) continue; - clearTimeout(timeout); + this.clearTimeout(key); if (sessionId !== undefined) { try { @@ -359,13 +377,13 @@ export class TestWebhooks implements IWebhookManager { if (staticData) workflow.staticData = staticData; await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update'); - - await this.registrations.deregister(webhook); } + + await this.registrations.deregisterAll(); } /** - * Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`. + * Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`. */ toWorkflow(workflowEntity: IWorkflowDb) { return new Workflow({ diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 7d4e4fbf61..8603d29963 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -109,8 +109,12 @@ export const TIME = { MINUTE: 60 * 1000, HOUR: 60 * 60 * 1000, DAY: 24 * 60 * 60 * 1000, -}; +} as const; export const MIN_PASSWORD_CHAR_LENGTH = 8; export const MAX_PASSWORD_CHAR_LENGTH = 64; + +export const TEST_WEBHOOK_TIMEOUT = 2 * TIME.MINUTE; + +export const TEST_WEBHOOK_TIMEOUT_BUFFER = 30 * TIME.SECOND; diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 42adadaa7b..655e5486b5 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -3,6 +3,7 @@ import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; import type { Logger } from '@/Logger'; import type { User } from '@db/entities/User'; +import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; /** * Abstract class for two-way push communication. @@ -16,19 +17,23 @@ export abstract class AbstractPush extends EventEmitter { protected userIdBySessionId: Record = {}; protected abstract close(connection: T): void; - protected abstract sendToOne(connection: T, data: string): void; + protected abstract sendToOneConnection(connection: T, data: string): void; - constructor(protected readonly logger: Logger) { + constructor( + protected readonly logger: Logger, + private readonly multiMainSetup: MultiMainSetup, + ) { super(); } - protected add(sessionId: string, userId: User['id'], connection: T): void { + protected add(sessionId: string, userId: User['id'], connection: T) { const { connections, userIdBySessionId: userIdsBySessionId } = this; this.logger.debug('Add editor-UI session', { sessionId }); const existingConnection = connections[sessionId]; + if (existingConnection) { - // Make sure to remove existing connection with the same id + // Make sure to remove existing connection with the same ID this.close(existingConnection); } @@ -36,46 +41,58 @@ export abstract class AbstractPush extends EventEmitter { userIdsBySessionId[sessionId] = userId; } - protected onMessageReceived(sessionId: string, msg: unknown): void { + protected onMessageReceived(sessionId: string, msg: unknown) { this.logger.debug('Received message from editor-UI', { sessionId, msg }); + const userId = this.userIdBySessionId[sessionId]; - this.emit('message', { - sessionId, - userId, - msg, - }); + + this.emit('message', { sessionId, userId, msg }); } - protected remove(sessionId?: string): void { - if (sessionId !== undefined) { - this.logger.debug('Remove editor-UI session', { sessionId }); - delete this.connections[sessionId]; - delete this.userIdBySessionId[sessionId]; - } + protected remove(sessionId?: string) { + if (!sessionId) return; + + this.logger.debug('Removed editor-UI session', { sessionId }); + + delete this.connections[sessionId]; + delete this.userIdBySessionId[sessionId]; } - private sendToSessions(type: IPushDataType, data: D, sessionIds: string[]) { + private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) { this.logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionIds: sessionIds.join(', '), }); - const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true }); + const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true }); for (const sessionId of sessionIds) { const connection = this.connections[sessionId]; assert(connection); - this.sendToOne(connection, sendData); + this.sendToOneConnection(connection, stringifiedPayload); } } - broadcast(type: IPushDataType, data?: D) { + sendToAllSessions(type: IPushDataType, data?: unknown) { this.sendToSessions(type, data, Object.keys(this.connections)); } - send(type: IPushDataType, data: D, sessionId: string) { - const { connections } = this; - if (connections[sessionId] === undefined) { + sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) { + /** + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, the handler process commands the creator process to + * relay the former's execution lifecyle events to the creator's frontend. + */ + if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) { + const payload = { type, args: data, sessionId }; + + void this.multiMainSetup.publish('relay-execution-lifecycle-event', payload); + + return; + } + + if (this.connections[sessionId] === undefined) { this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); return; } @@ -83,10 +100,7 @@ export abstract class AbstractPush extends EventEmitter { this.sendToSessions(type, data, [sessionId]); } - /** - * Sends the given data to given users' connections - */ - sendToUsers(type: IPushDataType, data: D, userIds: Array) { + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { const { connections } = this; const userSessionIds = Object.keys(connections).filter((sessionId) => userIds.includes(this.userIdBySessionId[sessionId]), @@ -95,9 +109,6 @@ export abstract class AbstractPush extends EventEmitter { this.sendToSessions(type, data, userSessionIds); } - /** - * Closes all push existing connections - */ closeAllConnections() { for (const sessionId in this.connections) { // Signal the connection that we want to close it. @@ -107,4 +118,8 @@ export abstract class AbstractPush extends EventEmitter { this.close(this.connections[sessionId]); } } + + hasSessionId(sessionId: string) { + return this.connections[sessionId] !== undefined; + } } diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index d8705a475c..e740821321 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -34,9 +34,7 @@ export class Push extends EventEmitter { constructor() { super(); - if (useWebSockets) { - this.backend.on('message', (msg) => this.emit('message', msg)); - } + if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); } handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { @@ -44,6 +42,7 @@ export class Push extends EventEmitter { userId, query: { sessionId }, } = req; + if (req.ws) { (this.backend as WebSocketPush).add(sessionId, userId, req.ws); } else if (!useWebSockets) { @@ -56,24 +55,24 @@ export class Push extends EventEmitter { this.emit('editorUiConnected', sessionId); } - broadcast(type: IPushDataType, data?: D) { - this.backend.broadcast(type, data); + broadcast(type: IPushDataType, data?: unknown) { + this.backend.sendToAllSessions(type, data); } - send(type: IPushDataType, data: D, sessionId: string) { - this.backend.send(type, data, sessionId); + send(type: IPushDataType, data: unknown, sessionId: string) { + this.backend.sendToOneSession(type, data, sessionId); } getBackend() { return this.backend; } - sendToUsers(type: IPushDataType, data: D, userIds: Array) { + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { this.backend.sendToUsers(type, data, userIds); } @OnShutdown() - onShutdown(): void { + onShutdown() { this.backend.closeAllConnections(); } } diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index f4c75a3205..17f4c7ad9e 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -4,6 +4,7 @@ import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; import type { User } from '@db/entities/User'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; type Connection = { req: PushRequest; res: PushResponse }; @@ -13,8 +14,9 @@ export class SSEPush extends AbstractPush { readonly connections: Record = {}; - constructor(logger: Logger) { - super(logger); + constructor(logger: Logger, multiMainSetup: MultiMainSetup) { + super(logger, multiMainSetup); + this.channel.on('disconnect', (channel, { req }) => { this.remove(req?.query?.sessionId); }); @@ -25,12 +27,12 @@ export class SSEPush extends AbstractPush { this.channel.addClient(connection.req, connection.res); } - protected close({ res }: Connection): void { + protected close({ res }: Connection) { res.end(); this.channel.removeClient(res); } - protected sendToOne(connection: Connection, data: string): void { + protected sendToOneConnection(connection: Connection, data: string) { this.channel.send(data, [connection.res]); } } diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 08ebad2e9d..6f47b1fb62 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -3,6 +3,7 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { User } from '@db/entities/User'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -10,8 +11,8 @@ function heartbeat(this: WebSocket) { @Service() export class WebSocketPush extends AbstractPush { - constructor(logger: Logger) { - super(logger); + constructor(logger: Logger, multiMainSetup: MultiMainSetup) { + super(logger, multiMainSetup); // Ping all connected clients every 60 seconds setInterval(() => this.pingAll(), 60 * 1000); @@ -51,7 +52,7 @@ export class WebSocketPush extends AbstractPush { connection.close(); } - protected sendToOne(connection: WebSocket, data: string): void { + protected sendToOneConnection(connection: WebSocket, data: string): void { connection.send(data); } diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index d9ea8b04b1..421597a172 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -15,6 +15,7 @@ import type { MaybeHash, Hash, } from '@/services/cache/cache.types'; +import { TIME } from '@/constants'; @Service() export class CacheService extends EventEmitter { @@ -130,6 +131,21 @@ export class CacheService extends EventEmitter { await this.set(key, hashObject); } + async expire(key: string, ttlMs: number) { + if (!this.cache) await this.init(); + + if (!key?.length) return; + + if (this.cache.kind === 'memory') { + setTimeout(async () => { + await this.cache.store.del(key); + }, ttlMs); + return; + } + + await this.cache.store.expire(key, ttlMs / TIME.SECOND); + } + // ---------------------------------- // retrieving // ---------------------------------- diff --git a/packages/cli/src/services/cache/redis.cache-manager.ts b/packages/cli/src/services/cache/redis.cache-manager.ts index ca3e920dc2..d556dacdc7 100644 --- a/packages/cli/src/services/cache/redis.cache-manager.ts +++ b/packages/cli/src/services/cache/redis.cache-manager.ts @@ -39,6 +39,7 @@ export interface RedisStore extends Store { hvals(key: string): Promise; hexists(key: string, field: string): Promise; hdel(key: string, field: string): Promise; + expire(key: string, ttlSeconds: number): Promise; } function builder( @@ -56,6 +57,9 @@ function builder( if (val === undefined || val === null) return undefined; else return jsonParse(val); }, + async expire(key: string, ttlSeconds: number) { + await redisCache.expire(key, ttlSeconds); + }, async set(key, value, ttl) { // eslint-disable-next-line @typescript-eslint/no-throw-literal, @typescript-eslint/restrict-template-expressions if (!isCacheable(value)) throw new NoCacheableError(`"${value}" is not a cacheable value`); diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index b032b1e979..55822c9ac8 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -4,6 +4,10 @@ import { TIME } from '@/constants'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; +import type { + RedisServiceBaseCommand, + RedisServiceCommand, +} from '@/services/redis/RedisServiceCommands'; @Service() export class MultiMainSetup extends SingleMainSetup { @@ -122,27 +126,14 @@ export class MultiMainSetup extends SingleMainSetup { } } - async broadcastWorkflowActiveStateChanged(payload: { - workflowId: string; - oldState: boolean; - newState: boolean; - versionId: string; - }) { + async publish(command: RedisServiceCommand, data: unknown) { if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'workflowActiveStateChanged', - payload, - }); - } + const payload = data as RedisServiceBaseCommand['payload']; - async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) { - if (!this.sanityCheck()) return; + this.logger.debug(`[Instance ID ${this.id}] Publishing command "${command}"`, payload); - await this.redisPublisher.publishToCommandChannel({ - command: 'workflowFailedToActivate', - payload, - }); + await this.redisPublisher.publishToCommandChannel({ command, payload }); } async fetchLeaderKey() { diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 9a2752e5d6..f41106ca2d 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -9,6 +9,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Push } from '@/push'; import { MultiMainSetup } from './MultiMainSetup.ee'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { TestWebhooks } from '@/TestWebhooks'; export async function handleCommandMessageMain(messageString: string) { const queueModeId = config.getEnv('redis.queueModeId'); @@ -31,6 +32,9 @@ export async function handleCommandMessageMain(messageString: string) { ); return message; } + + const push = Container.get(Push); + switch (message.command) { case 'reloadLicense': if (!debounceMessageReceiver(message, 500)) { @@ -84,8 +88,6 @@ export async function handleCommandMessageMain(messageString: string) { break; } - const push = Container.get(Push); - if (!oldState && newState) { try { await activeWorkflowRunner.add(workflowId, 'activate'); @@ -98,7 +100,7 @@ export async function handleCommandMessageMain(messageString: string) { versionId, }); - await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({ + await Container.get(MultiMainSetup).publish('workflowFailedToActivate', { workflowId, errorMessage: error.message, }); @@ -125,6 +127,44 @@ export async function handleCommandMessageMain(messageString: string) { if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break; Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); + + break; + } + + case 'relay-execution-lifecycle-event': { + /** + * Do not debounce this - all events share the same message name. + */ + + const { type, args, sessionId } = message.payload; + + if (!push.getBackend().hasSessionId(sessionId)) break; + + push.send(type, args, sessionId); + + break; + } + + case 'clear-test-webhooks': { + if (!debounceMessageReceiver(message, 100)) { + // @ts-expect-error Legacy typing + message.payload = { result: 'debounced' }; + return message; + } + + const { webhookKey, workflowEntity, sessionId } = message.payload; + + if (!push.getBackend().hasSessionId(sessionId)) break; + + const testWebhooks = Container.get(TestWebhooks); + + testWebhooks.clearTimeout(webhookKey); + + const workflow = testWebhooks.toWorkflow(workflowEntity); + + await testWebhooks.deactivateWebhooks(workflow); + + break; } default: diff --git a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts index 32c8a5c631..ec68be49a8 100644 --- a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts +++ b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts @@ -122,6 +122,13 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa // await this.stopProcess(); break; default: + if ( + message.command === 'relay-execution-lifecycle-event' || + message.command === 'clear-test-webhooks' + ) { + break; // meant only for main + } + logger.debug( // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 4c622e3ac9..e1c20d71a6 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,4 +1,4 @@ -import type { IPushDataWorkerStatusPayload } from '@/Interfaces'; +import type { IPushDataType, IPushDataWorkerStatusPayload, IWorkflowDb } from '@/Interfaces'; export type RedisServiceCommand = | 'getStatus' @@ -8,7 +8,9 @@ export type RedisServiceCommand = | 'reloadLicense' | 'reloadExternalSecretsProviders' | 'workflowActiveStateChanged' // multi-main only - | 'workflowFailedToActivate'; // multi-main only + | 'workflowFailedToActivate' // multi-main only + | 'relay-execution-lifecycle-event' // multi-main only + | 'clear-test-webhooks'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -16,13 +18,27 @@ export type RedisServiceCommand = * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. * @field payload: Optional arguments to be sent with the command. */ -type RedisServiceBaseCommand = { - senderId: string; - command: RedisServiceCommand; - payload?: { - [key: string]: string | number | boolean | string[] | number[] | boolean[]; - }; -}; +export type RedisServiceBaseCommand = + | { + senderId: string; + command: Exclude< + RedisServiceCommand, + 'relay-execution-lifecycle-event' | 'clear-test-webhooks' + >; + payload?: { + [key: string]: string | number | boolean | string[] | number[] | boolean[]; + }; + } + | { + senderId: string; + command: 'relay-execution-lifecycle-event'; + payload: { type: IPushDataType; args: Record; sessionId: string }; + } + | { + senderId: string; + command: 'clear-test-webhooks'; + payload: { webhookKey: string; workflowEntity: IWorkflowDb; sessionId: string }; + }; export type RedisServiceWorkerResponseObject = { workerId: string; diff --git a/packages/cli/src/services/test-webhook-registrations.service.ts b/packages/cli/src/services/test-webhook-registrations.service.ts index 7c0d05b2d5..58a80dd758 100644 --- a/packages/cli/src/services/test-webhook-registrations.service.ts +++ b/packages/cli/src/services/test-webhook-registrations.service.ts @@ -2,6 +2,7 @@ import { Service } from 'typedi'; import { CacheService } from '@/services/cache/cache.service'; import { type IWebhookData } from 'n8n-workflow'; import type { IWorkflowDb } from '@/Interfaces'; +import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants'; export type TestWebhookRegistration = { sessionId?: string; @@ -20,6 +21,19 @@ export class TestWebhookRegistrationsService { const hashKey = this.toKey(registration.webhook); await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration }); + + /** + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, after the test webhook has been successfully executed, + * the handler process commands the creator process to clear its test webhooks. + * We set a TTL on the key so that it is cleared even on creator process crash, + * with an additional buffer to ensure this safeguard expiration will not delete + * the key before the regular test webhook timeout fetches the key to delete it. + */ + const ttl = TEST_WEBHOOK_TIMEOUT + TEST_WEBHOOK_TIMEOUT_BUFFER; + + await this.cacheService.expire(this.cacheKey, ttl); } async deregister(arg: IWebhookData | string) { diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index 2d234e426c..c6546155b0 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -282,7 +282,7 @@ export class WorkflowService { const newState = updatedWorkflow.active; if (this.multiMainSetup.isEnabled && oldState !== newState) { - await this.multiMainSetup.broadcastWorkflowActiveStateChanged({ + await this.multiMainSetup.publish('workflowActiveStateChanged', { workflowId, oldState, newState, diff --git a/packages/cli/test/integration/workflow.service.test.ts b/packages/cli/test/integration/workflow.service.test.ts index 923fc71756..b98fb8ff74 100644 --- a/packages/cli/test/integration/workflow.service.test.ts +++ b/packages/cli/test/integration/workflow.service.test.ts @@ -90,22 +90,30 @@ describe('update()', () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + const publishSpy = jest.spyOn(multiMainSetup, 'publish'); workflow.active = false; await workflowService.update(owner, workflow, workflow.id); - expect(broadcastSpy).toHaveBeenCalledTimes(1); + expect(publishSpy).toHaveBeenCalledTimes(1); + expect(publishSpy).toHaveBeenCalledWith( + 'workflowActiveStateChanged', + expect.objectContaining({ + newState: false, + oldState: true, + workflowId: workflow.id, + }), + ); }); test('should not broadcast active workflow state change if state did not change', async () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + const publishSpy = jest.spyOn(multiMainSetup, 'publish'); await workflowService.update(owner, workflow, workflow.id); - expect(broadcastSpy).not.toHaveBeenCalled(); + expect(publishSpy).not.toHaveBeenCalled(); }); }); diff --git a/packages/cli/test/unit/TestWebhooks.test.ts b/packages/cli/test/unit/TestWebhooks.test.ts index ecb3fe8044..6d2a561f10 100644 --- a/packages/cli/test/unit/TestWebhooks.test.ts +++ b/packages/cli/test/unit/TestWebhooks.test.ts @@ -39,7 +39,7 @@ let testWebhooks: TestWebhooks; describe('TestWebhooks', () => { beforeAll(() => { - testWebhooks = new TestWebhooks(mock(), mock(), registrations); + testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock()); jest.useFakeTimers(); }); diff --git a/packages/cli/test/unit/push/websocket.push.test.ts b/packages/cli/test/unit/push/websocket.push.test.ts index 2e072c3857..ab55605ba9 100644 --- a/packages/cli/test/unit/push/websocket.push.test.ts +++ b/packages/cli/test/unit/push/websocket.push.test.ts @@ -65,7 +65,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.send('executionRecovered', data, sessionId1); + webSocketPush.sendToOneSession('executionRecovered', data, sessionId1); expect(mockWebSocket1.send).toHaveBeenCalledWith( JSON.stringify({ @@ -91,7 +91,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.broadcast('executionRecovered', data); + webSocketPush.sendToAllSessions('executionRecovered', data); const expectedMsg = JSON.stringify({ type: 'executionRecovered',