feat: Forward logs from task runner to logger (no-changelog) (#11422)

This commit is contained in:
Tomi Turtiainen 2024-10-28 16:44:03 +02:00 committed by GitHub
parent c56f30ce15
commit d4c4db823e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 175 additions and 9 deletions

View file

@ -11,6 +11,7 @@ export const LOG_SCOPES = [
'redis', 'redis',
'scaling', 'scaling',
'waiting-executions', 'waiting-executions',
'task-runner',
] as const; ] as const;
export type LogScope = (typeof LOG_SCOPES)[number]; export type LogScope = (typeof LOG_SCOPES)[number];

View file

@ -0,0 +1,114 @@
import type { Logger } from 'n8n-workflow';
import { Readable } from 'stream';
import { forwardToLogger } from '../forward-to-logger';
describe('forwardToLogger', () => {
let logger: Logger;
let stdout: Readable;
let stderr: Readable;
beforeEach(() => {
logger = {
info: jest.fn(),
error: jest.fn(),
} as unknown as Logger;
stdout = new Readable({ read() {} });
stderr = new Readable({ read() {} });
jest.resetAllMocks();
});
const pushToStdout = async (data: string) => {
stdout.push(Buffer.from(data));
stdout.push(null);
// Wait for the next tick to allow the event loop to process the data
await new Promise((resolve) => setImmediate(resolve));
};
const pushToStderr = async (data: string) => {
stderr.push(Buffer.from(data));
stderr.push(null);
// Wait for the next tick to allow the event loop to process the data
await new Promise((resolve) => setImmediate(resolve));
};
it('should forward stdout data to logger.info', async () => {
forwardToLogger(logger, { stdout, stderr: null });
await pushToStdout('Test stdout message');
await new Promise((resolve) => setImmediate(resolve));
expect(logger.info).toHaveBeenCalledWith('Test stdout message');
});
it('should forward stderr data to logger.error', async () => {
forwardToLogger(logger, { stdout: null, stderr });
await pushToStderr('Test stderr message');
expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});
it('should remove trailing newline from stdout', async () => {
forwardToLogger(logger, { stdout, stderr: null });
await pushToStdout('Test stdout message\n');
expect(logger.info).toHaveBeenCalledWith('Test stdout message');
});
it('should remove trailing newline from stderr', async () => {
forwardToLogger(logger, { stdout: null, stderr });
await pushToStderr('Test stderr message\n');
expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});
it('should forward stderr data to logger.error', async () => {
forwardToLogger(logger, { stdout: null, stderr });
await pushToStderr('Test stderr message');
expect(logger.error).toHaveBeenCalledWith('Test stderr message');
});
it('should include prefix if provided for stdout', async () => {
const prefix = '[PREFIX]';
forwardToLogger(logger, { stdout, stderr: null }, prefix);
await pushToStdout('Message with prefix');
expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix');
});
it('should include prefix if provided for stderr', async () => {
const prefix = '[PREFIX]';
forwardToLogger(logger, { stdout: null, stderr }, prefix);
await pushToStderr('Error message with prefix');
expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix');
});
it('should make sure there is no duplicate space after prefix for stdout', async () => {
const prefix = '[PREFIX] ';
forwardToLogger(logger, { stdout, stderr: null }, prefix);
await pushToStdout('Message with prefix');
expect(logger.info).toHaveBeenCalledWith('[PREFIX] Message with prefix');
});
it('should make sure there is no duplicate space after prefix for stderr', async () => {
const prefix = '[PREFIX] ';
forwardToLogger(logger, { stdout: null, stderr }, prefix);
await pushToStderr('Error message with prefix');
expect(logger.error).toHaveBeenCalledWith('[PREFIX] Error message with prefix');
});
});

View file

