feat(core): Add Job Summary to Worker response (#7360)

This commit is contained in:
Michael Auerswald 2023-10-06 17:52:27 +02:00 committed by GitHub
parent c8c14ca0af
commit b8608cee6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 58 additions and 14 deletions

View file

@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
import { IConfig } from '@oclif/config'; import { IConfig } from '@oclif/config';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';
export class Worker extends BaseCommand { export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker'; static description = '\nStarts a n8n worker';
@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
[key: string]: PCancelable<IRun>; [key: string]: PCancelable<IRun>;
} = {}; } = {};
static runningJobsSummary: {
[jobId: string]: WorkerJobStatusSummary;
} = {};
static jobQueue: JobQueue; static jobQueue: JobQueue;
redisSubscriber: RedisServicePubSubSubscriber; redisSubscriber: RedisServicePubSubSubscriber;
@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
} }
Worker.runningJobs[job.id] = workflowRun; Worker.runningJobs[job.id] = workflowRun;
Worker.runningJobsSummary[job.id] = {
jobId: job.id.toString(),
executionId,
workflowId: fullExecutionData.workflowId ?? '',
workflowName: fullExecutionData.workflowData.name,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
retryOf: fullExecutionData.retryOf ?? '',
status: fullExecutionData.status,
};
// Wait till the execution is finished // Wait till the execution is finished
await workflowRun; await workflowRun;
delete Worker.runningJobs[job.id]; delete Worker.runningJobs[job.id];
delete Worker.runningJobsSummary[job.id];
// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution() // do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
// already! // already!
@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
instanceId: this.instanceId, instanceId: this.instanceId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs), getRunningJobIds: () => Object.keys(Worker.runningJobs),
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
}); });
} }

View file

@ -1,7 +1,7 @@
import Container from 'typedi'; import Container from 'typedi';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
import { RedisService } from './redis.service'; import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
export abstract class OrchestrationHandlerService { export abstract class OrchestrationHandlerService {
protected initialized = false; protected initialized = false;

View file

@ -1,6 +1,7 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow'; import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
import * as os from 'os';
export interface RedisServiceCommandLastReceived { export interface RedisServiceCommandLastReceived {
[date: string]: Date; [date: string]: Date;
@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
lastReceived[message.command] = now; lastReceived[message.command] = now;
return true; return true;
} }
export function getOsCpuString(): string {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}

View file

@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString); const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) { if (workerResponse) {
// TODO: Handle worker response // TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse); LoggerProxy.debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
} }
return workerResponse; return workerResponse;
} }

View file

@ -1,20 +1,13 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow'; import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import * as os from 'os'; import * as os from 'os';
import Container from 'typedi'; import Container from 'typedi';
import { License } from '@/License'; import { License } from '@/License';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { debounceMessageReceiver } from '../helpers'; import { debounceMessageReceiver, getOsCpuString } from '../helpers';
import type { WorkerCommandReceivedHandlerOptions } from './types';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => { return async (channel: string, messageString: string) => {
@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
payload: { payload: {
workerId: options.queueModeId, workerId: options.queueModeId,
runningJobs: options.getRunningJobIds(), runningJobs: options.getRunningJobIds(),
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(), freeMem: os.freemem(),
totalMem: os.totalmem(), totalMem: os.totalmem(),
uptime: process.uptime(), uptime: process.uptime(),
loadAvg: os.loadavg(), loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), cpus: getOsCpuString(),
arch: os.arch(), arch: os.arch(),
platform: os.platform(), platform: os.platform(),
hostname: os.hostname(), hostname: os.hostname(),

View file

@ -1,7 +1,7 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker'; import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
import type { WorkerCommandReceivedHandlerOptions } from './types';
@Service() @Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {

View file

@ -0,0 +1,21 @@
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
getRunningJobsSummary: () => WorkerJobStatusSummary[];
}
export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}

View file

@ -1,3 +1,5 @@
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';
export type RedisServiceCommand = export type RedisServiceCommand =
| 'getStatus' | 'getStatus'
| 'getId' | 'getId'
@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
payload: { payload: {
workerId: string; workerId: string;
runningJobs: string[]; runningJobs: string[];
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number; freeMem: number;
totalMem: number; totalMem: number;
uptime: number; uptime: number;
loadAvg: number[]; loadAvg: number[];
cpus: string[]; cpus: string;
arch: string; arch: string;
platform: NodeJS.Platform; platform: NodeJS.Platform;
hostname: string; hostname: string;