From 873851b54e431962ca523baace19af16d12f41fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 14 Oct 2024 15:15:42 +0200 Subject: [PATCH] refactor(core): Revamp logs for scaling mode (#11244) --- packages/cli/src/commands/start.ts | 7 +- packages/cli/src/commands/worker.ts | 24 ++--- .../worker-missing-encryption-key.error.ts | 14 +++ .../__tests__/publisher.service.test.ts | 12 ++- .../scaling/__tests__/worker-server.test.ts | 13 +-- packages/cli/src/scaling/job-processor.ts | 54 ++++++++--- .../src/scaling/pubsub/publisher.service.ts | 6 +- .../cli/src/scaling/pubsub/pubsub.types.ts | 6 +- .../src/scaling/pubsub/subscriber.service.ts | 35 ++++--- packages/cli/src/scaling/scaling.service.ts | 93 ++++++++++++++----- packages/cli/src/scaling/scaling.types.ts | 38 ++++++-- packages/cli/src/scaling/worker-server.ts | 8 ++ packages/cli/src/workflow-runner.ts | 3 +- packages/core/src/DirectoryLoader.ts | 4 +- packages/workflow/src/ErrorReporterProxy.ts | 9 +- .../workflow/src/errors/application.error.ts | 1 + 16 files changed, 230 insertions(+), 97 deletions(-) create mode 100644 packages/cli/src/errors/worker-missing-encryption-key.error.ts diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 428f451fdc..21e277a5dc 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -174,8 +174,9 @@ export class Start extends BaseCommand { this.logger.info('Initializing n8n process'); if (config.getEnv('executions.mode') === 'queue') { - this.logger.debug('Main Instance running in queue mode'); - this.logger.debug(`Queue mode id: ${this.queueModeId}`); + const scopedLogger = this.logger.withScope('scaling'); + scopedLogger.debug('Starting main instance in scaling mode'); + scopedLogger.debug(`Host ID: ${this.queueModeId}`); } const { flags } = await this.parse(Start); @@ -260,6 +261,8 @@ export class Start extends BaseCommand { await subscriber.subscribe('n8n.commands'); await 
subscriber.subscribe('n8n.worker-response'); + this.logger.withScope('scaling').debug('Pubsub setup completed'); + if (!orchestrationService.isMultiMainSetupEnabled) return; orchestrationService.multiMainSetup diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 528951be4a..1685fbb034 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -1,13 +1,13 @@ import { Flags, type Config } from '@oclif/core'; -import { ApplicationError } from 'n8n-workflow'; import { Container } from 'typedi'; import config from '@/config'; import { N8N_VERSION, inTest } from '@/constants'; +import { WorkerMissingEncryptionKey } from '@/errors/worker-missing-encryption-key.error'; import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; -import { JobProcessor } from '@/scaling/job-processor'; +import { Logger } from '@/logging/logger.service'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; @@ -39,8 +39,6 @@ export class Worker extends BaseCommand { scalingService: ScalingService; - jobProcessor: JobProcessor; - override needsCommunityPackages = true; /** @@ -49,25 +47,23 @@ export class Worker extends BaseCommand { * get removed. 
*/ async stopProcess() { - this.logger.info('Stopping n8n...'); + this.logger.info('Stopping worker...'); try { await this.externalHooks?.run('n8n.stop', []); } catch (error) { - await this.exitWithCrash('There was an error shutting down n8n.', error); + await this.exitWithCrash('Error shutting down worker', error); } await this.exitSuccessFully(); } constructor(argv: string[], cmdConfig: Config) { + if (!process.env.N8N_ENCRYPTION_KEY) throw new WorkerMissingEncryptionKey(); + super(argv, cmdConfig); - if (!process.env.N8N_ENCRYPTION_KEY) { - throw new ApplicationError( - 'Missing encryption key. Worker started without the required N8N_ENCRYPTION_KEY env var. More information: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/', - ); - } + this.logger = Container.get(Logger).withScope('scaling'); this.setInstanceQueueModeId(); } @@ -84,7 +80,7 @@ export class Worker extends BaseCommand { await this.initCrashJournal(); this.logger.debug('Starting n8n worker...'); - this.logger.debug(`Queue mode id: ${this.queueModeId}`); + this.logger.debug(`Host ID: ${this.queueModeId}`); await this.setConcurrency(); await super.init(); @@ -133,6 +129,8 @@ export class Worker extends BaseCommand { Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); + + this.logger.withScope('scaling').debug('Pubsub setup ready'); } async setConcurrency() { @@ -150,8 +148,6 @@ export class Worker extends BaseCommand { await this.scalingService.setupQueue(); this.scalingService.setupWorker(this.concurrency); - - this.jobProcessor = Container.get(JobProcessor); } async run() { diff --git a/packages/cli/src/errors/worker-missing-encryption-key.error.ts b/packages/cli/src/errors/worker-missing-encryption-key.error.ts new file mode 100644 index 0000000000..88ec11877a --- /dev/null +++ b/packages/cli/src/errors/worker-missing-encryption-key.error.ts @@ -0,0 +1,14 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class 
WorkerMissingEncryptionKey extends ApplicationError { + constructor() { + super( + [ + 'Failed to start worker because of missing encryption key.', + 'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.', + 'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/', + ].join(' '), + { level: 'warning' }, + ); + } +} diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/__tests__/publisher.service.test.ts index 05bb52bc6a..af8ff9f0c1 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/__tests__/publisher.service.test.ts @@ -4,6 +4,7 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; import { generateNanoId } from '@/databases/utils/generators'; import type { RedisClientService } from '@/services/redis-client.service'; +import { mockLogger } from '@test/mocking'; import { Publisher } from '../pubsub/publisher.service'; import type { PubSub } from '../pubsub/pubsub.types'; @@ -18,18 +19,19 @@ describe('Publisher', () => { }); const client = mock(); + const logger = mockLogger(); const redisClientService = mock({ createClient: () => client }); describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const publisher = new Publisher(mock(), redisClientService); + const publisher = new Publisher(logger, redisClientService); expect(publisher.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const publisher = new Publisher(mock(), redisClientService); + const publisher = new Publisher(logger, redisClientService); expect(publisher.getClient()).toBeUndefined(); }); @@ -37,7 +39,7 @@ describe('Publisher', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const publisher = new Publisher(mock(), redisClientService); + const publisher = new Publisher(logger, 
redisClientService); publisher.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -45,7 +47,7 @@ describe('Publisher', () => { describe('publishCommand', () => { it('should publish command into `n8n.commands` pubsub channel', async () => { - const publisher = new Publisher(mock(), redisClientService); + const publisher = new Publisher(logger, redisClientService); const msg = mock({ command: 'reload-license' }); await publisher.publishCommand(msg); @@ -59,7 +61,7 @@ describe('Publisher', () => { describe('publishWorkerResponse', () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { - const publisher = new Publisher(mock(), redisClientService); + const publisher = new Publisher(logger, redisClientService); const msg = mock({ response: 'response-to-get-worker-status', }); diff --git a/packages/cli/src/scaling/__tests__/worker-server.test.ts b/packages/cli/src/scaling/__tests__/worker-server.test.ts index 778d403bf2..8bcdd3aa5c 100644 --- a/packages/cli/src/scaling/__tests__/worker-server.test.ts +++ b/packages/cli/src/scaling/__tests__/worker-server.test.ts @@ -8,6 +8,7 @@ import * as http from 'node:http'; import type { ExternalHooks } from '@/external-hooks'; import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { bodyParser, rawBodyReader } from '@/middlewares'; +import { mockLogger } from '@test/mocking'; import { WorkerServer } from '../worker-server'; @@ -48,7 +49,7 @@ describe('WorkerServer', () => { () => new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, mock({ instanceType: 'webhook' }), @@ -73,7 +74,7 @@ describe('WorkerServer', () => { new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, instanceSettings, @@ -100,7 +101,7 @@ describe('WorkerServer', () => { const workerServer = new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, instanceSettings, @@ -135,7 +136,7 @@ 
describe('WorkerServer', () => { const workerServer = new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, instanceSettings, @@ -156,7 +157,7 @@ describe('WorkerServer', () => { const workerServer = new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, instanceSettings, @@ -174,7 +175,7 @@ describe('WorkerServer', () => { const workerServer = new WorkerServer( globalConfig, - mock(), + mockLogger(), mock(), externalHooks, instanceSettings, diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 49e1383ac6..e11395002b 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -12,7 +12,14 @@ import { Logger } from '@/logging/logger.service'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; -import type { Job, JobId, JobResult, RunningJob } from './scaling.types'; +import type { + Job, + JobFinishedMessage, + JobId, + JobResult, + RespondToWebhookMessage, + RunningJob, +} from './scaling.types'; /** * Responsible for processing jobs from the queue, i.e. running enqueued executions. @@ -26,7 +33,9 @@ export class JobProcessor { private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, - ) {} + ) { + this.logger = this.logger.withScope('scaling'); + } async processJob(job: Job): Promise { const { executionId, loadStaticData } = job.data; @@ -37,15 +46,18 @@ export class JobProcessor { }); if (!execution) { - this.logger.error('[JobProcessor] Failed to find execution data', { executionId }); - throw new ApplicationError('Failed to find execution data. 
Aborting execution.', { - extra: { executionId }, - }); + throw new ApplicationError( + `Worker failed to find data for execution ${executionId} (job ${job.id})`, + { level: 'warning' }, + ); } const workflowId = execution.workflowData.id; - this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`); + this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, { + executionId, + jobId: job.id, + }); const startedAt = await this.executionRepository.setRunning(executionId); @@ -58,8 +70,10 @@ export class JobProcessor { }); if (workflowData === null) { - this.logger.error('[JobProcessor] Failed to find workflow', { workflowId, executionId }); - throw new ApplicationError('Failed to find workflow', { extra: { workflowId } }); + throw new ApplicationError( + `Worker failed to find workflow ${workflowId} to run execution ${executionId} (job ${job.id})`, + { level: 'warning' }, + ); } staticData = workflowData.staticData; @@ -102,11 +116,14 @@ export class JobProcessor { additionalData.hooks.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { - await job.progress({ + const msg: RespondToWebhookMessage = { kind: 'respond-to-webhook', executionId, response: this.encodeWebhookResponse(response), - }); + workerId: config.getEnv('redis.queueModeId'), + }; + + await job.progress(msg); }, ]; @@ -115,7 +132,7 @@ export class JobProcessor { additionalData.setExecutionStatus = (status: ExecutionStatus) => { // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute this.logger.debug( - `[JobProcessor] Queued worker execution status for ${executionId} is "${status}"`, + `Queued worker execution status for execution ${executionId} (job ${job.id}) is "${status}"`, ); }; @@ -148,7 +165,18 @@ export class JobProcessor { delete this.runningJobs[job.id]; - this.logger.debug('[JobProcessor] Job finished running', { jobId: job.id, executionId }); + 
this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, { + executionId, + jobId: job.id, + }); + + const msg: JobFinishedMessage = { + kind: 'job-finished', + executionId, + workerId: config.getEnv('redis.queueModeId'), + }; + + await job.progress(msg); /** * @important Do NOT call `workflowExecuteAfter` hook here. diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index 29d31989ff..cc25304e2c 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -24,6 +24,8 @@ export class Publisher { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; + this.logger = this.logger.withScope('scaling'); + this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' }); } @@ -55,11 +57,11 @@ export class Publisher { this.logger.debug(`Published ${msg.command} to command channel`); } - /** Publish a response for a command into the `n8n.worker-response` channel. */ + /** Publish a response to a command into the `n8n.worker-response` channel. */ async publishWorkerResponse(msg: PubSub.WorkerResponse) { await this.client.publish('n8n.worker-response', JSON.stringify(msg)); - this.logger.debug(`Published response ${msg.response} to worker response channel`); + this.logger.debug(`Published ${msg.response} to worker response channel`); } // #endregion diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index b4d6e1a962..eec0110201 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -88,7 +88,7 @@ export namespace PubSub { /** Content of worker response. */ response: WorkerResponseKey; - /** Whether the command should be debounced when received. 
*/ + /** Whether the worker response should be debounced when received. */ debounce?: boolean; } & (PubSubWorkerResponseMap[WorkerResponseKey] extends never ? { payload?: never } // some responses carry no payload @@ -101,6 +101,10 @@ export namespace PubSub { /** Response sent via the `n8n.worker-response` pubsub channel. */ export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>; + // ---------------------------------- + // events + // ---------------------------------- + /** * Of all events emitted from pubsub messages, those whose handlers * are all present in main, worker, and webhook processes. diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 7c7f90fb0e..207c726370 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -17,8 +17,6 @@ import type { PubSub } from './pubsub.types'; export class Subscriber { private readonly client: SingleNodeClient | MultiNodeClient; - // #region Lifecycle - constructor( private readonly logger: Logger, private readonly redisClientService: RedisClientService, @@ -27,6 +25,8 @@ export class Subscriber { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. 
if (config.getEnv('executions.mode') !== 'queue') return; + this.logger = this.logger.withScope('scaling'); + this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => { @@ -36,8 +36,8 @@ export class Subscriber { const debouncedHandlerFn = debounce(handlerFn, 300); - this.client.on('message', (_channel: PubSub.Channel, str) => { - const msg = this.parseMessage(str); + this.client.on('message', (channel: PubSub.Channel, str) => { + const msg = this.parseMessage(str, channel); if (!msg) return; if (msg.debounce) debouncedHandlerFn(msg); else handlerFn(msg); @@ -53,31 +53,27 @@ export class Subscriber { this.client.disconnect(); } - // #endregion - - // #region Subscribing - async subscribe(channel: PubSub.Channel) { await this.client.subscribe(channel, (error) => { if (error) { - this.logger.error('Failed to subscribe to channel', { channel, cause: error }); + this.logger.error(`Failed to subscribe to channel ${channel}`, { error }); return; } - this.logger.debug('Subscribed to channel', { channel }); + this.logger.debug(`Subscribed to channel ${channel}`); }); } - // #region Commands - - private parseMessage(str: string) { + private parseMessage(str: string, channel: PubSub.Channel) { const msg = jsonParse(str, { fallbackValue: null, }); if (!msg) { - this.logger.debug('Received invalid string via pubsub channel', { message: str }); - + this.logger.error(`Received malformed message via channel ${channel}`, { + msg: str, + channel, + }); return null; } @@ -91,10 +87,13 @@ export class Subscriber { return null; } - this.logger.debug('Received message via pubsub channel', msg); + const msgName = 'command' in msg ? 
msg.command : msg.response; + + this.logger.debug(`Received message ${msgName} via channel ${channel}`, { + msg, + channel, + }); return msg; } - - // #endregion } diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index f35b4348a6..5edf43eeac 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -6,6 +6,7 @@ import { sleep, jsonStringify, ErrorReporterProxy, + ensureError, } from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import { strict } from 'node:assert'; @@ -20,6 +21,7 @@ import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; import { EventService } from '@/events/event.service'; import { Logger } from '@/logging/logger.service'; import { OrchestrationService } from '@/services/orchestration.service'; +import { assertNever } from '@/utils'; import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; import { JobProcessor } from './job-processor'; @@ -31,7 +33,8 @@ import type { JobStatus, JobId, QueueRecoveryContext, - JobReport, + JobMessage, + JobFailedMessage, } from './scaling.types'; @Service() @@ -89,34 +92,46 @@ export class ScalingService { void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { try { await this.jobProcessor.processJob(job); - } catch (error: unknown) { - // Errors thrown here will be sent to the main instance by bull. Logging - // them out and rethrowing them allows to find out which worker had the - // issue. 
- this.logger.error('Executing a job errored', { - jobId: job.id, - executionId: job.data.executionId, - error, - }); - ErrorReporterProxy.error(error); - throw error; + } catch (error) { + await this.reportJobProcessingError(ensureError(error), job); } }); this.logger.debug('Worker setup completed'); } + private async reportJobProcessingError(error: Error, job: Job) { + const { executionId } = job.data; + + this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, { + error, + executionId, + jobId: job.id, + }); + + const msg: JobFailedMessage = { + kind: 'job-failed', + executionId, + workerId: config.getEnv('redis.queueModeId'), + errorMsg: error.message, + }; + + await job.progress(msg); + + ErrorReporterProxy.error(error, { executionId }); + + throw error; + } + @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) async stop() { - await this.queue.pause(true, true); + await this.queue.pause(true, true); // no more jobs will be picked up this.logger.debug('Queue paused'); this.stopQueueRecovery(); this.stopQueueMetrics(); - this.logger.debug('Queue recovery and metrics stopped'); - let count = 0; while (this.getRunningJobsCount() !== 0) { @@ -161,7 +176,10 @@ export class ScalingService { const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions); - this.logger.info(`Added job ${job.id} (execution ${jobData.executionId})`); + const { executionId } = jobData; + const jobId = job.id; + + this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId }); return job; } @@ -218,7 +236,7 @@ export class ScalingService { */ private registerWorkerListeners() { this.queue.on('global:progress', (jobId: JobId, msg: unknown) => { - if (!this.isPubSubMessage(msg)) return; + if (!this.isJobMessage(msg)) return; if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId); }); @@ -258,12 +276,36 @@ export class ScalingService { throw error; }); - this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => { - if 
(!this.isPubSubMessage(msg)) return; + this.queue.on('global:progress', (jobId: JobId, msg: unknown) => { + if (!this.isJobMessage(msg)) return; - if (msg.kind === 'respond-to-webhook') { - const decodedResponse = this.decodeWebhookResponse(msg.response); - this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); + // completion and failure are reported via `global:progress` to convey more details + // than natively provided by Bull in `global:completed` and `global:failed` events + + switch (msg.kind) { + case 'respond-to-webhook': + const decodedResponse = this.decodeWebhookResponse(msg.response); + this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); + break; + case 'job-finished': + this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, { + workerId: msg.workerId, + executionId: msg.executionId, + jobId, + }); + break; + case 'job-failed': + this.logger.error(`Execution ${msg.executionId} (job ${jobId}) failed`, { + workerId: msg.workerId, + errorMsg: msg.errorMsg, + executionId: msg.executionId, + jobId, + }); + break; + case 'abort-job': + break; // only for worker + default: + assertNever(msg); } }); @@ -273,7 +315,8 @@ export class ScalingService { } } - private isPubSubMessage(candidate: unknown): candidate is JobReport { + /** Whether the argument is a message sent via Bull's internal pubsub setup. 
*/ + private isJobMessage(candidate: unknown): candidate is JobMessage { return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; } @@ -345,6 +388,8 @@ export class ScalingService { if (this.queueMetricsInterval) { clearInterval(this.queueMetricsInterval); this.queueMetricsInterval = undefined; + + this.logger.debug('Queue metrics collection stopped'); } } @@ -379,6 +424,8 @@ export class ScalingService { private stopQueueRecovery() { clearTimeout(this.queueRecoveryContext.timeout); + + this.logger.debug('Queue recovery stopped'); } /** diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts index fa8210450f..38cb6805de 100644 --- a/packages/cli/src/scaling/scaling.types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -23,19 +23,43 @@ export type JobStatus = Bull.JobStatus; export type JobOptions = Bull.JobOptions; -export type JobReport = JobReportToMain | JobReportToWorker; +/** + * Message sent by main to worker and vice versa about a job. `JobMessage` is + * sent via Bull's internal pubsub setup - do not confuse with `PubSub.Command` + * and `PubSub.WorkerResponse`, which are sent via n8n's own pubsub setup to keep + * main and worker processes in sync outside of a job's lifecycle. + */ +export type JobMessage = + | RespondToWebhookMessage + | JobFinishedMessage + | JobFailedMessage + | AbortJobMessage; -type JobReportToMain = RespondToWebhookMessage; - -type JobReportToWorker = AbortJobMessage; - -type RespondToWebhookMessage = { +/** Message sent by worker to main to respond to a webhook. */ +export type RespondToWebhookMessage = { kind: 'respond-to-webhook'; executionId: string; response: IExecuteResponsePromiseData; + workerId: string; }; -type AbortJobMessage = { +/** Message sent by worker to main to report a job has finished successfully. 
*/ +export type JobFinishedMessage = { + kind: 'job-finished'; + executionId: string; + workerId: string; +}; + +/** Message sent by worker to main to report a job has failed. */ +export type JobFailedMessage = { + kind: 'job-failed'; + executionId: string; + workerId: string; + errorMsg: string; +}; + +/** Message sent by main to worker to abort a job. */ +export type AbortJobMessage = { kind: 'abort-job'; }; diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts index 3cf6995882..0af948670f 100644 --- a/packages/cli/src/scaling/worker-server.ts +++ b/packages/cli/src/scaling/worker-server.ts @@ -58,6 +58,8 @@ export class WorkerServer { ) { assert(this.instanceSettings.instanceType === 'worker'); + this.logger = this.logger.withScope('scaling'); + this.app = express(); this.app.disable('x-powered-by'); @@ -84,6 +86,10 @@ export class WorkerServer { await this.mountEndpoints(); + this.logger.debug('Worker server initialized', { + endpoints: Object.keys(this.endpointsConfig), + }); + await new Promise((resolve) => this.server.listen(this.port, this.address, resolve)); await this.externalHooks.run('worker.ready'); @@ -141,6 +147,8 @@ export class WorkerServer { this.overwritesLoaded = true; + this.logger.debug('Worker loaded credentials overwrites'); + ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); } } diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 8d1e147e85..0f1f37b71d 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -64,7 +64,7 @@ export class WorkflowRunner { executionId: string, hooks?: WorkflowHooks, ) { - ErrorReporter.error(error); + ErrorReporter.error(error, { executionId }); const isQueueMode = config.getEnv('executions.mode') === 'queue'; @@ -476,7 +476,6 @@ export class WorkflowRunner { clearWatchdogInterval(); } } catch (error) { - ErrorReporter.error(error); // We use 
"getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( diff --git a/packages/core/src/DirectoryLoader.ts b/packages/core/src/DirectoryLoader.ts index a1401a8fb5..b0e77125a7 100644 --- a/packages/core/src/DirectoryLoader.ts +++ b/packages/core/src/DirectoryLoader.ts @@ -448,9 +448,9 @@ export class LazyPackageDirectoryLoader extends PackageDirectoryLoader { ); } - Logger.debug(`Lazy Loading credentials and nodes from ${this.packageJson.name}`, { - credentials: this.types.credentials?.length ?? 0, + Logger.debug(`Lazy-loading nodes and credentials from ${this.packageJson.name}`, { nodes: this.types.nodes?.length ?? 0, + credentials: this.types.credentials?.length ?? 0, }); this.isLazyLoaded = true; diff --git a/packages/workflow/src/ErrorReporterProxy.ts b/packages/workflow/src/ErrorReporterProxy.ts index dd5fe9515c..cedb921d5e 100644 --- a/packages/workflow/src/ErrorReporterProxy.ts +++ b/packages/workflow/src/ErrorReporterProxy.ts @@ -6,12 +6,17 @@ interface ErrorReporter { } const instance: ErrorReporter = { - report: (error) => { + report: (error, options) => { if (error instanceof Error) { let e = error; + + const { executionId } = options ?? {}; + const context = executionId ? ` (execution ${executionId})` : ''; + do { + const msg = [e.message + context, e.stack ? `\n${e.stack}\n` : ''].join(''); const meta = e instanceof ApplicationError ? 
e.extra : undefined; - Logger.error(`${e.constructor.name}: ${e.message}`, meta); + Logger.error(msg, meta); e = e.cause as Error; } while (e); } diff --git a/packages/workflow/src/errors/application.error.ts b/packages/workflow/src/errors/application.error.ts index 7cd3095cf0..b8f54cf8b4 100644 --- a/packages/workflow/src/errors/application.error.ts +++ b/packages/workflow/src/errors/application.error.ts @@ -5,6 +5,7 @@ export type Level = 'warning' | 'error' | 'fatal' | 'info'; export type ReportingOptions = { level?: Level; + executionId?: string; } & Pick; export class ApplicationError extends Error {