1
0
Fork 0
mirror of https://github.com/n8n-io/n8n.git synced 2025-02-02 07:01:30 -08:00

fix(core): Increase task runner offer validity and introduce randomness (no-changelog) ()

This commit is contained in:
Tomi Turtiainen 2024-11-27 10:36:37 +02:00 committed by GitHub
parent b2c3312ce3
commit 137193c3e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,4 +1,4 @@
import { ApplicationError, ensureError } from 'n8n-workflow'; import { ApplicationError, ensureError, randomInt } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws'; import { type MessageEvent, WebSocket } from 'ws';
@ -42,8 +42,11 @@ export interface RPCCallObject {
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject; [name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject;
} }
const VALID_TIME_MS = 1000; const OFFER_VALID_TIME_MS = 5000;
const VALID_EXTRA_MS = 100; const OFFER_VALID_EXTRA_MS = 100;
/** Converts milliseconds to nanoseconds */
const msToNs = (ms: number) => BigInt(ms * 1_000_000);
export interface TaskRunnerOpts extends BaseRunnerConfig { export interface TaskRunnerOpts extends BaseRunnerConfig {
taskType: string; taskType: string;
@ -167,16 +170,20 @@ export abstract class TaskRunner extends EventEmitter {
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length); (Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
for (let i = 0; i < offersToSend; i++) { for (let i = 0; i < offersToSend; i++) {
// Add a bit of randomness so that not all offers expire at the same time
const validForInMs = OFFER_VALID_TIME_MS + randomInt(500);
// Add a little extra time to account for latency
const validUntil = process.hrtime.bigint() + msToNs(validForInMs + OFFER_VALID_EXTRA_MS);
const offer: TaskOffer = { const offer: TaskOffer = {
offerId: nanoid(), offerId: nanoid(),
validUntil: process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency validUntil,
}; };
this.openOffers.set(offer.offerId, offer); this.openOffers.set(offer.offerId, offer);
this.send({ this.send({
type: 'runner:taskoffer', type: 'runner:taskoffer',
taskType: this.taskType, taskType: this.taskType,
offerId: offer.offerId, offerId: offer.offerId,
validFor: VALID_TIME_MS, validFor: validForInMs,
}); });
} }
} }