mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-25 04:34:06 -08:00
feat(core): Add Job Summary to Worker response (#7360)
This commit is contained in:
parent
c8c14ca0af
commit
b8608cee6d
|
@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
|
|||
import { IConfig } from '@oclif/config';
|
||||
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
||||
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
||||
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';
|
||||
|
||||
export class Worker extends BaseCommand {
|
||||
static description = '\nStarts a n8n worker';
|
||||
|
@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
|
|||
[key: string]: PCancelable<IRun>;
|
||||
} = {};
|
||||
|
||||
static runningJobsSummary: {
|
||||
[jobId: string]: WorkerJobStatusSummary;
|
||||
} = {};
|
||||
|
||||
static jobQueue: JobQueue;
|
||||
|
||||
redisSubscriber: RedisServicePubSubSubscriber;
|
||||
|
@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
|
|||
}
|
||||
|
||||
Worker.runningJobs[job.id] = workflowRun;
|
||||
Worker.runningJobsSummary[job.id] = {
|
||||
jobId: job.id.toString(),
|
||||
executionId,
|
||||
workflowId: fullExecutionData.workflowId ?? '',
|
||||
workflowName: fullExecutionData.workflowData.name,
|
||||
mode: fullExecutionData.mode,
|
||||
startedAt: fullExecutionData.startedAt,
|
||||
retryOf: fullExecutionData.retryOf ?? '',
|
||||
status: fullExecutionData.status,
|
||||
};
|
||||
|
||||
// Wait till the execution is finished
|
||||
await workflowRun;
|
||||
|
||||
delete Worker.runningJobs[job.id];
|
||||
delete Worker.runningJobsSummary[job.id];
|
||||
|
||||
// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
|
||||
// already!
|
||||
|
@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
|
|||
instanceId: this.instanceId,
|
||||
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
|
||||
getRunningJobIds: () => Object.keys(Worker.runningJobs),
|
||||
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import Container from 'typedi';
|
||||
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
|
||||
import { RedisService } from './redis.service';
|
||||
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
|
||||
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
|
||||
|
||||
export abstract class OrchestrationHandlerService {
|
||||
protected initialized = false;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
|
||||
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
|
||||
import * as os from 'os';
|
||||
|
||||
export interface RedisServiceCommandLastReceived {
|
||||
[date: string]: Date;
|
||||
|
@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
|
|||
lastReceived[message.command] = now;
|
||||
return true;
|
||||
}
|
||||
|
||||
export function getOsCpuString(): string {
|
||||
const cpus = os.cpus();
|
||||
if (cpus.length === 0) return 'no CPU info';
|
||||
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
|
|||
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
|
||||
if (workerResponse) {
|
||||
// TODO: Handle worker response
|
||||
LoggerProxy.debug('Received worker response', workerResponse);
|
||||
LoggerProxy.debug(
|
||||
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
|
||||
);
|
||||
}
|
||||
return workerResponse;
|
||||
}
|
||||
|
|
|
@ -1,20 +1,13 @@
|
|||
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
||||
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
|
||||
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
|
||||
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
||||
import * as os from 'os';
|
||||
import Container from 'typedi';
|
||||
import { License } from '@/License';
|
||||
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||
import { debounceMessageReceiver } from '../helpers';
|
||||
|
||||
export interface WorkerCommandReceivedHandlerOptions {
|
||||
queueModeId: string;
|
||||
instanceId: string;
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
getRunningJobIds: () => string[];
|
||||
}
|
||||
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
|
||||
import type { WorkerCommandReceivedHandlerOptions } from './types';
|
||||
|
||||
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
|
||||
return async (channel: string, messageString: string) => {
|
||||
|
@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
|
|||
payload: {
|
||||
workerId: options.queueModeId,
|
||||
runningJobs: options.getRunningJobIds(),
|
||||
runningJobsSummary: options.getRunningJobsSummary(),
|
||||
freeMem: os.freemem(),
|
||||
totalMem: os.totalmem(),
|
||||
uptime: process.uptime(),
|
||||
loadAvg: os.loadavg(),
|
||||
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
|
||||
cpus: getOsCpuString(),
|
||||
arch: os.arch(),
|
||||
platform: os.platform(),
|
||||
hostname: os.hostname(),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Service } from 'typedi';
|
||||
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
|
||||
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
|
||||
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
|
||||
import type { WorkerCommandReceivedHandlerOptions } from './types';
|
||||
|
||||
@Service()
|
||||
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
|
||||
|
|
21
packages/cli/src/services/orchestration/worker/types.ts
Normal file
21
packages/cli/src/services/orchestration/worker/types.ts
Normal file
|
@ -0,0 +1,21 @@
|
|||
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
|
||||
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';
|
||||
|
||||
export interface WorkerCommandReceivedHandlerOptions {
|
||||
queueModeId: string;
|
||||
instanceId: string;
|
||||
redisPublisher: RedisServicePubSubPublisher;
|
||||
getRunningJobIds: () => string[];
|
||||
getRunningJobsSummary: () => WorkerJobStatusSummary[];
|
||||
}
|
||||
|
||||
export interface WorkerJobStatusSummary {
|
||||
jobId: string;
|
||||
executionId: string;
|
||||
retryOf?: string;
|
||||
startedAt: Date;
|
||||
mode: WorkflowExecuteMode;
|
||||
workflowName: string;
|
||||
workflowId: string;
|
||||
status: ExecutionStatus;
|
||||
}
|
|
@ -1,3 +1,5 @@
|
|||
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';
|
||||
|
||||
export type RedisServiceCommand =
|
||||
| 'getStatus'
|
||||
| 'getId'
|
||||
|
@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
|
|||
payload: {
|
||||
workerId: string;
|
||||
runningJobs: string[];
|
||||
runningJobsSummary: WorkerJobStatusSummary[];
|
||||
freeMem: number;
|
||||
totalMem: number;
|
||||
uptime: number;
|
||||
loadAvg: number[];
|
||||
cpus: string[];
|
||||
cpus: string;
|
||||
arch: string;
|
||||
platform: NodeJS.Platform;
|
||||
hostname: string;
|
||||
|
|
Loading…
Reference in a new issue