@ -2,9 +2,10 @@ import { TaskRunnersConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { ChildProcess, SpawnOptions } from 'node:child_process'; import type { ChildProcess, SpawnOptions } from 'node:child_process';
import { mockInstance } from '../../../test/shared/mocking'; import { Logger } from '@/logging/logger.service';
import type { TaskRunnerAuthService } from '../auth/task-runner-auth.service'; import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.service';
import { TaskRunnerProcess } from '../task-runner-process'; import { TaskRunnerProcess } from '@/runners/task-runner-process';
import { mockInstance } from '@test/mocking';
const spawnMock = jest.fn(() => const spawnMock = jest.fn(() =>
mock<ChildProcess>({ mock<ChildProcess>({
@ -19,11 +20,12 @@ const spawnMock = jest.fn(() =>
require('child_process').spawn = spawnMock; require('child_process').spawn = spawnMock;
describe('TaskRunnerProcess', () => { describe('TaskRunnerProcess', () => {
const logger = mockInstance(Logger);
const runnerConfig = mockInstance(TaskRunnersConfig); const runnerConfig = mockInstance(TaskRunnersConfig);
runnerConfig.disabled = false; runnerConfig.disabled = false;
runnerConfig.mode = 'internal_childprocess'; runnerConfig.mode = 'internal_childprocess';
const authService = mock<TaskRunnerAuthService>(); const authService = mock<TaskRunnerAuthService>();
let taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService); let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
afterEach(async () => { afterEach(async () => {
spawnMock.mockClear(); spawnMock.mockClear();
@ -33,7 +35,7 @@ describe('TaskRunnerProcess', () => {
it('should throw if runner mode is external', () => { it('should throw if runner mode is external', () => {
runnerConfig.mode = 'external'; runnerConfig.mode = 'external';
expect(() => new TaskRunnerProcess(runnerConfig, authService)).toThrow(); expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();
runnerConfig.mode = 'internal_childprocess'; runnerConfig.mode = 'internal_childprocess';
}); });
@ -41,7 +43,7 @@ describe('TaskRunnerProcess', () => {
describe('start', () => { describe('start', () => {
beforeEach(() => { beforeEach(() => {
taskRunnerProcess = new TaskRunnerProcess(runnerConfig, authService); taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
}); });
test.each(['PATH', 'NODE_FUNCTION_ALLOW_BUILTIN', 'NODE_FUNCTION_ALLOW_EXTERNAL'])( test.each(['PATH', 'NODE_FUNCTION_ALLOW_BUILTIN', 'NODE_FUNCTION_ALLOW_EXTERNAL'])(

View file

@ -0,0 +1,42 @@
import type { Logger } from 'n8n-workflow';
import type { Readable } from 'stream';
/**
* Forwards stdout and stderr of a given producer to the given
* logger's info and error methods respectively.
*/
export function forwardToLogger(
logger: Logger,
producer: {
stdout?: Readable | null;
stderr?: Readable | null;
},
prefix?: string,
) {
if (prefix) {
prefix = prefix.trimEnd();
}
const stringify = (data: Buffer) => {
let str = data.toString();
// Remove possible trailing newline (otherwise it's duplicated)
if (str.endsWith('\n')) {
str = str.slice(0, -1);
}
return prefix ? `${prefix} ${str}` : str;
};
if (producer.stdout) {
producer.stdout.on('data', (data: Buffer) => {
logger.info(stringify(data));
});
}
if (producer.stderr) {
producer.stderr.on('data', (data: Buffer) => {
logger.error(stringify(data));
});
}
}

View file

@ -4,8 +4,11 @@ import { spawn } from 'node:child_process';
import * as process from 'node:process'; import * as process from 'node:process';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service';
import { TaskRunnerAuthService } from './auth/task-runner-auth.service'; import { TaskRunnerAuthService } from './auth/task-runner-auth.service';
import { OnShutdown } from '../decorators/on-shutdown'; import { forwardToLogger } from './forward-to-logger';
type ChildProcess = ReturnType<typeof spawn>; type ChildProcess = ReturnType<typeof spawn>;
@ -38,6 +41,8 @@ export class TaskRunnerProcess {
private isShuttingDown = false; private isShuttingDown = false;
private logger: Logger;
private readonly passthroughEnvVars = [ private readonly passthroughEnvVars = [
'PATH', 'PATH',
'NODE_FUNCTION_ALLOW_BUILTIN', 'NODE_FUNCTION_ALLOW_BUILTIN',
@ -45,6 +50,7 @@ export class TaskRunnerProcess {
] as const; ] as const;
constructor( constructor(
logger: Logger,
private readonly runnerConfig: TaskRunnersConfig, private readonly runnerConfig: TaskRunnersConfig,
private readonly authService: TaskRunnerAuthService, private readonly authService: TaskRunnerAuthService,
) { ) {
@ -52,6 +58,8 @@ export class TaskRunnerProcess {
this.runnerConfig.mode === 'internal_childprocess' || this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher', this.runnerConfig.mode === 'internal_launcher',
); );
this.logger = logger.scoped('task-runner');
} }
async start() { async start() {
@ -64,8 +72,7 @@ export class TaskRunnerProcess {
? this.startLauncher(grantToken, n8nUri) ? this.startLauncher(grantToken, n8nUri)
: this.startNode(grantToken, n8nUri); : this.startNode(grantToken, n8nUri);
this.process.stdout?.pipe(process.stdout); forwardToLogger(this.logger, this.process, '[Task Runner]: ');
this.process.stderr?.pipe(process.stderr);
this.monitorProcess(this.process); this.monitorProcess(this.process);
} }