diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 4a80c1d1c4..775b0594fe 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -22,6 +22,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus' import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { License } from '@/license'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; @@ -170,7 +171,7 @@ export class Start extends BaseCommand { this.logger.info('Initializing n8n process'); if (config.getEnv('executions.mode') === 'queue') { - const scopedLogger = this.logger.scoped('scaling'); + const scopedLogger = Container.get(ScopedLogger).from('scaling'); scopedLogger.debug('Starting main instance in scaling mode'); scopedLogger.debug(`Host ID: ${this.instanceSettings.hostId}`); } @@ -263,7 +264,7 @@ export class Start extends BaseCommand { await subscriber.subscribe('n8n.commands'); await subscriber.subscribe('n8n.worker-response'); - this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed'); + Container.get(ScopedLogger).from(['scaling', 'pubsub']).debug('Pubsub setup completed'); if (!orchestrationService.isMultiMainSetupEnabled) return; diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 4ba505a9b8..4ebbad55db 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -7,7 +7,7 @@ import { WorkerMissingEncryptionKey } from '@/errors/worker-missing-encryption-k import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; -import { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { LocalTaskManager } from '@/runners/task-managers/local-task-manager'; import { TaskManager } from '@/runners/task-managers/task-manager'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; @@ -69,7 +69,7 @@ export class Worker extends BaseCommand { super(argv, cmdConfig); - this.logger = Container.get(Logger).scoped('scaling'); + this.logger = Container.get(ScopedLogger).from('scaling'); } async init() { @@ -151,7 +151,7 @@ export class Worker extends BaseCommand { Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); - this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed'); + Container.get(ScopedLogger).from(['scaling', 'pubsub']).debug('Pubsub setup completed'); } async setConcurrency() { diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 3896daad2e..18fa9c1ec0 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -7,7 +7,8 @@ import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error'; import { EventService } from '@/events/event.service'; import type { IExecutingWorkflowData } from '@/interfaces'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { Telemetry } from '@/telemetry'; import { ConcurrencyQueue } from './concurrency-queue'; @@ -27,13 +28,15 @@ export class ConcurrencyControlService { (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, ); + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly executionRepository: ExecutionRepository, private readonly telemetry: Telemetry, private readonly eventService: EventService, ) { - this.logger = this.logger.scoped('concurrency'); + this.logger = this.scopedLogger.from('concurrency'); this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); diff --git a/packages/cli/src/license.ts b/packages/cli/src/license.ts index 68ff15cf39..24ac48a679 100644 --- a/packages/cli/src/license.ts +++ b/packages/cli/src/license.ts @@ -7,7 +7,7 @@ import Container, { Service } from 'typedi'; import config from '@/config'; import { SettingsRepository } from '@/databases/repositories/settings.repository'; import { OnShutdown } from '@/decorators/on-shutdown'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; import { LicenseMetricsService } from '@/metrics/license-metrics.service'; import { OrchestrationService } from '@/services/orchestration.service'; @@ -19,6 +19,7 @@ import { UNLIMITED_LICENSE_QUOTA, } from './constants'; import type { BooleanLicenseFeature, NumericLicenseFeature } from './interfaces'; +import { ScopedLogger } from './logging/scoped-logger.service'; export type FeatureReturnType = Partial< { @@ -32,15 +33,17 @@ export class License { private isShuttingDown = false; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly instanceSettings: InstanceSettings, private readonly orchestrationService: OrchestrationService, private readonly settingsRepository: SettingsRepository, private readonly licenseMetricsService: LicenseMetricsService, private readonly globalConfig: GlobalConfig, ) { - this.logger = this.logger.scoped('license'); + this.logger = this.scopedLogger.from('license'); } /** diff --git a/packages/cli/src/logging/logger.service.ts b/packages/cli/src/logging/logger.service.ts index 46471c0611..4548c843e9 100644 --- a/packages/cli/src/logging/logger.service.ts +++ b/packages/cli/src/logging/logger.service.ts @@ -17,20 +17,20 @@ import type { LogLocationMetadata, LogLevel, LogMetadata } from './types'; @Service() export class Logger { - private internalLogger: winston.Logger; + protected internalLogger: winston.Logger; private readonly level: LogLevel; private readonly scopes: Set; + // TODO: needs extracting private get isScopingEnabled() { return this.scopes.size > 0; } constructor( - private readonly globalConfig: GlobalConfig, - private readonly instanceSettings: InstanceSettings, - { isRoot }: { isRoot?: boolean } = { isRoot: true }, + protected readonly globalConfig: GlobalConfig, + protected readonly instanceSettings: InstanceSettings, ) { this.level = this.globalConfig.logging.level; @@ -49,25 +49,11 @@ export class Logger { if (outputs.includes('console')) this.setConsoleTransport(); if (outputs.includes('file')) this.setFileTransport(); + // TODO: needs extracting this.scopes = new Set(scopes); } - if (isRoot) LoggerProxy.init(this); - } - - private setInternalLogger(internalLogger: winston.Logger) { - this.internalLogger = internalLogger; - } - - /** Create a logger that injects the given scopes into its log metadata. */ - scoped(scopes: LogScope | LogScope[]) { - scopes = Array.isArray(scopes) ? scopes : [scopes]; - const scopedLogger = new Logger(this.globalConfig, this.instanceSettings, { isRoot: false }); - const childLogger = this.internalLogger.child({ scopes }); - - scopedLogger.setInternalLogger(childLogger); - - return scopedLogger; + LoggerProxy.init(this); } private log(level: LogLevel, message: string, metadata: LogMetadata) { @@ -107,6 +93,7 @@ export class Logger { this.internalLogger.add(new winston.transports.Console({ format })); } + // TODO: needs extracting private scopeFilter() { return winston.format((info: TransformableInfo & { metadata: LogMetadata }) => { if (!this.isScopingEnabled) return info; @@ -125,7 +112,7 @@ export class Logger { winston.format.metadata(), winston.format.timestamp({ format: () => this.devTsFormat() }), winston.format.colorize({ all: true }), - this.scopeFilter(), + this.scopeFilter(), // TODO: needs extracting winston.format.printf(({ level: _level, message, timestamp, metadata: _metadata }) => { const SEPARATOR = ' '.repeat(3); const LOG_LEVEL_COLUMN_WIDTH = 15; // 5 columns + ANSI color codes @@ -140,7 +127,7 @@ export class Logger { return winston.format.combine( winston.format.metadata(), winston.format.timestamp(), - this.scopeFilter(), + this.scopeFilter(), // TODO: needs extracting winston.format.printf(({ level, message, timestamp, metadata }) => { const _metadata = this.toPrintable(metadata); return `${timestamp} | ${level.padEnd(5)} | ${message}${_metadata ? ' ' + _metadata : ''}`; diff --git a/packages/cli/src/logging/scoped-logger.service.ts b/packages/cli/src/logging/scoped-logger.service.ts new file mode 100644 index 0000000000..43e3522ca3 --- /dev/null +++ b/packages/cli/src/logging/scoped-logger.service.ts @@ -0,0 +1,26 @@ +import { GlobalConfig } from '@n8n/config'; +import type { LogScope } from '@n8n/config'; +import { InstanceSettings } from 'n8n-core'; +import { Service } from 'typedi'; +import type winston from 'winston'; + +import { Logger } from './logger.service'; + +@Service() +export class ScopedLogger { + protected internalLogger: winston.Logger; + + constructor( + private readonly globalConfig: GlobalConfig, + private readonly instanceSettings: InstanceSettings, + ) {} + + from(scopes: LogScope | LogScope[]) { + scopes = Array.isArray(scopes) ? scopes : [scopes]; + const scopedLogger = new Logger(this.globalConfig, this.instanceSettings); + + this.internalLogger = this.internalLogger.child({ scopes }); + + return scopedLogger; + } +} diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 9a531d3039..b2aa2b251f 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -13,7 +13,8 @@ import { Service } from 'typedi'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { NodeTypes } from '@/node-types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -33,14 +34,16 @@ import type { export class JobProcessor { private readonly runningJobs: Record = {}; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, private readonly instanceSettings: InstanceSettings, ) { - this.logger = this.logger.scoped('scaling'); + this.logger = this.scopedLogger.from('scaling'); } async processJob(job: Job): Promise { diff --git a/packages/cli/src/scaling/multi-main-setup.ee.ts b/packages/cli/src/scaling/multi-main-setup.ee.ts index 8be7f4ae51..61f07dbd65 100644 --- a/packages/cli/src/scaling/multi-main-setup.ee.ts +++ b/packages/cli/src/scaling/multi-main-setup.ee.ts @@ -4,7 +4,8 @@ import { Service } from 'typedi'; import config from '@/config'; import { TIME } from '@/constants'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { RedisClientService } from '@/services/redis-client.service'; import { TypedEmitter } from '@/typed-emitter'; @@ -29,16 +30,18 @@ type MultiMainEvents = { @Service() export class MultiMainSetup extends TypedEmitter { constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, private readonly redisClientService: RedisClientService, private readonly globalConfig: GlobalConfig, ) { super(); - this.logger = this.logger.scoped(['scaling', 'multi-main-setup']); + this.logger = this.scopedLogger.from(['scaling', 'multi-main-setup']); } + private readonly logger: Logger; + private leaderKey: string; private readonly leaderKeyTtl = this.globalConfig.multiMainSetup.ttl; diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index cc28c2d339..b08b4302e9 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -3,7 +3,8 @@ import { InstanceSettings } from 'n8n-core'; import { Service } from 'typedi'; import config from '@/config'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { RedisClientService } from '@/services/redis-client.service'; import type { PubSub } from './pubsub.types'; @@ -16,17 +17,19 @@ import { IMMEDIATE_COMMANDS, SELF_SEND_COMMANDS } from '../constants'; export class Publisher { private readonly client: SingleNodeClient | MultiNodeClient; + private readonly logger: Logger; + // #region Lifecycle constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly redisClientService: RedisClientService, private readonly instanceSettings: InstanceSettings, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; - this.logger = this.logger.scoped(['scaling', 'pubsub']); + this.logger = this.scopedLogger.from(['scaling', 'pubsub']); this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' }); } diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index ed673fc4e4..fae6b6ef8c 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -6,7 +6,8 @@ import { Service } from 'typedi'; import config from '@/config'; import { EventService } from '@/events/event.service'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { RedisClientService } from '@/services/redis-client.service'; import type { PubSub } from './pubsub.types'; @@ -18,8 +19,10 @@ import type { PubSub } from './pubsub.types'; export class Subscriber { private readonly client: SingleNodeClient | MultiNodeClient; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly redisClientService: RedisClientService, private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, @@ -27,7 +30,7 @@ export class Subscriber { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; - this.logger = this.logger.scoped(['scaling', 'pubsub']); + this.logger = this.scopedLogger.from(['scaling', 'pubsub']); this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index f7731e26c2..52def690d6 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -19,7 +19,8 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { OnShutdown } from '@/decorators/on-shutdown'; import { MaxStalledCountError } from '@/errors/max-stalled-count.error'; import { EventService } from '@/events/event.service'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { assertNever } from '@/utils'; @@ -41,8 +42,10 @@ import type { export class ScalingService { private queue: JobQueue; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly activeExecutions: ActiveExecutions, private readonly jobProcessor: JobProcessor, private readonly globalConfig: GlobalConfig, @@ -51,7 +54,7 @@ export class ScalingService { private readonly orchestrationService: OrchestrationService, private readonly eventService: EventService, ) { - this.logger = this.logger.scoped('scaling'); + this.logger = this.scopedLogger.from('scaling'); } // #region Lifecycle diff --git a/packages/cli/src/scaling/worker-server.ts b/packages/cli/src/scaling/worker-server.ts index ee622d789c..a25a93bd56 100644 --- a/packages/cli/src/scaling/worker-server.ts +++ b/packages/cli/src/scaling/worker-server.ts @@ -13,7 +13,8 @@ import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overw import { NonJsonBodyError } from '@/errors/non-json-body.error'; import { ExternalHooks } from '@/external-hooks'; import type { ICredentialsOverwrite } from '@/interfaces'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service'; import { rawBodyReader, bodyParser } from '@/middlewares'; import * as ResponseHelper from '@/response-helper'; @@ -47,9 +48,11 @@ export class WorkerServer { private overwritesLoaded = false; + private readonly logger: Logger; + constructor( private readonly globalConfig: GlobalConfig, - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly credentialsOverwrites: CredentialsOverwrites, private readonly externalHooks: ExternalHooks, private readonly instanceSettings: InstanceSettings, @@ -58,7 +61,7 @@ export class WorkerServer { ) { assert(this.instanceSettings.instanceType === 'worker'); - this.logger = this.logger.scoped('scaling'); + this.logger = this.scopedLogger.from('scaling'); this.app = express(); diff --git a/packages/cli/src/services/redis-client.service.ts b/packages/cli/src/services/redis-client.service.ts index c584530165..0c0f500a13 100644 --- a/packages/cli/src/services/redis-client.service.ts +++ b/packages/cli/src/services/redis-client.service.ts @@ -4,7 +4,8 @@ import type { Cluster, RedisOptions } from 'ioredis'; import { Service } from 'typedi'; import { Debounce } from '@/decorators/debounce'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; +import { ScopedLogger } from '@/logging/scoped-logger.service'; import { TypedEmitter } from '@/typed-emitter'; import type { RedisClientType } from '../scaling/redis/redis.types'; @@ -32,13 +33,15 @@ export class RedisClientService extends TypedEmitter { /** Whether any client has lost connection to Redis. */ private lostConnection = false; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly globalConfig: GlobalConfig, ) { super(); - this.logger = this.logger.scoped(['redis', 'scaling']); + this.logger = this.scopedLogger.from(['redis', 'scaling']); this.registerListeners(); } diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 868fafa526..66bcd74a3f 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -7,11 +7,13 @@ import { import { Service } from 'typedi'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { Logger } from '@/logging/logger.service'; +import type { Logger } from '@/logging/logger.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowRunner } from '@/workflow-runner'; +import { ScopedLogger } from './logging/scoped-logger.service'; + @Service() export class WaitTracker { private waitingExecutions: { @@ -23,15 +25,17 @@ export class WaitTracker { mainTimer: NodeJS.Timeout; + private readonly logger: Logger; + constructor( - private readonly logger: Logger, + private readonly scopedLogger: ScopedLogger, private readonly executionRepository: ExecutionRepository, private readonly ownershipService: OwnershipService, private readonly workflowRunner: WorkflowRunner, private readonly orchestrationService: OrchestrationService, private readonly instanceSettings: InstanceSettings, ) { - this.logger = this.logger.scoped('waiting-executions'); + this.logger = this.scopedLogger.from('waiting-executions'); } has(executionId: string) {