fix(core): Fix task runner sending too many offers (#12415)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions

This commit is contained in:
Tomi Turtiainen 2025-01-02 10:16:28 +02:00 committed by GitHub
parent a484ea160b
commit 4498e35192
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 160 additions and 67 deletions

View file

@ -1,32 +1,44 @@
import { WebSocket } from 'ws';
import { TaskRunner } from '@/task-runner';
import { TaskRunner, type TaskRunnerOpts } from '@/task-runner';
class TestRunner extends TaskRunner {}
jest.mock('ws');
describe('TestRunner', () => {
let runner: TestRunner;
const newTestRunner = (opts: Partial<TaskRunnerOpts> = {}) =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
...opts,
});
afterEach(() => {
runner?.clearIdleTimer();
});
describe('constructor', () => {
afterEach(() => {
jest.clearAllMocks();
});
it('should correctly construct WebSocket URI with provided taskBrokerUri', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
@ -38,25 +50,11 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);
runner.clearIdleTimer();
});
it('should handle different taskBrokerUri formats correctly', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
runner = newTestRunner({
taskBrokerUri: 'https://example.com:3000/path',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
expect(WebSocket).toHaveBeenCalledWith(
@ -68,49 +66,140 @@ describe('TestRunner', () => {
maxPayload: 1024,
}),
);
runner.clearIdleTimer();
});
it('should throw an error if taskBrokerUri is invalid', () => {
expect(
() =>
new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'not-a-valid-uri',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
}),
expect(() =>
newTestRunner({
taskBrokerUri: 'not-a-valid-uri',
}),
).toThrowError(/Invalid URL/);
});
});
describe('sendOffers', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.clearAllTimers();
});
it('should not send offers if canSendOffers is false', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
const sendSpy = jest.spyOn(runner, 'send');
expect(runner.canSendOffers).toBe(false);
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(0);
});
it('should enable sending of offer on runnerregistered message', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
expect(runner.canSendOffers).toBe(true);
});
it('should send maxConcurrency offers when there are no offers', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
});
it('should send up to maxConcurrency offers when there is a running task', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
runner.runningTasks.set('test-task', {
taskId: 'test-task',
active: true,
cancelled: false,
});
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(sendSpy.mock.calls).toEqual([
[
{
type: 'runner:taskoffer',
taskType: 'test-task',
offerId: expect.any(String),
validFor: expect.any(Number),
},
],
]);
});
it('should delete stale offers and send new ones', () => {
runner = newTestRunner({
taskType: 'test-task',
maxConcurrency: 2,
});
runner.onMessage({
type: 'broker:runnerregistered',
});
const sendSpy = jest.spyOn(runner, 'send');
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
sendSpy.mockClear();
jest.advanceTimersByTime(6000);
runner.sendOffers();
expect(sendSpy).toHaveBeenCalledTimes(2);
});
});
describe('taskCancelled', () => {
it('should reject pending requests when task is cancelled', () => {
const runner = new TestRunner({
taskType: 'test-task',
maxConcurrency: 5,
idleTimeout: 60,
grantToken: 'test-token',
maxPayloadSize: 1024,
taskBrokerUri: 'http://localhost:8080',
timezone: 'America/New_York',
taskTimeout: 60,
healthcheckServer: {
enabled: false,
host: 'localhost',
port: 8081,
},
});
const runner = newTestRunner();
const taskId = 'test-task';
runner.runningTasks.set(taskId, {

View file

@ -174,9 +174,11 @@ export abstract class TaskRunner extends EventEmitter {
sendOffers() {
this.deleteStaleOffers();
const offersToSend =
this.maxConcurrency -
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
if (!this.canSendOffers) {
return;
}
const offersToSend = this.maxConcurrency - (this.openOffers.size + this.runningTasks.size);
for (let i = 0; i < offersToSend; i++) {
// Add a bit of randomness so that not all offers expire at the same time
@ -255,11 +257,12 @@ export abstract class TaskRunner extends EventEmitter {
}
hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency;
return this.runningTasks.size < this.maxConcurrency;
}
offerAccepted(offerId: string, taskId: string) {
if (!this.hasOpenTasks()) {
this.openOffers.delete(offerId);
this.send({
type: 'runner:taskrejected',
taskId,
@ -267,6 +270,7 @@ export abstract class TaskRunner extends EventEmitter {
});
return;
}
const offer = this.openOffers.get(offerId);
if (!offer) {
this.send({