mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-23 11:44:06 -08:00
feat: Fail all tasks for a disconnected Task Runner (no-changelog) (#11254)
This commit is contained in:
parent
26ad091f47
commit
ee6e7fec85
|
@ -5,6 +5,8 @@ import type { RunnerMessage, TaskResultData } from '../runner-types';
|
|||
import { TaskBroker } from '../task-broker.service';
|
||||
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
|
||||
|
||||
const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000);
|
||||
|
||||
describe('TaskBroker', () => {
|
||||
let taskBroker: TaskBroker;
|
||||
|
||||
|
@ -15,14 +17,12 @@ describe('TaskBroker', () => {
|
|||
|
||||
describe('expireTasks', () => {
|
||||
it('should remove expired task offers and keep valid task offers', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const validOffer: TaskOffer = {
|
||||
offerId: 'valid',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
|
||||
validUntil: createValidUntil(1000), // 1 second in the future
|
||||
};
|
||||
|
||||
const expiredOffer1: TaskOffer = {
|
||||
|
@ -30,7 +30,7 @@ describe('TaskBroker', () => {
|
|||
runnerId: 'runner2',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
|
||||
validUntil: createValidUntil(-1000), // 1 second in the past
|
||||
};
|
||||
|
||||
const expiredOffer2: TaskOffer = {
|
||||
|
@ -38,7 +38,7 @@ describe('TaskBroker', () => {
|
|||
runnerId: 'runner3',
|
||||
taskType: 'taskType1',
|
||||
validFor: 2000,
|
||||
validUntil: now - BigInt(2000 * 1_000_000), // 2 seconds in the past
|
||||
validUntil: createValidUntil(-2000), // 2 seconds in the past
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([validOffer, expiredOffer1, expiredOffer2]);
|
||||
|
@ -102,6 +102,55 @@ describe('TaskBroker', () => {
|
|||
|
||||
expect(runnerIds).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('should remove any pending offers for that runner', () => {
|
||||
const runnerId = 'runner1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
taskBroker.registerRunner(runner, messageCallback);
|
||||
taskBroker.taskOffered({
|
||||
offerId: 'offer1',
|
||||
runnerId,
|
||||
taskType: 'mock',
|
||||
validFor: 1000,
|
||||
validUntil: createValidUntil(1000),
|
||||
});
|
||||
taskBroker.taskOffered({
|
||||
offerId: 'offer2',
|
||||
runnerId: 'runner2',
|
||||
taskType: 'mock',
|
||||
validFor: 1000,
|
||||
validUntil: createValidUntil(1000),
|
||||
});
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
|
||||
const offers = taskBroker.getPendingTaskOffers();
|
||||
expect(offers).toHaveLength(1);
|
||||
expect(offers[0].runnerId).toBe('runner2');
|
||||
});
|
||||
|
||||
it('should fail any running tasks for that runner', () => {
|
||||
const runnerId = 'runner1';
|
||||
const runner = mock<TaskRunner>({ id: runnerId });
|
||||
const messageCallback = jest.fn();
|
||||
|
||||
const taskId = 'task1';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const failSpy = jest.spyOn(taskBroker as any, 'failTask');
|
||||
const rejectSpy = jest.spyOn(taskBroker, 'handleRunnerReject');
|
||||
|
||||
taskBroker.registerRunner(runner, messageCallback);
|
||||
taskBroker.setTasks({
|
||||
[taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' },
|
||||
task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' },
|
||||
});
|
||||
taskBroker.deregisterRunner(runnerId);
|
||||
|
||||
expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
|
||||
expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`);
|
||||
});
|
||||
});
|
||||
|
||||
describe('deregisterRequester', () => {
|
||||
|
@ -121,14 +170,12 @@ describe('TaskBroker', () => {
|
|||
|
||||
describe('taskRequested', () => {
|
||||
it('should match a pending offer to an incoming request', async () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
validUntil: createValidUntil(1000),
|
||||
};
|
||||
|
||||
taskBroker.setPendingTaskOffers([offer]);
|
||||
|
@ -150,8 +197,6 @@ describe('TaskBroker', () => {
|
|||
|
||||
describe('taskOffered', () => {
|
||||
it('should match a pending request to an incoming offer', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const request: TaskRequest = {
|
||||
requestId: 'request1',
|
||||
requesterId: 'requester1',
|
||||
|
@ -166,7 +211,7 @@ describe('TaskBroker', () => {
|
|||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
validUntil: createValidUntil(1000),
|
||||
};
|
||||
|
||||
jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue(); // allow Jest to exit cleanly
|
||||
|
@ -180,14 +225,12 @@ describe('TaskBroker', () => {
|
|||
|
||||
describe('settleTasks', () => {
|
||||
it('should match task offers with task requests by task type', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer1: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
validUntil: createValidUntil(1000),
|
||||
};
|
||||
|
||||
const offer2: TaskOffer = {
|
||||
|
@ -195,7 +238,7 @@ describe('TaskBroker', () => {
|
|||
runnerId: 'runner2',
|
||||
taskType: 'taskType2',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
validUntil: createValidUntil(1000),
|
||||
};
|
||||
|
||||
const request1: TaskRequest = {
|
||||
|
@ -235,14 +278,12 @@ describe('TaskBroker', () => {
|
|||
});
|
||||
|
||||
it('should not match a request whose acceptance is in progress', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const offer: TaskOffer = {
|
||||
offerId: 'offer1',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000),
|
||||
validUntil: createValidUntil(1000),
|
||||
};
|
||||
|
||||
const request: TaskRequest = {
|
||||
|
@ -271,14 +312,12 @@ describe('TaskBroker', () => {
|
|||
});
|
||||
|
||||
it('should expire tasks before settling', () => {
|
||||
const now = process.hrtime.bigint();
|
||||
|
||||
const validOffer: TaskOffer = {
|
||||
offerId: 'valid',
|
||||
runnerId: 'runner1',
|
||||
taskType: 'taskType1',
|
||||
validFor: 1000,
|
||||
validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future
|
||||
validUntil: createValidUntil(1000), // 1 second in the future
|
||||
};
|
||||
|
||||
const expiredOffer: TaskOffer = {
|
||||
|
@ -286,7 +325,7 @@ describe('TaskBroker', () => {
|
|||
runnerId: 'runner2',
|
||||
taskType: 'taskType2', // will be removed before matching
|
||||
validFor: 1000,
|
||||
validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past
|
||||
validUntil: createValidUntil(-1000), // 1 second in the past
|
||||
};
|
||||
|
||||
const request1: TaskRequest = {
|
||||
|
|
|
@ -75,15 +75,11 @@ export class TaskBroker {
|
|||
|
||||
expireTasks() {
|
||||
const now = process.hrtime.bigint();
|
||||
const invalidOffers: number[] = [];
|
||||
for (let i = 0; i < this.pendingTaskOffers.length; i++) {
|
||||
for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) {
|
||||
if (this.pendingTaskOffers[i].validUntil < now) {
|
||||
invalidOffers.push(i);
|
||||
this.pendingTaskOffers.splice(i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
// We reverse the list so the later indexes are valid after deleting earlier ones
|
||||
invalidOffers.reverse().forEach((i) => this.pendingTaskOffers.splice(i, 1));
|
||||
}
|
||||
|
||||
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
|
||||
|
@ -92,6 +88,21 @@ export class TaskBroker {
|
|||
|
||||
deregisterRunner(runnerId: string) {
|
||||
this.knownRunners.delete(runnerId);
|
||||
|
||||
// Remove any pending offers
|
||||
for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) {
|
||||
if (this.pendingTaskOffers[i].runnerId === runnerId) {
|
||||
this.pendingTaskOffers.splice(i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Fail any tasks
|
||||
for (const task of this.tasks.values()) {
|
||||
if (task.runnerId === runnerId) {
|
||||
void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`);
|
||||
this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
registerRequester(requesterId: string, messageCallback: RequesterMessageCallback) {
|
||||
|
|
Loading…
Reference in a new issue