diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index f74be9b363..83a859fc75 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -70,7 +70,7 @@ export abstract class AbstractPush extends TypedEmitter(type: Type, data: PushPayload, pushRefs: string[]) { - this.logger.debug(`Send data of type "${type}" to editor-UI`, { + this.logger.debug(`Pushed to frontend: ${type}`, { dataType: type, pushRefs: pushRefs.join(', '), }); diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index 248a455e3e..fc007f76c0 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -4,6 +4,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { Logger } from '@/logging/logger.service'; +import type { LogMetadata } from '@/logging/types'; import { RedisClientService } from '@/services/redis-client.service'; import type { PubSub } from './pubsub.types'; @@ -45,7 +46,7 @@ export class Publisher { // #region Publishing /** Publish a command into the `n8n.commands` channel. */ - async publishCommand(msg: Omit) { + async publishCommand(msg: PubSub.Command) { // @TODO: Once this class is only ever used in scaling mode, remove next line. if (config.getEnv('executions.mode') !== 'queue') return; @@ -59,7 +60,18 @@ export class Publisher { }), ); - this.logger.debug(`Published ${msg.command} to command channel`); + let msgName = msg.command; + + const metadata: LogMetadata = { msg: msg.command, channel: 'n8n.commands' }; + + if (msg.command === 'relay-execution-lifecycle-event') { + const { args, type } = msg.payload; + msgName += ` (${type})`; + metadata.type = type; + metadata.executionId = args.executionId; + } + + this.logger.debug(`Published pubsub msg: ${msgName}`, metadata); } /** Publish a response to a command into the `n8n.worker-response` channel. */ diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index eec0110201..501185d07e 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -23,10 +23,14 @@ export namespace PubSub { // ---------------------------------- type _ToCommand = { - senderId: string; - targets?: string[]; command: CommandKey; + /** Host ID of the sender, added during publishing. */ + senderId?: string; + + /** Host IDs of the receivers. */ + targets?: string[]; + /** Whether the command should be sent to the sender as well. */ selfSend?: boolean; diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index ed673fc4e4..248c1198d2 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -7,6 +7,7 @@ import { Service } from 'typedi'; import config from '@/config'; import { EventService } from '@/events/event.service'; import { Logger } from '@/logging/logger.service'; +import type { LogMetadata } from '@/logging/types'; import { RedisClientService } from '@/services/redis-client.service'; import type { PubSub } from './pubsub.types'; @@ -72,7 +73,7 @@ export class Subscriber { }); if (!msg) { - this.logger.error(`Received malformed message via channel ${channel}`, { + this.logger.error('Received malformed pubsub message', { msg: str, channel, }); @@ -89,12 +90,18 @@ export class Subscriber { return null; } - const msgName = 'command' in msg ? msg.command : msg.response; + let msgName = 'command' in msg ? msg.command : msg.response; - this.logger.debug(`Received message ${msgName} via channel ${channel}`, { - msg, - channel, - }); + const metadata: LogMetadata = { msg: msgName, channel }; + + if ('command' in msg && msg.command === 'relay-execution-lifecycle-event') { + const { args, type } = msg.payload; + msgName += ` (${type})`; + metadata.type = type; + metadata.executionId = args.executionId; + } + + this.logger.debug(`Received pubsub msg: ${msgName}`, metadata); return msg; }