feat(core): Add Job Summary to Worker response (#7360)

This commit is contained in:
Michael Auerswald 2023-10-06 17:52:27 +02:00 committed by GitHub
parent c8c14ca0af
commit b8608cee6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 58 additions and 14 deletions

View file

@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
import { IConfig } from '@oclif/config';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';
export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
[key: string]: PCancelable<IRun>;
} = {};
static runningJobsSummary: {
[jobId: string]: WorkerJobStatusSummary;
} = {};
static jobQueue: JobQueue;
redisSubscriber: RedisServicePubSubSubscriber;
@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
}
Worker.runningJobs[job.id] = workflowRun;
Worker.runningJobsSummary[job.id] = {
jobId: job.id.toString(),
executionId,
workflowId: fullExecutionData.workflowId ?? '',
workflowName: fullExecutionData.workflowData.name,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
retryOf: fullExecutionData.retryOf ?? '',
status: fullExecutionData.status,
};
// Wait till the execution is finished
await workflowRun;
delete Worker.runningJobs[job.id];
delete Worker.runningJobsSummary[job.id];
// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
// already!
@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
instanceId: this.instanceId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
});
}

View file

@ -1,7 +1,7 @@
import Container from 'typedi';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
export abstract class OrchestrationHandlerService {
protected initialized = false;

View file

@ -1,6 +1,7 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
import * as os from 'os';
export interface RedisServiceCommandLastReceived {
[date: string]: Date;
@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
lastReceived[message.command] = now;
return true;
}
export function getOsCpuString(): string {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}

View file

@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
LoggerProxy.debug('Received worker response', workerResponse);
LoggerProxy.debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
return workerResponse;
}

View file

@ -1,20 +1,13 @@
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import * as os from 'os';
import Container from 'typedi';
import { License } from '@/License';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { debounceMessageReceiver } from '../helpers';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
}
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
import type { WorkerCommandReceivedHandlerOptions } from './types';
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => {
@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
payload: {
workerId: options.queueModeId,
runningJobs: options.getRunningJobIds(),
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
cpus: getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),

View file

@ -1,7 +1,7 @@
import { Service } from 'typedi';
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
import type { WorkerCommandReceivedHandlerOptions } from './types';
@Service()
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {

View file

@ -0,0 +1,21 @@
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';
export interface WorkerCommandReceivedHandlerOptions {
queueModeId: string;
instanceId: string;
redisPublisher: RedisServicePubSubPublisher;
getRunningJobIds: () => string[];
getRunningJobsSummary: () => WorkerJobStatusSummary[];
}
export interface WorkerJobStatusSummary {
jobId: string;
executionId: string;
retryOf?: string;
startedAt: Date;
mode: WorkflowExecuteMode;
workflowName: string;
workflowId: string;
status: ExecutionStatus;
}

View file

@ -1,3 +1,5 @@
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';
export type RedisServiceCommand =
| 'getStatus'
| 'getId'
@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
payload: {
workerId: string;
runningJobs: string[];
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;