refactor(core): Revamp logs for scaling mode (#11244)

This commit is contained in:
Iván Ovejero 2024-10-14 15:15:42 +02:00 committed by GitHub
parent 3d97f02a8d
commit 873851b54e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 230 additions and 97 deletions

View file

@ -174,8 +174,9 @@ export class Start extends BaseCommand {
this.logger.info('Initializing n8n process'); this.logger.info('Initializing n8n process');
if (config.getEnv('executions.mode') === 'queue') { if (config.getEnv('executions.mode') === 'queue') {
this.logger.debug('Main Instance running in queue mode'); const scopedLogger = this.logger.withScope('scaling');
this.logger.debug(`Queue mode id: ${this.queueModeId}`); scopedLogger.debug('Starting main instance in scaling mode');
scopedLogger.debug(`Host ID: ${this.queueModeId}`);
} }
const { flags } = await this.parse(Start); const { flags } = await this.parse(Start);
@ -260,6 +261,8 @@ export class Start extends BaseCommand {
await subscriber.subscribe('n8n.commands'); await subscriber.subscribe('n8n.commands');
await subscriber.subscribe('n8n.worker-response'); await subscriber.subscribe('n8n.worker-response');
this.logger.withScope('scaling').debug('Pubsub setup completed');
if (!orchestrationService.isMultiMainSetupEnabled) return; if (!orchestrationService.isMultiMainSetupEnabled) return;
orchestrationService.multiMainSetup orchestrationService.multiMainSetup

View file

@ -1,13 +1,13 @@
import { Flags, type Config } from '@oclif/core'; import { Flags, type Config } from '@oclif/core';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi'; import { Container } from 'typedi';
import config from '@/config'; import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants'; import { N8N_VERSION, inTest } from '@/constants';
import { WorkerMissingEncryptionKey } from '@/errors/worker-missing-encryption-key.error';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor'; import { Logger } from '@/logging/logger.service';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service'; import type { ScalingService } from '@/scaling/scaling.service';
@ -39,8 +39,6 @@ export class Worker extends BaseCommand {
scalingService: ScalingService; scalingService: ScalingService;
jobProcessor: JobProcessor;
override needsCommunityPackages = true; override needsCommunityPackages = true;
/** /**
@ -49,25 +47,23 @@ export class Worker extends BaseCommand {
* get removed. * get removed.
*/ */
async stopProcess() { async stopProcess() {
this.logger.info('Stopping n8n...'); this.logger.info('Stopping worker...');
try { try {
await this.externalHooks?.run('n8n.stop', []); await this.externalHooks?.run('n8n.stop', []);
} catch (error) { } catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('Error shutting down worker', error);
} }
await this.exitSuccessFully(); await this.exitSuccessFully();
} }
constructor(argv: string[], cmdConfig: Config) { constructor(argv: string[], cmdConfig: Config) {
if (!process.env.N8N_ENCRYPTION_KEY) throw new WorkerMissingEncryptionKey();
super(argv, cmdConfig); super(argv, cmdConfig);
if (!process.env.N8N_ENCRYPTION_KEY) { this.logger = Container.get(Logger).withScope('scaling');
throw new ApplicationError(
'Missing encryption key. Worker started without the required N8N_ENCRYPTION_KEY env var. More information: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
);
}
this.setInstanceQueueModeId(); this.setInstanceQueueModeId();
} }
@ -84,7 +80,7 @@ export class Worker extends BaseCommand {
await this.initCrashJournal(); await this.initCrashJournal();
this.logger.debug('Starting n8n worker...'); this.logger.debug('Starting n8n worker...');
this.logger.debug(`Queue mode id: ${this.queueModeId}`); this.logger.debug(`Host ID: ${this.queueModeId}`);
await this.setConcurrency(); await this.setConcurrency();
await super.init(); await super.init();
@ -133,6 +129,8 @@ export class Worker extends BaseCommand {
Container.get(PubSubHandler).init(); Container.get(PubSubHandler).init();
await Container.get(Subscriber).subscribe('n8n.commands'); await Container.get(Subscriber).subscribe('n8n.commands');
this.logger.withScope('scaling').debug('Pubsub setup ready');
} }
async setConcurrency() { async setConcurrency() {
@ -150,8 +148,6 @@ export class Worker extends BaseCommand {
await this.scalingService.setupQueue(); await this.scalingService.setupQueue();
this.scalingService.setupWorker(this.concurrency); this.scalingService.setupWorker(this.concurrency);
this.jobProcessor = Container.get(JobProcessor);
} }
async run() { async run() {

View file

@ -0,0 +1,14 @@
import { ApplicationError } from 'n8n-workflow';
export class WorkerMissingEncryptionKey extends ApplicationError {
constructor() {
super(
[
'Failed to start worker because of missing encryption key.',
'Please set the `N8N_ENCRYPTION_KEY` env var when starting the worker.',
'See: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
].join(''),
{ level: 'warning' },
);
}
}

View file

@ -4,6 +4,7 @@ import { mock } from 'jest-mock-extended';
import config from '@/config'; import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators'; import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis-client.service'; import type { RedisClientService } from '@/services/redis-client.service';
import { mockLogger } from '@test/mocking';
import { Publisher } from '../pubsub/publisher.service'; import { Publisher } from '../pubsub/publisher.service';
import type { PubSub } from '../pubsub/pubsub.types'; import type { PubSub } from '../pubsub/pubsub.types';
@ -18,18 +19,19 @@ describe('Publisher', () => {
}); });
const client = mock<SingleNodeClient>(); const client = mock<SingleNodeClient>();
const logger = mockLogger();
const redisClientService = mock<RedisClientService>({ createClient: () => client }); const redisClientService = mock<RedisClientService>({ createClient: () => client });
describe('constructor', () => { describe('constructor', () => {
it('should init Redis client in scaling mode', () => { it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(logger, redisClientService);
expect(publisher.getClient()).toEqual(client); expect(publisher.getClient()).toEqual(client);
}); });
it('should not init Redis client in regular mode', () => { it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular'); config.set('executions.mode', 'regular');
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(logger, redisClientService);
expect(publisher.getClient()).toBeUndefined(); expect(publisher.getClient()).toBeUndefined();
}); });
@ -37,7 +39,7 @@ describe('Publisher', () => {
describe('shutdown', () => { describe('shutdown', () => {
it('should disconnect Redis client', () => { it('should disconnect Redis client', () => {
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(logger, redisClientService);
publisher.shutdown(); publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled(); expect(client.disconnect).toHaveBeenCalled();
}); });
@ -45,7 +47,7 @@ describe('Publisher', () => {
describe('publishCommand', () => { describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => { it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(logger, redisClientService);
const msg = mock<PubSub.Command>({ command: 'reload-license' }); const msg = mock<PubSub.Command>({ command: 'reload-license' });
await publisher.publishCommand(msg); await publisher.publishCommand(msg);
@ -59,7 +61,7 @@ describe('Publisher', () => {
describe('publishWorkerResponse', () => { describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => { it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService); const publisher = new Publisher(logger, redisClientService);
const msg = mock<PubSub.WorkerResponse>({ const msg = mock<PubSub.WorkerResponse>({
response: 'response-to-get-worker-status', response: 'response-to-get-worker-status',
}); });

View file

@ -8,6 +8,7 @@ import * as http from 'node:http';
import type { ExternalHooks } from '@/external-hooks'; import type { ExternalHooks } from '@/external-hooks';
import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { bodyParser, rawBodyReader } from '@/middlewares'; import { bodyParser, rawBodyReader } from '@/middlewares';
import { mockLogger } from '@test/mocking';
import { WorkerServer } from '../worker-server'; import { WorkerServer } from '../worker-server';
@ -48,7 +49,7 @@ describe('WorkerServer', () => {
() => () =>
new WorkerServer( new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
mock<InstanceSettings>({ instanceType: 'webhook' }), mock<InstanceSettings>({ instanceType: 'webhook' }),
@ -73,7 +74,7 @@ describe('WorkerServer', () => {
new WorkerServer( new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
instanceSettings, instanceSettings,
@ -100,7 +101,7 @@ describe('WorkerServer', () => {
const workerServer = new WorkerServer( const workerServer = new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
instanceSettings, instanceSettings,
@ -135,7 +136,7 @@ describe('WorkerServer', () => {
const workerServer = new WorkerServer( const workerServer = new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
instanceSettings, instanceSettings,
@ -156,7 +157,7 @@ describe('WorkerServer', () => {
const workerServer = new WorkerServer( const workerServer = new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
instanceSettings, instanceSettings,
@ -174,7 +175,7 @@ describe('WorkerServer', () => {
const workerServer = new WorkerServer( const workerServer = new WorkerServer(
globalConfig, globalConfig,
mock(), mockLogger(),
mock(), mock(),
externalHooks, externalHooks,
instanceSettings, instanceSettings,

View file

@ -12,7 +12,14 @@ import { Logger } from '@/logging/logger.service';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import type { Job, JobId, JobResult, RunningJob } from './scaling.types'; import type {
Job,
JobFinishedMessage,
JobId,
JobResult,
RespondToWebhookMessage,
RunningJob,
} from './scaling.types';
/** /**
* Responsible for processing jobs from the queue, i.e. running enqueued executions. * Responsible for processing jobs from the queue, i.e. running enqueued executions.
@ -26,7 +33,9 @@ export class JobProcessor {
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository, private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes, private readonly nodeTypes: NodeTypes,
) {} ) {
this.logger = this.logger.withScope('scaling');
}
async processJob(job: Job): Promise<JobResult> { async processJob(job: Job): Promise<JobResult> {
const { executionId, loadStaticData } = job.data; const { executionId, loadStaticData } = job.data;
@ -37,15 +46,18 @@ export class JobProcessor {
}); });
if (!execution) { if (!execution) {
this.logger.error('[JobProcessor] Failed to find execution data', { executionId }); throw new ApplicationError(
throw new ApplicationError('Failed to find execution data. Aborting execution.', { `Worker failed to find data for execution ${executionId} (job ${job.id})`,
extra: { executionId }, { level: 'warning' },
}); );
} }
const workflowId = execution.workflowData.id; const workflowId = execution.workflowData.id;
this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`); this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, {
executionId,
jobId: job.id,
});
const startedAt = await this.executionRepository.setRunning(executionId); const startedAt = await this.executionRepository.setRunning(executionId);
@ -58,8 +70,10 @@ export class JobProcessor {
}); });
if (workflowData === null) { if (workflowData === null) {
this.logger.error('[JobProcessor] Failed to find workflow', { workflowId, executionId }); throw new ApplicationError(
throw new ApplicationError('Failed to find workflow', { extra: { workflowId } }); `Worker failed to find workflow ${workflowId} to run execution ${executionId} (job ${job.id})`,
{ level: 'warning' },
);
} }
staticData = workflowData.staticData; staticData = workflowData.staticData;
@ -102,11 +116,14 @@ export class JobProcessor {
additionalData.hooks.hookFunctions.sendResponse = [ additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => { async (response: IExecuteResponsePromiseData): Promise<void> => {
await job.progress({ const msg: RespondToWebhookMessage = {
kind: 'respond-to-webhook', kind: 'respond-to-webhook',
executionId, executionId,
response: this.encodeWebhookResponse(response), response: this.encodeWebhookResponse(response),
}); workerId: config.getEnv('redis.queueModeId'),
};
await job.progress(msg);
}, },
]; ];
@ -115,7 +132,7 @@ export class JobProcessor {
additionalData.setExecutionStatus = (status: ExecutionStatus) => { additionalData.setExecutionStatus = (status: ExecutionStatus) => {
// Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute
this.logger.debug( this.logger.debug(
`[JobProcessor] Queued worker execution status for ${executionId} is "${status}"`, `Queued worker execution status for execution ${executionId} (job ${job.id}) is "${status}"`,
); );
}; };
@ -148,7 +165,18 @@ export class JobProcessor {
delete this.runningJobs[job.id]; delete this.runningJobs[job.id];
this.logger.debug('[JobProcessor] Job finished running', { jobId: job.id, executionId }); this.logger.info(`Worker finished execution ${executionId} (job ${job.id})`, {
executionId,
jobId: job.id,
});
const msg: JobFinishedMessage = {
kind: 'job-finished',
executionId,
workerId: config.getEnv('redis.queueModeId'),
};
await job.progress(msg);
/** /**
* @important Do NOT call `workflowExecuteAfter` hook here. * @important Do NOT call `workflowExecuteAfter` hook here.

View file

@ -24,6 +24,8 @@ export class Publisher {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return; if (config.getEnv('executions.mode') !== 'queue') return;
this.logger = this.logger.withScope('scaling');
this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' }); this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' });
} }
@ -55,11 +57,11 @@ export class Publisher {
this.logger.debug(`Published ${msg.command} to command channel`); this.logger.debug(`Published ${msg.command} to command channel`);
} }
/** Publish a response for a command into the `n8n.worker-response` channel. */ /** Publish a response to a command into the `n8n.worker-response` channel. */
async publishWorkerResponse(msg: PubSub.WorkerResponse) { async publishWorkerResponse(msg: PubSub.WorkerResponse) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg)); await this.client.publish('n8n.worker-response', JSON.stringify(msg));
this.logger.debug(`Published response ${msg.response} to worker response channel`); this.logger.debug(`Published ${msg.response} to worker response channel`);
} }
// #endregion // #endregion

View file

@ -88,7 +88,7 @@ export namespace PubSub {
/** Content of worker response. */ /** Content of worker response. */
response: WorkerResponseKey; response: WorkerResponseKey;
/** Whether the command should be debounced when received. */ /** Whether the worker response should be debounced when received. */
debounce?: boolean; debounce?: boolean;
} & (PubSubWorkerResponseMap[WorkerResponseKey] extends never } & (PubSubWorkerResponseMap[WorkerResponseKey] extends never
? { payload?: never } // some responses carry no payload ? { payload?: never } // some responses carry no payload
@ -101,6 +101,10 @@ export namespace PubSub {
/** Response sent via the `n8n.worker-response` pubsub channel. */ /** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>; export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>;
// ----------------------------------
// events
// ----------------------------------
/** /**
* Of all events emitted from pubsub messages, those whose handlers * Of all events emitted from pubsub messages, those whose handlers
* are all present in main, worker, and webhook processes. * are all present in main, worker, and webhook processes.

View file

@ -17,8 +17,6 @@ import type { PubSub } from './pubsub.types';
export class Subscriber { export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient; private readonly client: SingleNodeClient | MultiNodeClient;
// #region Lifecycle
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly redisClientService: RedisClientService, private readonly redisClientService: RedisClientService,
@ -27,6 +25,8 @@ export class Subscriber {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return; if (config.getEnv('executions.mode') !== 'queue') return;
this.logger = this.logger.withScope('scaling');
this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });
const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => { const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => {
@ -36,8 +36,8 @@ export class Subscriber {
const debouncedHandlerFn = debounce(handlerFn, 300); const debouncedHandlerFn = debounce(handlerFn, 300);
this.client.on('message', (_channel: PubSub.Channel, str) => { this.client.on('message', (channel: PubSub.Channel, str) => {
const msg = this.parseMessage(str); const msg = this.parseMessage(str, channel);
if (!msg) return; if (!msg) return;
if (msg.debounce) debouncedHandlerFn(msg); if (msg.debounce) debouncedHandlerFn(msg);
else handlerFn(msg); else handlerFn(msg);
@ -53,31 +53,27 @@ export class Subscriber {
this.client.disconnect(); this.client.disconnect();
} }
// #endregion
// #region Subscribing
async subscribe(channel: PubSub.Channel) { async subscribe(channel: PubSub.Channel) {
await this.client.subscribe(channel, (error) => { await this.client.subscribe(channel, (error) => {
if (error) { if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error }); this.logger.error(`Failed to subscribe to channel ${channel}`, { error });
return; return;
} }
this.logger.debug('Subscribed to channel', { channel }); this.logger.debug(`Subscribed to channel ${channel}`);
}); });
} }
// #region Commands private parseMessage(str: string, channel: PubSub.Channel) {
private parseMessage(str: string) {
const msg = jsonParse<PubSub.Command | PubSub.WorkerResponse | null>(str, { const msg = jsonParse<PubSub.Command | PubSub.WorkerResponse | null>(str, {
fallbackValue: null, fallbackValue: null,
}); });
if (!msg) { if (!msg) {
this.logger.debug('Received invalid string via pubsub channel', { message: str }); this.logger.error(`Received malformed message via channel ${channel}`, {
msg: str,
channel,
});
return null; return null;
} }
@ -91,10 +87,13 @@ export class Subscriber {
return null; return null;
} }
this.logger.debug('Received message via pubsub channel', msg); const msgName = 'command' in msg ? msg.command : msg.response;
this.logger.debug(`Received message ${msgName} via channel ${channel}`, {
msg,
channel,
});
return msg; return msg;
} }
// #endregion
} }

View file

@ -6,6 +6,7 @@ import {
sleep, sleep,
jsonStringify, jsonStringify,
ErrorReporterProxy, ErrorReporterProxy,
ensureError,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { IExecuteResponsePromiseData } from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { strict } from 'node:assert'; import { strict } from 'node:assert';
@ -20,6 +21,7 @@ import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
import { EventService } from '@/events/event.service'; import { EventService } from '@/events/event.service';
import { Logger } from '@/logging/logger.service'; import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { assertNever } from '@/utils';
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
import { JobProcessor } from './job-processor'; import { JobProcessor } from './job-processor';
@ -31,7 +33,8 @@ import type {
JobStatus, JobStatus,
JobId, JobId,
QueueRecoveryContext, QueueRecoveryContext,
JobReport, JobMessage,
JobFailedMessage,
} from './scaling.types'; } from './scaling.types';
@Service() @Service()
@ -89,34 +92,46 @@ export class ScalingService {
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
try { try {
await this.jobProcessor.processJob(job); await this.jobProcessor.processJob(job);
} catch (error: unknown) { } catch (error) {
// Errors thrown here will be sent to the main instance by bull. Logging await this.reportJobProcessingError(ensureError(error), job);
// them out and rethrowing them allows to find out which worker had the
// issue.
this.logger.error('Executing a job errored', {
jobId: job.id,
executionId: job.data.executionId,
error,
});
ErrorReporterProxy.error(error);
throw error;
} }
}); });
this.logger.debug('Worker setup completed'); this.logger.debug('Worker setup completed');
} }
private async reportJobProcessingError(error: Error, job: Job) {
const { executionId } = job.data;
this.logger.error(`Worker errored while running execution ${executionId} (job ${job.id})`, {
error,
executionId,
jobId: job.id,
});
const msg: JobFailedMessage = {
kind: 'job-failed',
executionId,
workerId: config.getEnv('redis.queueModeId'),
errorMsg: error.message,
};
await job.progress(msg);
ErrorReporterProxy.error(error, { executionId });
throw error;
}
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async stop() { async stop() {
await this.queue.pause(true, true); await this.queue.pause(true, true); // no more jobs will be picked up
this.logger.debug('Queue paused'); this.logger.debug('Queue paused');
this.stopQueueRecovery(); this.stopQueueRecovery();
this.stopQueueMetrics(); this.stopQueueMetrics();
this.logger.debug('Queue recovery and metrics stopped');
let count = 0; let count = 0;
while (this.getRunningJobsCount() !== 0) { while (this.getRunningJobsCount() !== 0) {
@ -161,7 +176,10 @@ export class ScalingService {
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions); const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
this.logger.info(`Added job ${job.id} (execution ${jobData.executionId})`); const { executionId } = jobData;
const jobId = job.id;
this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId });
return job; return job;
} }
@ -218,7 +236,7 @@ export class ScalingService {
*/ */
private registerWorkerListeners() { private registerWorkerListeners() {
this.queue.on('global:progress', (jobId: JobId, msg: unknown) => { this.queue.on('global:progress', (jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return; if (!this.isJobMessage(msg)) return;
if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId); if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId);
}); });
@ -258,12 +276,36 @@ export class ScalingService {
throw error; throw error;
}); });
this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => { this.queue.on('global:progress', (jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return; if (!this.isJobMessage(msg)) return;
if (msg.kind === 'respond-to-webhook') { // completion and failure are reported via `global:progress` to convey more details
// than natively provided by Bull in `global:completed` and `global:failed` events
switch (msg.kind) {
case 'respond-to-webhook':
const decodedResponse = this.decodeWebhookResponse(msg.response); const decodedResponse = this.decodeWebhookResponse(msg.response);
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse); this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
break;
case 'job-finished':
this.logger.info(`Execution ${msg.executionId} (job ${jobId}) finished successfully`, {
workerId: msg.workerId,
executionId: msg.executionId,
jobId,
});
break;
case 'job-failed':
this.logger.error(`Execution ${msg.executionId} (job ${jobId}) failed`, {
workerId: msg.workerId,
errorMsg: msg.errorMsg,
executionId: msg.executionId,
jobId,
});
break;
case 'abort-job':
break; // only for worker
default:
assertNever(msg);
} }
}); });
@ -273,7 +315,8 @@ export class ScalingService {
} }
} }
private isPubSubMessage(candidate: unknown): candidate is JobReport { /** Whether the argument is a message sent via Bull's internal pubsub setup. */
private isJobMessage(candidate: unknown): candidate is JobMessage {
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate; return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
} }
@ -345,6 +388,8 @@ export class ScalingService {
if (this.queueMetricsInterval) { if (this.queueMetricsInterval) {
clearInterval(this.queueMetricsInterval); clearInterval(this.queueMetricsInterval);
this.queueMetricsInterval = undefined; this.queueMetricsInterval = undefined;
this.logger.debug('Queue metrics collection stopped');
} }
} }
@ -379,6 +424,8 @@ export class ScalingService {
private stopQueueRecovery() { private stopQueueRecovery() {
clearTimeout(this.queueRecoveryContext.timeout); clearTimeout(this.queueRecoveryContext.timeout);
this.logger.debug('Queue recovery stopped');
} }
/** /**

View file

@ -23,19 +23,43 @@ export type JobStatus = Bull.JobStatus;
export type JobOptions = Bull.JobOptions; export type JobOptions = Bull.JobOptions;
export type JobReport = JobReportToMain | JobReportToWorker; /**
* Message sent by main to worker and vice versa about a job. `JobMessage` is
* sent via Bull's internal pubsub setup - do not confuse with `PubSub.Command`
* and `PubSub.Response`, which are sent via n8n's own pubsub setup to keep
* main and worker processes in sync outside of a job's lifecycle.
*/
export type JobMessage =
| RespondToWebhookMessage
| JobFinishedMessage
| JobFailedMessage
| AbortJobMessage;
type JobReportToMain = RespondToWebhookMessage; /** Message sent by worker to main to respond to a webhook. */
export type RespondToWebhookMessage = {
type JobReportToWorker = AbortJobMessage;
type RespondToWebhookMessage = {
kind: 'respond-to-webhook'; kind: 'respond-to-webhook';
executionId: string; executionId: string;
response: IExecuteResponsePromiseData; response: IExecuteResponsePromiseData;
workerId: string;
}; };
type AbortJobMessage = { /** Message sent by worker to main to report a job has finished successfully. */
export type JobFinishedMessage = {
kind: 'job-finished';
executionId: string;
workerId: string;
};
/** Message sent by worker to main to report a job has failed. */
export type JobFailedMessage = {
kind: 'job-failed';
executionId: string;
workerId: string;
errorMsg: string;
};
/** Message sent by main to worker to abort a job. */
export type AbortJobMessage = {
kind: 'abort-job'; kind: 'abort-job';
}; };

View file

@ -58,6 +58,8 @@ export class WorkerServer {
) { ) {
assert(this.instanceSettings.instanceType === 'worker'); assert(this.instanceSettings.instanceType === 'worker');
this.logger = this.logger.withScope('scaling');
this.app = express(); this.app = express();
this.app.disable('x-powered-by'); this.app.disable('x-powered-by');
@ -84,6 +86,10 @@ export class WorkerServer {
await this.mountEndpoints(); await this.mountEndpoints();
this.logger.debug('Worker server initialized', {
endpoints: Object.keys(this.endpointsConfig),
});
await new Promise<void>((resolve) => this.server.listen(this.port, this.address, resolve)); await new Promise<void>((resolve) => this.server.listen(this.port, this.address, resolve));
await this.externalHooks.run('worker.ready'); await this.externalHooks.run('worker.ready');
@ -141,6 +147,8 @@ export class WorkerServer {
this.overwritesLoaded = true; this.overwritesLoaded = true;
this.logger.debug('Worker loaded credentials overwrites');
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
} }
} }

View file

@ -64,7 +64,7 @@ export class WorkflowRunner {
executionId: string, executionId: string,
hooks?: WorkflowHooks, hooks?: WorkflowHooks,
) { ) {
ErrorReporter.error(error); ErrorReporter.error(error, { executionId });
const isQueueMode = config.getEnv('executions.mode') === 'queue'; const isQueueMode = config.getEnv('executions.mode') === 'queue';
@ -476,7 +476,6 @@ export class WorkflowRunner {
clearWatchdogInterval(); clearWatchdogInterval();
} }
} catch (error) { } catch (error) {
ErrorReporter.error(error);
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require. // "workflowExecuteAfter" which we require.
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter( const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(

View file

@ -448,9 +448,9 @@ export class LazyPackageDirectoryLoader extends PackageDirectoryLoader {
); );
} }
Logger.debug(`Lazy Loading credentials and nodes from ${this.packageJson.name}`, { Logger.debug(`Lazy-loading nodes and credentials from ${this.packageJson.name}`, {
credentials: this.types.credentials?.length ?? 0,
nodes: this.types.nodes?.length ?? 0, nodes: this.types.nodes?.length ?? 0,
credentials: this.types.credentials?.length ?? 0,
}); });
this.isLazyLoaded = true; this.isLazyLoaded = true;

View file

@ -6,12 +6,17 @@ interface ErrorReporter {
} }
const instance: ErrorReporter = { const instance: ErrorReporter = {
report: (error) => { report: (error, options) => {
if (error instanceof Error) { if (error instanceof Error) {
let e = error; let e = error;
const { executionId } = options ?? {};
const context = executionId ? ` (execution ${executionId})` : '';
do { do {
const msg = [e.message + context, e.stack ? `\n${e.stack}\n` : ''].join('');
const meta = e instanceof ApplicationError ? e.extra : undefined; const meta = e instanceof ApplicationError ? e.extra : undefined;
Logger.error(`${e.constructor.name}: ${e.message}`, meta); Logger.error(msg, meta);
e = e.cause as Error; e = e.cause as Error;
} while (e); } while (e);
} }

View file

@ -5,6 +5,7 @@ export type Level = 'warning' | 'error' | 'fatal' | 'info';
export type ReportingOptions = { export type ReportingOptions = {
level?: Level; level?: Level;
executionId?: string;
} & Pick<Event, 'tags' | 'extra'>; } & Pick<Event, 'tags' | 'extra'>;
export class ApplicationError extends Error { export class ApplicationError extends Error {