mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-25 12:44:07 -08:00
feat(core): Add Job Summary to Worker response (#7360)
This commit is contained in:
parent
c8c14ca0af
commit
b8608cee6d
|
@ -38,6 +38,7 @@ import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessage
|
||||||
import { IConfig } from '@oclif/config';
|
import { IConfig } from '@oclif/config';
|
||||||
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
||||||
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
||||||
|
import type { WorkerJobStatusSummary } from '../services/orchestration/worker/types';
|
||||||
|
|
||||||
export class Worker extends BaseCommand {
|
export class Worker extends BaseCommand {
|
||||||
static description = '\nStarts a n8n worker';
|
static description = '\nStarts a n8n worker';
|
||||||
|
@ -56,6 +57,10 @@ export class Worker extends BaseCommand {
|
||||||
[key: string]: PCancelable<IRun>;
|
[key: string]: PCancelable<IRun>;
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
|
static runningJobsSummary: {
|
||||||
|
[jobId: string]: WorkerJobStatusSummary;
|
||||||
|
} = {};
|
||||||
|
|
||||||
static jobQueue: JobQueue;
|
static jobQueue: JobQueue;
|
||||||
|
|
||||||
redisSubscriber: RedisServicePubSubSubscriber;
|
redisSubscriber: RedisServicePubSubSubscriber;
|
||||||
|
@ -232,11 +237,22 @@ export class Worker extends BaseCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
Worker.runningJobs[job.id] = workflowRun;
|
Worker.runningJobs[job.id] = workflowRun;
|
||||||
|
Worker.runningJobsSummary[job.id] = {
|
||||||
|
jobId: job.id.toString(),
|
||||||
|
executionId,
|
||||||
|
workflowId: fullExecutionData.workflowId ?? '',
|
||||||
|
workflowName: fullExecutionData.workflowData.name,
|
||||||
|
mode: fullExecutionData.mode,
|
||||||
|
startedAt: fullExecutionData.startedAt,
|
||||||
|
retryOf: fullExecutionData.retryOf ?? '',
|
||||||
|
status: fullExecutionData.status,
|
||||||
|
};
|
||||||
|
|
||||||
// Wait till the execution is finished
|
// Wait till the execution is finished
|
||||||
await workflowRun;
|
await workflowRun;
|
||||||
|
|
||||||
delete Worker.runningJobs[job.id];
|
delete Worker.runningJobs[job.id];
|
||||||
|
delete Worker.runningJobsSummary[job.id];
|
||||||
|
|
||||||
// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
|
// do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution()
|
||||||
// already!
|
// already!
|
||||||
|
@ -305,6 +321,7 @@ export class Worker extends BaseCommand {
|
||||||
instanceId: this.instanceId,
|
instanceId: this.instanceId,
|
||||||
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
|
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
|
||||||
getRunningJobIds: () => Object.keys(Worker.runningJobs),
|
getRunningJobIds: () => Object.keys(Worker.runningJobs),
|
||||||
|
getRunningJobsSummary: () => Object.values(Worker.runningJobsSummary),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker';
|
|
||||||
import { RedisService } from './redis.service';
|
import { RedisService } from './redis.service';
|
||||||
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
|
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
|
||||||
|
import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types';
|
||||||
|
|
||||||
export abstract class OrchestrationHandlerService {
|
export abstract class OrchestrationHandlerService {
|
||||||
protected initialized = false;
|
protected initialized = false;
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
import { LoggerProxy, jsonParse } from 'n8n-workflow';
|
||||||
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
|
import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands';
|
||||||
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
|
import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper';
|
||||||
|
import * as os from 'os';
|
||||||
|
|
||||||
export interface RedisServiceCommandLastReceived {
|
export interface RedisServiceCommandLastReceived {
|
||||||
[date: string]: Date;
|
[date: string]: Date;
|
||||||
|
@ -31,3 +32,9 @@ export function debounceMessageReceiver(message: RedisServiceCommandObject, time
|
||||||
lastReceived[message.command] = now;
|
lastReceived[message.command] = now;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function getOsCpuString(): string {
|
||||||
|
const cpus = os.cpus();
|
||||||
|
if (cpus.length === 0) return 'no CPU info';
|
||||||
|
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
|
||||||
|
}
|
||||||
|
|
|
@ -5,7 +5,9 @@ export async function handleWorkerResponseMessageMain(messageString: string) {
|
||||||
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
|
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
|
||||||
if (workerResponse) {
|
if (workerResponse) {
|
||||||
// TODO: Handle worker response
|
// TODO: Handle worker response
|
||||||
LoggerProxy.debug('Received worker response', workerResponse);
|
LoggerProxy.debug(
|
||||||
|
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return workerResponse;
|
return workerResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +1,13 @@
|
||||||
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
||||||
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
|
import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
|
||||||
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
|
import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
|
||||||
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
|
||||||
import * as os from 'os';
|
import * as os from 'os';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
import { License } from '@/License';
|
import { License } from '@/License';
|
||||||
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||||
import { debounceMessageReceiver } from '../helpers';
|
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
|
||||||
|
import type { WorkerCommandReceivedHandlerOptions } from './types';
|
||||||
export interface WorkerCommandReceivedHandlerOptions {
|
|
||||||
queueModeId: string;
|
|
||||||
instanceId: string;
|
|
||||||
redisPublisher: RedisServicePubSubPublisher;
|
|
||||||
getRunningJobIds: () => string[];
|
|
||||||
}
|
|
||||||
|
|
||||||
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
|
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
|
||||||
return async (channel: string, messageString: string) => {
|
return async (channel: string, messageString: string) => {
|
||||||
|
@ -45,11 +38,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
|
||||||
payload: {
|
payload: {
|
||||||
workerId: options.queueModeId,
|
workerId: options.queueModeId,
|
||||||
runningJobs: options.getRunningJobIds(),
|
runningJobs: options.getRunningJobIds(),
|
||||||
|
runningJobsSummary: options.getRunningJobsSummary(),
|
||||||
freeMem: os.freemem(),
|
freeMem: os.freemem(),
|
||||||
totalMem: os.totalmem(),
|
totalMem: os.totalmem(),
|
||||||
uptime: process.uptime(),
|
uptime: process.uptime(),
|
||||||
loadAvg: os.loadavg(),
|
loadAvg: os.loadavg(),
|
||||||
cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
|
cpus: getOsCpuString(),
|
||||||
arch: os.arch(),
|
arch: os.arch(),
|
||||||
platform: os.platform(),
|
platform: os.platform(),
|
||||||
hostname: os.hostname(),
|
hostname: os.hostname(),
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
|
import { OrchestrationHandlerService } from '../../orchestration.handler.base.service';
|
||||||
import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker';
|
|
||||||
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
|
import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker';
|
||||||
|
import type { WorkerCommandReceivedHandlerOptions } from './types';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
|
export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService {
|
||||||
|
|
21
packages/cli/src/services/orchestration/worker/types.ts
Normal file
21
packages/cli/src/services/orchestration/worker/types.ts
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
import type { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
|
||||||
|
import type { RedisServicePubSubPublisher } from '../../redis/RedisServicePubSubPublisher';
|
||||||
|
|
||||||
|
export interface WorkerCommandReceivedHandlerOptions {
|
||||||
|
queueModeId: string;
|
||||||
|
instanceId: string;
|
||||||
|
redisPublisher: RedisServicePubSubPublisher;
|
||||||
|
getRunningJobIds: () => string[];
|
||||||
|
getRunningJobsSummary: () => WorkerJobStatusSummary[];
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface WorkerJobStatusSummary {
|
||||||
|
jobId: string;
|
||||||
|
executionId: string;
|
||||||
|
retryOf?: string;
|
||||||
|
startedAt: Date;
|
||||||
|
mode: WorkflowExecuteMode;
|
||||||
|
workflowName: string;
|
||||||
|
workflowId: string;
|
||||||
|
status: ExecutionStatus;
|
||||||
|
}
|
|
@ -1,3 +1,5 @@
|
||||||
|
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';
|
||||||
|
|
||||||
export type RedisServiceCommand =
|
export type RedisServiceCommand =
|
||||||
| 'getStatus'
|
| 'getStatus'
|
||||||
| 'getId'
|
| 'getId'
|
||||||
|
@ -29,11 +31,12 @@ export type RedisServiceWorkerResponseObject = {
|
||||||
payload: {
|
payload: {
|
||||||
workerId: string;
|
workerId: string;
|
||||||
runningJobs: string[];
|
runningJobs: string[];
|
||||||
|
runningJobsSummary: WorkerJobStatusSummary[];
|
||||||
freeMem: number;
|
freeMem: number;
|
||||||
totalMem: number;
|
totalMem: number;
|
||||||
uptime: number;
|
uptime: number;
|
||||||
loadAvg: number[];
|
loadAvg: number[];
|
||||||
cpus: string[];
|
cpus: string;
|
||||||
arch: string;
|
arch: string;
|
||||||
platform: NodeJS.Platform;
|
platform: NodeJS.Platform;
|
||||||
hostname: string;
|
hostname: string;
|
||||||
|
|
Loading…
Reference in a new issue