refactor(core): Move workerCommandHandler into worker.ts (no-changelog) (#7160)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-09-13 14:51:41 +02:00 committed by GitHub
parent 217de21605
commit 34ebffea45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 90 deletions

View file

@ -2,12 +2,13 @@ import express from 'express';
import http from 'http';
import type PCancelable from 'p-cancelable';
import { Container } from 'typedi';
import * as os from 'os';
import { flags } from '@oclif/command';
import { WorkflowExecute } from 'n8n-core';
import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow';
import { Workflow, NodeOperationError, LoggerProxy, sleep } from 'n8n-workflow';
import { Workflow, NodeOperationError, LoggerProxy, sleep, jsonParse } from 'n8n-workflow';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
@ -31,7 +32,8 @@ import { eventBus } from '../eventbus';
import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber';
import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric';
import { getWorkerCommandReceivedHandler } from './workerCommandHandler';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import { type RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
@ -281,12 +283,7 @@ export class Worker extends BaseCommand {
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'WorkerCommandReceivedHandler',
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId,
redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
}),
this.getWorkerCommandReceivedHandler(),
);
}
@ -467,4 +464,78 @@ export class Worker extends BaseCommand {
async catch(error: Error) {
await this.exitWithCrash('Worker exiting due to an error.', error);
}
private getWorkerCommandReceivedHandler() {
const { uniqueInstanceId, redisPublisher } = this;
const getRunningJobIds = () => Object.keys(Worker.runningJobs);
return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
if (message.targets && !message.targets.includes(uniqueInstanceId)) {
return; // early return if the message is not for this worker
}
switch (message.command) {
case 'getStatus':
await redisPublisher.publishToWorkerChannel({
workerId: uniqueInstanceId,
command: message.command,
payload: {
workerId: uniqueInstanceId,
runningJobs: getRunningJobIds(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
net: Object.values(os.networkInterfaces()).flatMap(
(interfaces) =>
interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '',
),
},
});
break;
case 'getId':
await redisPublisher.publishToWorkerChannel({
workerId: uniqueInstanceId,
command: message.command,
});
break;
case 'restartEventBus':
await eventBus.restart();
await redisPublisher.publishToWorkerChannel({
workerId: uniqueInstanceId,
command: message.command,
payload: {
result: 'success',
},
});
break;
case 'stopWorker':
// TODO: implement proper shutdown
// await this.stopProcess();
break;
default:
LoggerProxy.debug(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
);
break;
}
}
}
};
}
}

View file

@ -1,82 +0,0 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import { eventBus } from '../eventbus';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
import * as os from 'os';
export function getWorkerCommandReceivedHandler(options: {
uniqueInstanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}) {
return async (channel: string, messageString: string) => {
if (channel === COMMAND_REDIS_CHANNEL) {
if (!messageString) return;
let message: RedisServiceCommandObject;
try {
message = jsonParse<RedisServiceCommandObject>(messageString);
} catch {
LoggerProxy.debug(
`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
);
return;
}
if (message) {
if (message.targets && !message.targets.includes(options.uniqueInstanceId)) {
return; // early return if the message is not for this worker
}
switch (message.command) {
case 'getStatus':
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
payload: {
workerId: options.uniqueInstanceId,
runningJobs: options.getRunningJobIds(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
net: Object.values(os.networkInterfaces()).flatMap(
(interfaces) =>
interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '',
),
},
});
break;
case 'getId':
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
});
break;
case 'restartEventBus':
await eventBus.restart();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.uniqueInstanceId,
command: message.command,
payload: {
result: 'success',
},
});
break;
case 'stopWorker':
// TODO: implement proper shutdown
// await this.stopProcess();
break;
default:
LoggerProxy.debug(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
);
break;
}
}
}
};
}