refactor(core): Expand logs for scaling scope

This commit is contained in:
Iván Ovejero 2024-10-09 18:28:36 +02:00
parent e94cda3837
commit 8ba24cd145
No known key found for this signature in database
5 changed files with 108 additions and 23 deletions

View file

@ -7,6 +7,7 @@ import { N8N_VERSION, inTest } from '@/constants';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { Logger } from '@/logging/logger.service';
import { JobProcessor } from '@/scaling/job-processor';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
@ -63,6 +64,8 @@ export class Worker extends BaseCommand {
constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.logger = Container.get(Logger).withScope('scaling');
if (!process.env.N8N_ENCRYPTION_KEY) {
throw new ApplicationError(
'Missing encryption key. Worker started without the required N8N_ENCRYPTION_KEY env var. More information: https://docs.n8n.io/hosting/configuration/configuration-examples/encryption-key/',
@ -90,7 +93,6 @@ export class Worker extends BaseCommand {
await super.init();
await this.initLicense();
this.logger.debug('License init complete');
await this.initBinaryDataService();
this.logger.debug('Binary data service init complete');
await this.initExternalHooks();

View file

@ -26,7 +26,11 @@ export class JobProcessor {
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly nodeTypes: NodeTypes,
) {}
) {
this.logger = this.logger.withScope('scaling');
this.logger.debug('Job processor initialized');
}
async processJob(job: Job): Promise<JobResult> {
const { executionId, loadStaticData } = job.data;
@ -37,15 +41,16 @@ export class JobProcessor {
});
if (!execution) {
this.logger.error('[JobProcessor] Failed to find execution data', { executionId });
throw new ApplicationError('Failed to find execution data. Aborting execution.', {
extra: { executionId },
extra: { executionId, jobId: job.id },
});
}
const workflowId = execution.workflowData.id;
this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`);
this.logger.info(`Worker started running job ${job.id} (execution ${executionId})`, {
instanceType: 'worker',
});
const startedAt = await this.executionRepository.setRunning(executionId);
@ -58,8 +63,9 @@ export class JobProcessor {
});
if (workflowData === null) {
this.logger.error('[JobProcessor] Failed to find workflow', { workflowId, executionId });
throw new ApplicationError('Failed to find workflow', { extra: { workflowId } });
throw new ApplicationError('Failed to find workflow', {
extra: { workflowId, instanceType: 'worker' },
});
}
staticData = workflowData.staticData;
@ -102,10 +108,14 @@ export class JobProcessor {
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
this.logger.debug(`Responding to webhook for execution ${executionId} (job ${job.id})`, {
instanceType: 'worker',
});
await job.progress({
kind: 'respond-to-webhook',
executionId,
response: this.encodeWebhookResponse(response),
workerId: config.getEnv('redis.queueModeId'),
});
},
];
@ -115,7 +125,7 @@ export class JobProcessor {
additionalData.setExecutionStatus = (status: ExecutionStatus) => {
// Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute
this.logger.debug(
`[JobProcessor] Queued worker execution status for ${executionId} is "${status}"`,
`Queued worker execution status for execution ${executionId} is "${status}"`,
);
};
@ -148,7 +158,15 @@ export class JobProcessor {
delete this.runningJobs[job.id];
this.logger.debug('[JobProcessor] Job finished running', { jobId: job.id, executionId });
this.logger.info(`Worker finished running execution ${executionId} (job ${job.id})`, {
instanceType: 'worker',
});
await job.progress({
kind: 'job-finished',
executionId,
workerId: config.getEnv('redis.queueModeId'),
});
/**
* @important Do NOT call `workflowExecuteAfter` hook here.

View file

@ -31,7 +31,7 @@ import type {
JobStatus,
JobId,
QueueRecoveryContext,
JobReport,
JobMessage,
} from './scaling.types';
@Service()
@ -49,6 +49,8 @@ export class ScalingService {
private readonly eventService: EventService,
) {
this.logger = this.logger.withScope('scaling');
this.logger.debug('Scaling service initialized');
}
// #region Lifecycle
@ -93,11 +95,18 @@ export class ScalingService {
// Errors thrown here will be sent to the main instance by bull. Logging
// them out and rethrowing them allows to find out which worker had the
// issue.
this.logger.error('Executing a job errored', {
jobId: job.id,
this.logger.error(
`Worker errored while running execution ${job.data.executionId} (job ${job.id})`,
{ error },
);
await job.progress({
kind: 'job-failed',
executionId: job.data.executionId,
workerId: config.getEnv('redis.queueModeId'),
error,
});
ErrorReporterProxy.error(error);
throw error;
}
@ -161,7 +170,7 @@ export class ScalingService {
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
this.logger.info(`Added job ${job.id} (execution ${jobData.executionId})`);
this.logger.info(`Enqueued execution ${jobData.executionId} (job ${job.id})`);
return job;
}
@ -218,7 +227,7 @@ export class ScalingService {
*/
private registerWorkerListeners() {
this.queue.on('global:progress', (jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return;
if (!this.isBullMessage(msg)) return;
if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId);
});
@ -244,6 +253,8 @@ export class ScalingService {
throw error;
});
this.logger.debug('Registered listeners on Bull queue', { instanceType: 'worker' });
}
/**
@ -258,12 +269,33 @@ export class ScalingService {
throw error;
});
this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return;
this.queue.on('global:progress', (jobId: JobId, msg: unknown) => {
if (!this.isBullMessage(msg)) return;
if (msg.kind === 'respond-to-webhook') {
const decodedResponse = this.decodeWebhookResponse(msg.response);
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
switch (msg.kind) {
case 'respond-to-webhook':
this.logger.debug(
`Queue reported execution ${msg.executionId} (job ${jobId}) responded to webhook`,
{
instanceType: this.instanceSettings.instanceType,
workerId: msg.workerId,
},
);
const decodedResponse = this.decodeWebhookResponse(msg.response);
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
break;
case 'job-finished':
this.logger.debug(`Queue reported execution ${msg.executionId} (job ${jobId}) finished`, {
instanceType: this.instanceSettings.instanceType,
workerId: msg.workerId,
});
break;
case 'job-failed':
this.logger.error(`Queue reported execution ${msg.executionId} (job ${jobId}) failed`, {
errorMsg: msg.error,
workerId: msg.workerId,
});
break;
}
});
@ -271,9 +303,13 @@ export class ScalingService {
this.queue.on('global:completed', () => this.jobCounters.completed++);
this.queue.on('global:failed', () => this.jobCounters.failed++);
}
this.logger.debug('Registered listeners on Bull queue', {
instanceType: this.instanceSettings.instanceType,
});
}
private isPubSubMessage(candidate: unknown): candidate is JobReport {
private isBullMessage(candidate: unknown): candidate is JobMessage {
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
}
@ -346,6 +382,8 @@ export class ScalingService {
clearInterval(this.queueMetricsInterval);
this.queueMetricsInterval = undefined;
}
this.logger.debug('Stopped collecting queue metrics');
}
// #endregion

View file

@ -23,16 +23,34 @@ export type JobStatus = Bull.JobStatus;
export type JobOptions = Bull.JobOptions;
export type JobReport = JobReportToMain | JobReportToWorker;
/**
* Message sent via Bull's pubsub mechanism during the execution of a job.
* Do not confuse Bull's pubsub mechanism with n8n's own pubsub setup.
*/
export type JobMessage = JobMessageToMain | JobMessageToWorker;
type JobReportToMain = RespondToWebhookMessage;
type JobMessageToMain = RespondToWebhookMessage | JobFinishedMessage | JobFailedMessage;
type JobReportToWorker = AbortJobMessage;
type JobMessageToWorker = AbortJobMessage;
type RespondToWebhookMessage = {
kind: 'respond-to-webhook';
executionId: string;
response: IExecuteResponsePromiseData;
workerId: string;
};
type JobFinishedMessage = {
kind: 'job-finished';
executionId: string;
workerId: string;
};
type JobFailedMessage = {
kind: 'job-failed';
executionId: string;
workerId: string;
error: Error;
};
type AbortJobMessage = {

View file

@ -58,6 +58,8 @@ export class WorkerServer {
) {
assert(this.instanceSettings.instanceType === 'worker');
this.logger = this.logger.withScope('scaling');
this.app = express();
this.app.disable('x-powered-by');
@ -110,6 +112,11 @@ export class WorkerServer {
if (metrics) {
await this.prometheusMetricsService.init(this.app);
}
this.logger.debug('Worker server initialized', {
instanceType: 'worker',
endpoints: Object.keys(this.endpointsConfig),
});
}
private async readiness(_req: express.Request, res: express.Response) {
@ -141,6 +148,8 @@ export class WorkerServer {
this.overwritesLoaded = true;
this.logger.debug('Credentials overwrites ready', { instanceType: 'worker' });
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
}
}