From 797342343f5ef560e8333e2ad67b4395bc0aad0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 12 Jun 2024 15:05:43 +0200 Subject: [PATCH] perf(core): Introduce concurrency control for main mode (#9453) --- packages/cli/src/ActiveExecutions.ts | 16 +- packages/cli/src/InternalHooks.ts | 4 + .../handlers/executions/executions.handler.ts | 14 + packages/cli/src/commands/start.ts | 42 ++ packages/cli/src/commands/worker.ts | 6 +- .../concurrency-control.service.test.ts | 369 ++++++++++++++++++ .../__tests__/concurrency-queue.test.ts | 61 +++ .../concurrency-control.service.ts | 160 ++++++++ .../cli/src/concurrency/concurrency-queue.ts | 59 +++ packages/cli/src/config/schema.ts | 9 + .../repositories/execution.repository.ts | 14 + .../errors/invalid-concurrency-limit.error.ts | 7 + .../errors/queued-execution-retry.error.ts | 9 + .../errors/unknown-execution-mode.error.ts | 7 + .../MessageEventBus/MessageEventBus.ts | 2 +- .../cli/src/executions/execution.service.ts | 71 ++-- .../cli/src/executions/execution.types.ts | 3 +- .../src/executions/executions.controller.ts | 2 +- .../integration/commands/ldap/reset.test.ts | 3 + .../environments/SourceControl.test.ts | 3 + .../execution.service.integration.test.ts | 50 ++- .../integration/executions.controller.test.ts | 7 + .../integration/publicApi/executions.test.ts | 4 + .../integration/publicApi/workflows.test.ts | 3 + .../cli/test/integration/users.api.test.ts | 2 + .../cli/test/integration/webhooks.api.test.ts | 3 + ...er-with-active-workflow-manager.ee.test.ts | 4 + .../cli/test/unit/ActiveExecutions.test.ts | 6 +- packages/cli/test/unit/WorkflowRunner.test.ts | 4 + .../controllers/executions.controller.test.ts | 12 +- packages/workflow/src/ExecutionStatus.ts | 20 +- 31 files changed, 919 insertions(+), 57 deletions(-) create mode 100644 packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts create mode 100644 packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts create mode 100644 packages/cli/src/concurrency/concurrency-control.service.ts create mode 100644 packages/cli/src/concurrency/concurrency-queue.ts create mode 100644 packages/cli/src/errors/invalid-concurrency-limit.error.ts create mode 100644 packages/cli/src/errors/queued-execution-retry.error.ts create mode 100644 packages/cli/src/errors/unknown-execution-mode.error.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index e799dba13e..df270fcad7 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -18,6 +18,8 @@ import type { import { isWorkflowIdValid } from '@/utils'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; +import { ConcurrencyControlService } from './concurrency/concurrency-control.service'; +import config from './config'; @Service() export class ActiveExecutions { @@ -31,6 +33,7 @@ export class ActiveExecutions { constructor( private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, + private readonly concurrencyControl: ConcurrencyControlService, ) {} /** @@ -38,12 +41,13 @@ export class ActiveExecutions { */ async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; + const mode = executionData.executionMode; if (executionId === undefined) { // Is a new execution so save in DB const fullExecutionData: ExecutionPayload = { data: executionData.executionData!, - mode: executionData.executionMode, + mode, finished: false, startedAt: new Date(), workflowData: executionData.workflowData, @@ -64,10 +68,14 @@ export class ActiveExecutions { if (executionId === undefined) { throw new ApplicationError('There was an issue assigning an execution id to the execution'); } + + await this.concurrencyControl.throttle({ mode, executionId }); executionStatus = 'running'; } else { // Is an existing execution we want to finish so update in DB + await this.concurrencyControl.throttle({ mode, executionId }); + const execution: Pick = { id: executionId, data: executionData.executionData!, @@ -128,6 +136,8 @@ export class ActiveExecutions { // Remove from the list of active executions delete this.activeExecutions[executionId]; + + this.concurrencyControl.release({ mode: execution.executionData.executionMode }); } /** @@ -191,6 +201,10 @@ export class ActiveExecutions { let executionIds = Object.keys(this.activeExecutions); if (cancelAll) { + if (config.getEnv('executions.mode') === 'regular') { + await this.concurrencyControl.removeAll(this.activeExecutions); + } + const stopPromises = executionIds.map( async (executionId) => await this.stopExecution(executionId), ); diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index 61a119b9fe..f0c063c2a6 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -1261,4 +1261,8 @@ export class InternalHooks { }) { return await this.telemetry.track('Project settings updated', data); } + + async onConcurrencyLimitHit({ threshold }: { threshold: number }) { + await this.telemetry.track('User hit concurrency limit', { threshold }); + } } diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts index d7b9e1cb2f..cd936d1d1c 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts @@ -9,6 +9,7 @@ import { getSharedWorkflowIds } from '../workflows/workflows.service'; import { encodeNextCursor } from '../../shared/services/pagination.service'; import { InternalHooks } from '@/InternalHooks'; import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; export = { deleteExecution: [ @@ -32,6 +33,19 @@ export = { return res.status(404).json({ message: 'Not Found' }); } + if (execution.status === 'running') { + return res.status(400).json({ + message: 'Cannot delete a running execution', + }); + } + + if (execution.status === 'new') { + Container.get(ConcurrencyControlService).remove({ + executionId: execution.id, + mode: execution.mode, + }); + } + await Container.get(ExecutionRepository).hardDelete({ workflowId: execution.workflowId, executionId: execution.id, diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index fdcab49131..a78ea57cf9 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -27,6 +27,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; import { WaitTracker } from '@/WaitTracker'; import { BaseCommand } from './BaseCommand'; +import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; +import { ExecutionService } from '@/executions/execution.service'; +import { OwnershipService } from '@/services/ownership.service'; +import { WorkflowRunner } from '@/WorkflowRunner'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -288,6 +292,10 @@ export class Start extends BaseCommand { await this.initPruning(); + if (config.getEnv('executions.mode') === 'regular') { + await this.runEnqueuedExecutions(); + } + // Start to get active workflows and run their triggers await this.activeWorkflowManager.init(); @@ -347,4 +355,38 @@ export class Start extends BaseCommand { if (error.stack) this.logger.error(error.stack); await this.exitWithCrash('Exiting due to an error.', error); } + + /** + * During startup, we may find executions that had been enqueued at the time of shutdown. + * + * If so, start running any such executions concurrently up to the concurrency limit, and + * enqueue any remaining ones until we have spare concurrency capacity again. + */ + private async runEnqueuedExecutions() { + const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions(); + + if (executions.length === 0) return; + + this.logger.debug( + '[Startup] Found enqueued executions to run', + executions.map((e) => e.id), + ); + + const ownershipService = Container.get(OwnershipService); + const workflowRunner = Container.get(WorkflowRunner); + + for (const execution of executions) { + const project = await ownershipService.getWorkflowProjectCached(execution.workflowId); + + const data: IWorkflowExecutionDataProcess = { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + }; + + // do not block - each execution either runs concurrently or is queued + void workflowRunner.run(data, undefined, false, execution.id); + } + } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 46d7d32315..ad7987471b 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -317,8 +317,12 @@ export class Worker extends BaseCommand { Worker.jobQueue = Container.get(Queue); await Worker.jobQueue.init(); this.logger.debug('Queue singleton ready'); + + const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); + const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; + void Worker.jobQueue.process( - flags.concurrency, + concurrency, async (job) => await this.runJob(job, this.nodeTypes), ); diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts new file mode 100644 index 0000000000..1c80d8de60 --- /dev/null +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -0,0 +1,369 @@ +import { mock } from 'jest-mock-extended'; +import config from '@/config'; +import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import type { Logger } from '@/Logger'; +import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error'; +import { ConcurrencyQueue } from '../concurrency-queue'; +import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; +import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { IExecutingWorkflowData } from '@/Interfaces'; +import type { Telemetry } from '@/telemetry'; + +describe('ConcurrencyControlService', () => { + const logger = mock(); + const executionRepository = mock(); + const telemetry = mock(); + + afterEach(() => { + config.set('executions.concurrency.productionLimit', -1); + config.set('executions.mode', 'integrated'); + + jest.clearAllMocks(); + }); + + describe('constructor', () => { + it('should be enabled if production cap is positive', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + /** + * Act + */ + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Assert + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(true); + // @ts-expect-error Private property + expect(service.productionQueue).toBeDefined(); + }); + + it('should throw if production cap is 0', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 0); + + try { + /** + * Act + */ + new ConcurrencyControlService(logger, executionRepository, telemetry); + } catch (error) { + /** + * Assert + */ + expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); + } + }); + + it('should be disabled if production cap is -1', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', -1); + + /** + * Act + */ + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Assert + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }); + + it('should be disabled if production cap is lower than -1', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', -2); + + /** + * Act + */ + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Act + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }); + + it('should be disabled on queue mode', () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + config.set('executions.concurrency.productionLimit', 2); + + /** + * Act + */ + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + /** + * Assert + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }); + }); + + // ---------------------------------- + // enabled + // ---------------------------------- + + describe('if enabled', () => { + describe('throttle', () => { + it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])( + 'should do nothing on %s mode', + async (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode, executionId: '1' }); + + /** + * Assert + */ + expect(enqueueSpy).not.toHaveBeenCalled(); + }, + ); + + it.each(['webhook', 'trigger'])('should enqueue on %s mode', async (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode, executionId: '1' }); + + /** + * Assert + */ + expect(enqueueSpy).toHaveBeenCalled(); + }); + }); + + describe('release', () => { + it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])( + 'should do nothing on %s mode', + async (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + await service.throttle({ mode, executionId: '1' }); + + /** + * Assert + */ + expect(dequeueSpy).not.toHaveBeenCalled(); + }, + ); + + it.each(['webhook', 'trigger'])('should dequeue on %s mode', (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode }); + + /** + * Assert + */ + expect(dequeueSpy).toHaveBeenCalled(); + }); + }); + + describe('remove', () => { + it.each(['cli', 'error', 'integrated', 'internal', 'manual', 'retry'])( + 'should do nothing on %s mode', + async (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + await service.throttle({ mode, executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).not.toHaveBeenCalled(); + }, + ); + + it.each(['webhook', 'trigger'])( + 'should remove an execution on %s mode', + (mode: ExecutionMode) => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode, executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenCalled(); + }, + ); + }); + + describe('removeAll', () => { + it('should remove all executions from the production queue', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 2); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + + jest + .spyOn(ConcurrencyQueue.prototype, 'getAll') + .mockReturnValueOnce(new Set(['1', '2', '3'])); + + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + await service.removeAll({ + '1': mock(), + '2': mock(), + '3': mock(), + }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); + expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); + expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + }); + }); + }); + + // ---------------------------------- + // disabled + // ---------------------------------- + + describe('if disabled', () => { + describe('throttle', () => { + it('should do nothing', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', -1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'trigger', executionId: '1' }); + await service.throttle({ mode: 'webhook', executionId: '2' }); + + /** + * Assert + */ + expect(enqueueSpy).not.toHaveBeenCalled(); + }); + }); + + describe('release', () => { + it('should do nothing', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', -1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'webhook' }); + + /** + * Assert + */ + expect(dequeueSpy).not.toHaveBeenCalled(); + }); + }); + + describe('remove', () => { + it('should do nothing', () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', -1); + + const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'webhook', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).not.toHaveBeenCalled(); + }); + }); + }); +}); diff --git a/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts new file mode 100644 index 0000000000..3488e0b492 --- /dev/null +++ b/packages/cli/src/concurrency/__tests__/concurrency-queue.test.ts @@ -0,0 +1,61 @@ +import { ConcurrencyQueue } from '../concurrency-queue'; + +describe('ConcurrencyQueue', () => { + beforeAll(() => { + jest.useFakeTimers(); + }); + + it('should limit concurrency', async () => { + const queue = new ConcurrencyQueue(1); + const state: Record = {}; + + // eslint-disable-next-line @typescript-eslint/promise-function-async + const sleep = jest.fn(() => new Promise((resolve) => setTimeout(resolve, 500))); + + const testFn = async (item: { executionId: string }) => { + await queue.enqueue(item.executionId); + state[item.executionId] = 'started'; + await sleep(); + queue.dequeue(); + state[item.executionId] = 'finished'; + }; + + void Promise.all([ + testFn({ executionId: '1' }), + testFn({ executionId: '2' }), + testFn({ executionId: '3' }), + testFn({ executionId: '4' }), + testFn({ executionId: '5' }), + ]); + + // At T+0 seconds this method hasn't yielded to the event-loop, so no `testFn` calls are made + expect(sleep).toHaveBeenCalledTimes(0); + expect(state).toEqual({}); + + // At T+0.4 seconds the first `testFn` has been called, but hasn't resolved + await jest.advanceTimersByTimeAsync(400); + expect(sleep).toHaveBeenCalledTimes(1); + expect(state).toEqual({ 1: 'started' }); + + // At T+0.5 seconds the first promise has resolved, and the second one has stared + await jest.advanceTimersByTimeAsync(100); + expect(sleep).toHaveBeenCalledTimes(2); + expect(state).toEqual({ 1: 'finished', 2: 'started' }); + + // At T+1 seconds the first two promises have resolved, and the third one has stared + await jest.advanceTimersByTimeAsync(500); + expect(sleep).toHaveBeenCalledTimes(3); + expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started' }); + + // If the fourth promise is removed, the fifth one is started in the next tick + queue.remove('4'); + await jest.advanceTimersByTimeAsync(1); + expect(sleep).toHaveBeenCalledTimes(4); + expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started', 5: 'started' }); + + // at T+5 seconds, all but the fourth promise should be resolved + await jest.advanceTimersByTimeAsync(4000); + expect(sleep).toHaveBeenCalledTimes(4); + expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'finished', 5: 'finished' }); + }); +}); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts new file mode 100644 index 0000000000..21429c00ed --- /dev/null +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -0,0 +1,160 @@ +import { Logger } from '@/Logger'; +import config from '@/config'; +import { Service } from 'typedi'; +import { ConcurrencyQueue } from './concurrency-queue'; +import { UnknownExecutionModeError } from '@/errors/unknown-execution-mode.error'; +import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; +import type { IExecutingWorkflowData } from '@/Interfaces'; +import { Telemetry } from '@/telemetry'; + +@Service() +export class ConcurrencyControlService { + private readonly isEnabled: boolean; + + private readonly productionLimit: number; + + private readonly productionQueue: ConcurrencyQueue; + + private readonly limitsToReport = [5, 10, 20, 50, 100, 200]; + + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly telemetry: Telemetry, + ) { + this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); + + if (this.productionLimit === 0) { + throw new InvalidConcurrencyLimitError(this.productionLimit); + } + + if (this.productionLimit < -1) { + this.productionLimit = -1; + } + + if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { + this.isEnabled = false; + this.log('Service disabled'); + return; + } + + this.productionQueue = new ConcurrencyQueue(this.productionLimit); + + this.logInit(); + + this.isEnabled = true; + + this.productionQueue.on( + 'execution-throttled', + async ({ executionId, capacity }: { executionId: string; capacity: number }) => { + this.log('Execution throttled', { executionId }); + + /** + * Temporary until base data for cloud plans is collected. + */ + if (this.shouldReport(capacity)) { + await this.telemetry.track('User hit concurrency limit', { threshold: capacity }); + } + }, + ); + + this.productionQueue.on('execution-released', async (executionId: string) => { + this.log('Execution released', { executionId }); + await this.executionRepository.resetStartedAt(executionId); + }); + } + + /** + * Block or let through an execution based on concurrency capacity. + */ + async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { + if (!this.isEnabled || this.isUnlimited(mode)) return; + + await this.productionQueue.enqueue(executionId); + } + + /** + * Release capacity back so the next execution in the production queue can proceed. + */ + release({ mode }: { mode: ExecutionMode }) { + if (!this.isEnabled || this.isUnlimited(mode)) return; + + this.productionQueue.dequeue(); + } + + /** + * Remove an execution from the production queue, releasing capacity back. + */ + remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { + if (!this.isEnabled || this.isUnlimited(mode)) return; + + this.productionQueue.remove(executionId); + } + + /** + * Empty the production queue, releasing all capacity back. Also cancel any + * enqueued executions that have response promises, as these cannot + * be re-run via `Start.runEnqueuedExecutions` during startup. + */ + async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { + if (!this.isEnabled) return; + + const enqueuedProductionIds = this.productionQueue.getAll(); + + for (const id of enqueuedProductionIds) { + this.productionQueue.remove(id); + } + + const executionIds = Object.entries(activeExecutions) + .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) + .map(([executionId, _]) => executionId); + + if (executionIds.length === 0) return; + + await this.executionRepository.cancelMany(executionIds); + + this.logger.info('Canceled enqueued executions with response promises', { executionIds }); + } + + // ---------------------------------- + // private + // ---------------------------------- + + private logInit() { + this.log('Enabled'); + + this.log( + [ + 'Production execution concurrency is', + this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), + ].join(' '), + ); + } + + private isUnlimited(mode: ExecutionMode) { + if ( + mode === 'error' || + mode === 'integrated' || + mode === 'cli' || + mode === 'internal' || + mode === 'manual' || + mode === 'retry' + ) { + return true; + } + + if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + + throw new UnknownExecutionModeError(mode); + } + + private log(message: string, meta?: object) { + this.logger.debug(['[Concurrency Control]', message].join(' '), meta); + } + + private shouldReport(capacity: number) { + return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); + } +} diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts new file mode 100644 index 0000000000..220d2a2363 --- /dev/null +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -0,0 +1,59 @@ +import { Service } from 'typedi'; +import { EventEmitter } from 'node:events'; + +@Service() +export class ConcurrencyQueue extends EventEmitter { + private readonly queue: Array<{ + executionId: string; + resolve: () => void; + }> = []; + + constructor(private capacity: number) { + super(); + } + + async enqueue(executionId: string) { + this.capacity--; + + if (this.capacity < 0) { + this.emit('execution-throttled', { executionId, capacity: this.capacity }); + + // eslint-disable-next-line @typescript-eslint/return-await + return new Promise((resolve) => this.queue.push({ executionId, resolve })); + } + } + + dequeue() { + this.capacity++; + + this.resolveNext(); + } + + remove(executionId: string) { + const index = this.queue.findIndex((item) => item.executionId === executionId); + + if (index > -1) { + this.queue.splice(index, 1); + + this.capacity++; + + this.resolveNext(); + } + } + + getAll() { + return new Set(...this.queue.map((item) => item.executionId)); + } + + private resolveNext() { + const item = this.queue.shift(); + + if (!item) return; + + const { resolve, executionId } = item; + + this.emit('execution-released', executionId); + + resolve(); + } +} diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 1ea7c57f00..69b29372da 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -254,6 +254,15 @@ export const schema = { env: 'EXECUTIONS_MODE', }, + concurrency: { + productionLimit: { + doc: "Max production executions allowed to run concurrently, in main process for regular mode and in worker for queue mode. Default for main mode is `-1` (disabled). Default for queue mode is taken from the worker's `--concurrency` flag.", + format: Number, + default: -1, + env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', + }, + }, + // A Workflow times out and gets canceled after this time (seconds). // If the workflow is executed in the main process a soft timeout // is executed (takes effect after the current node finishes). diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 33ec7ed7ec..6609173fc6 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -285,6 +285,10 @@ export class ExecutionRepository extends Repository { await this.update({ id: executionId }, { status }); } + async resetStartedAt(executionId: string) { + await this.update({ id: executionId }, { startedAt: new Date() }); + } + async updateExistingExecution(executionId: string, execution: Partial) { // Se isolate startedAt because it must be set when the execution starts and should never change. // So we prevent updating it, if it's sent (it usually is and causes problems to executions that @@ -597,6 +601,14 @@ export class ExecutionRepository extends Repository { }); } + async cancel(executionId: string) { + await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() }); + } + + async cancelMany(executionIds: string[]) { + await this.update({ id: In(executionIds) }, { status: 'canceled', stoppedAt: new Date() }); + } + // ---------------------------------- // new API // ---------------------------------- @@ -717,6 +729,8 @@ export class ExecutionRepository extends Repository { if (query.order?.stoppedAt === 'DESC') { qb.orderBy({ 'execution.stoppedAt': 'DESC' }); + } else if (query.order?.top) { + qb.orderBy(`(CASE WHEN execution.status = '${query.order.top}' THEN 0 ELSE 1 END)`); } else { qb.orderBy({ 'execution.id': 'DESC' }); } diff --git a/packages/cli/src/errors/invalid-concurrency-limit.error.ts b/packages/cli/src/errors/invalid-concurrency-limit.error.ts new file mode 100644 index 0000000000..3f59373525 --- /dev/null +++ b/packages/cli/src/errors/invalid-concurrency-limit.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class InvalidConcurrencyLimitError extends ApplicationError { + constructor(value: number) { + super('Concurrency limit set to invalid value', { level: 'warning', extra: { value } }); + } +} diff --git a/packages/cli/src/errors/queued-execution-retry.error.ts b/packages/cli/src/errors/queued-execution-retry.error.ts new file mode 100644 index 0000000000..a3c0bcb588 --- /dev/null +++ b/packages/cli/src/errors/queued-execution-retry.error.ts @@ -0,0 +1,9 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class QueuedExecutionRetryError extends ApplicationError { + constructor() { + super('Execution is queued to run (not yet started) so it cannot be retried', { + level: 'warning', + }); + } +} diff --git a/packages/cli/src/errors/unknown-execution-mode.error.ts b/packages/cli/src/errors/unknown-execution-mode.error.ts new file mode 100644 index 0000000000..8350bfa4af --- /dev/null +++ b/packages/cli/src/errors/unknown-execution-mode.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class UnknownExecutionModeError extends ApplicationError { + constructor(mode: string) { + super('Unknown execution mode', { extra: { mode } }); + } +} diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index d8f60697a3..6b44fe9ed1 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -140,7 +140,7 @@ export class MessageEventBus extends EventEmitter { const dbUnfinishedExecutionIds = ( await this.executionRepository.find({ where: { - status: In(['running', 'new', 'unknown']), + status: In(['running', 'unknown']), }, select: ['id'], }) diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 81735c5f83..917adec7f4 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -37,6 +37,8 @@ import { NotFoundError } from '@/errors/response-errors/not-found.error'; import config from '@/config'; import { WaitTracker } from '@/WaitTracker'; import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; +import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error'; +import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; export const schemaGetExecutionsQueryFilter = { @@ -87,6 +89,7 @@ export class ExecutionService { private readonly nodeTypes: NodeTypes, private readonly waitTracker: WaitTracker, private readonly workflowRunner: WorkflowRunner, + private readonly concurrencyControl: ConcurrencyControlService, ) {} async findOne( @@ -133,6 +136,8 @@ export class ExecutionService { throw new NotFoundError(`The execution with the ID "${executionId}" does not exist.`); } + if (execution.status === 'new') throw new QueuedExecutionRetryError(); + if (!execution.data.executionData) throw new AbortedExecutionRetryError(); if (execution.finished) { @@ -244,14 +249,10 @@ export class ExecutionService { } } - return await this.executionRepository.deleteExecutionsByFilter( - requestFilters, - sharedWorkflowIds, - { - deleteBefore, - ids, - }, - ); + await this.executionRepository.deleteExecutionsByFilter(requestFilters, sharedWorkflowIds, { + deleteBefore, + ids, + }); } async createErrorExecution( @@ -359,31 +360,37 @@ export class ExecutionService { } /** - * Find summaries of active and finished executions that satisfy a query. + * Return: * - * Return also the total count of all finished executions that satisfy the query, - * and whether the total is an estimate or not. Active executions are excluded - * from the total and count for pagination purposes. + * - the latest summaries of current and completed executions that satisfy a query, + * - the total count of all completed executions that satisfy the query, and + * - whether the total of completed executions is an estimate. + * + * By default, "current" means executions starting and running. With concurrency + * control, "current" means executions enqueued to start and running. */ - async findAllRunningAndLatest(query: ExecutionSummaries.RangeQuery) { - const currentlyRunningStatuses: ExecutionStatus[] = ['new', 'running']; - const allStatuses = new Set(ExecutionStatusList); - currentlyRunningStatuses.forEach((status) => allStatuses.delete(status)); - const notRunningStatuses: ExecutionStatus[] = Array.from(allStatuses); + async findLatestCurrentAndCompleted(query: ExecutionSummaries.RangeQuery) { + const currentStatuses: ExecutionStatus[] = ['new', 'running']; - const [activeResult, finishedResult] = await Promise.all([ - this.findRangeWithCount({ ...query, status: currentlyRunningStatuses }), + const completedStatuses = ExecutionStatusList.filter((s) => !currentStatuses.includes(s)); + + const [current, completed] = await Promise.all([ this.findRangeWithCount({ ...query, - status: notRunningStatuses, + status: currentStatuses, + order: { top: 'running' }, // ensure limit cannot exclude running + }), + this.findRangeWithCount({ + ...query, + status: completedStatuses, order: { stoppedAt: 'DESC' }, }), ]); return { - results: activeResult.results.concat(finishedResult.results), - count: finishedResult.count, - estimated: finishedResult.estimated, + results: current.results.concat(completed.results), + count: completed.count, // exclude current from count for pagination + estimated: completed.estimated, }; } @@ -395,6 +402,13 @@ export class ExecutionService { if (!execution) throw new NotFoundError('Execution not found'); + if (execution.status === 'new') { + this.concurrencyControl.remove({ mode: execution.mode, executionId }); + await this.executionRepository.cancel(executionId); + + return; + } + const stopResult = await this.activeExecutions.stopExecution(execution.id); if (stopResult) return this.toExecutionStopResult(execution); @@ -432,4 +446,15 @@ export class ExecutionService { status: execution.status, }; } + + async findAllEnqueuedExecutions() { + return await this.executionRepository.findMultipleExecutions( + { + select: ['id', 'mode'], + where: { status: 'new' }, + order: { id: 'ASC' }, + }, + { includeData: true, unflattenData: true }, + ); + } } diff --git a/packages/cli/src/executions/execution.types.ts b/packages/cli/src/executions/execution.types.ts index 91df96b858..9a1bdc88c8 100644 --- a/packages/cli/src/executions/execution.types.ts +++ b/packages/cli/src/executions/execution.types.ts @@ -79,7 +79,8 @@ export namespace ExecutionSummaries { type OrderFields = { order?: { - stoppedAt: 'DESC'; + top?: ExecutionStatus; + stoppedAt?: 'DESC'; }; }; } diff --git a/packages/cli/src/executions/executions.controller.ts b/packages/cli/src/executions/executions.controller.ts index 1085555604..64b6a54274 100644 --- a/packages/cli/src/executions/executions.controller.ts +++ b/packages/cli/src/executions/executions.controller.ts @@ -53,7 +53,7 @@ export class ExecutionsController { const noRange = !query.range.lastId || !query.range.firstId; if (noStatus && noRange) { - return await this.executionService.findAllRunningAndLatest(query); + return await this.executionService.findLatestCurrentAndCompleted(query); } return await this.executionService.findRangeWithCount(query); diff --git a/packages/cli/test/integration/commands/ldap/reset.test.ts b/packages/cli/test/integration/commands/ldap/reset.test.ts index 9a0ebecd51..24efed477c 100644 --- a/packages/cli/test/integration/commands/ldap/reset.test.ts +++ b/packages/cli/test/integration/commands/ldap/reset.test.ts @@ -21,6 +21,9 @@ import { getLdapSynchronizations, saveLdapSynchronization } from '@/Ldap/helpers import { createLdapConfig } from '../../shared/ldap'; import { LdapService } from '@/Ldap/ldap.service'; import { v4 as uuid } from 'uuid'; +import { Telemetry } from '@/telemetry'; + +mockInstance(Telemetry); const oclifConfig = new Config({ root: __dirname }); diff --git a/packages/cli/test/integration/environments/SourceControl.test.ts b/packages/cli/test/integration/environments/SourceControl.test.ts index c64981c096..cac369422b 100644 --- a/packages/cli/test/integration/environments/SourceControl.test.ts +++ b/packages/cli/test/integration/environments/SourceControl.test.ts @@ -9,9 +9,12 @@ import type { SourceControlledFile } from '@/environments/sourceControl/types/so import * as utils from '../shared/utils/'; import { createUser } from '../shared/db/users'; import type { SuperAgentTest } from '../shared/types'; +import { mockInstance } from '@test/mocking'; +import { Telemetry } from '@/telemetry'; let authOwnerAgent: SuperAgentTest; let owner: User; +mockInstance(Telemetry); const testServer = utils.setupTestServer({ endpointGroups: ['sourceControl', 'license', 'auth'], diff --git a/packages/cli/test/integration/execution.service.integration.test.ts b/packages/cli/test/integration/execution.service.integration.test.ts index ba8cc89d36..348c2a9aa6 100644 --- a/packages/cli/test/integration/execution.service.integration.test.ts +++ b/packages/cli/test/integration/execution.service.integration.test.ts @@ -27,6 +27,7 @@ describe('ExecutionService', () => { mock(), mock(), mock(), + mock(), ); }); @@ -344,17 +345,17 @@ describe('ExecutionService', () => { }); }); - describe('findAllActiveAndLatestFinished', () => { - test('should return all active and latest 20 finished executions', async () => { + describe('findLatestCurrentAndCompleted', () => { + test('should return latest current and completed executions', async () => { const workflow = await createWorkflow(); - const totalFinished = 21; + const totalCompleted = 21; await Promise.all([ createExecution({ status: 'running' }, workflow), createExecution({ status: 'running' }, workflow), createExecution({ status: 'running' }, workflow), - ...new Array(totalFinished) + ...new Array(totalCompleted) .fill(null) .map(async () => await createExecution({ status: 'success' }, workflow)), ]); @@ -365,14 +366,14 @@ describe('ExecutionService', () => { accessibleWorkflowIds: [workflow.id], }; - const output = await executionService.findAllRunningAndLatest(query); + const output = await executionService.findLatestCurrentAndCompleted(query); - expect(output.results).toHaveLength(23); // 3 active + 20 finished (excludes 21st) - expect(output.count).toBe(totalFinished); // 21 finished, excludes active + expect(output.results).toHaveLength(23); // 3 current + 20 completed (excludes 21st) + expect(output.count).toBe(totalCompleted); // 21 finished, excludes current expect(output.estimated).toBe(false); }); - test('should handle zero active executions', async () => { + test('should handle zero current executions', async () => { const workflow = await createWorkflow(); const totalFinished = 5; @@ -389,14 +390,14 @@ describe('ExecutionService', () => { accessibleWorkflowIds: [workflow.id], }; - const output = await executionService.findAllRunningAndLatest(query); + const output = await executionService.findLatestCurrentAndCompleted(query); expect(output.results).toHaveLength(totalFinished); // 5 finished expect(output.count).toBe(totalFinished); // 5 finished, excludes active expect(output.estimated).toBe(false); }); - test('should handle zero finished executions', async () => { + test('should handle zero completed executions', async () => { const workflow = await createWorkflow(); await Promise.all([ @@ -411,7 +412,7 @@ describe('ExecutionService', () => { accessibleWorkflowIds: [workflow.id], }; - const output = await executionService.findAllRunningAndLatest(query); + const output = await executionService.findLatestCurrentAndCompleted(query); expect(output.results).toHaveLength(3); // 3 finished expect(output.count).toBe(0); // 0 finished, excludes active @@ -427,11 +428,36 @@ describe('ExecutionService', () => { accessibleWorkflowIds: [workflow.id], }; - const output = await executionService.findAllRunningAndLatest(query); + const output = await executionService.findLatestCurrentAndCompleted(query); expect(output.results).toHaveLength(0); expect(output.count).toBe(0); expect(output.estimated).toBe(false); }); + + test('should prioritize `running` over `new` executions', async () => { + const workflow = await createWorkflow(); + + await Promise.all([ + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'running' }, workflow), + createExecution({ status: 'running' }, workflow), + createExecution({ status: 'new' }, workflow), + createExecution({ status: 'new' }, workflow), + ]); + + const query: ExecutionSummaries.RangeQuery = { + kind: 'range', + range: { limit: 2 }, + accessibleWorkflowIds: [workflow.id], + }; + + const { results } = await executionService.findLatestCurrentAndCompleted(query); + + expect(results).toHaveLength(2); + expect(results[0].status).toBe('running'); + expect(results[1].status).toBe('running'); + }); }); }); diff --git a/packages/cli/test/integration/executions.controller.test.ts b/packages/cli/test/integration/executions.controller.test.ts index e73cb98b47..ef04dde99b 100644 --- a/packages/cli/test/integration/executions.controller.test.ts +++ b/packages/cli/test/integration/executions.controller.test.ts @@ -5,8 +5,15 @@ import { createMember, createOwner } from './shared/db/users'; import { createWorkflow, shareWorkflowWithUsers } from './shared/db/workflows'; import * as testDb from './shared/testDb'; import { setupTestServer } from './shared/utils'; +import { mockInstance } from '../shared/mocking'; + +import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import { WaitTracker } from '@/WaitTracker'; import { createTeamProject, linkUserToProject } from './shared/db/projects'; +mockInstance(WaitTracker); +mockInstance(ConcurrencyControlService, { isEnabled: false }); + const testServer = setupTestServer({ endpointGroups: ['executions'] }); let owner: User; diff --git a/packages/cli/test/integration/publicApi/executions.test.ts b/packages/cli/test/integration/publicApi/executions.test.ts index ce26a19f8f..d6db284c07 100644 --- a/packages/cli/test/integration/publicApi/executions.test.ts +++ b/packages/cli/test/integration/publicApi/executions.test.ts @@ -17,6 +17,8 @@ import { createWaitingExecution, } from '../shared/db/executions'; import type { SuperAgentTest } from '../shared/types'; +import { mockInstance } from '@test/mocking'; +import { Telemetry } from '@/telemetry'; let owner: User; let user1: User; @@ -26,6 +28,8 @@ let authUser1Agent: SuperAgentTest; let authUser2Agent: SuperAgentTest; let workflowRunner: ActiveWorkflowManager; +mockInstance(Telemetry); + const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] }); beforeAll(async () => { diff --git a/packages/cli/test/integration/publicApi/workflows.test.ts b/packages/cli/test/integration/publicApi/workflows.test.ts index eb4a7f461a..10737a30f0 100644 --- a/packages/cli/test/integration/publicApi/workflows.test.ts +++ b/packages/cli/test/integration/publicApi/workflows.test.ts @@ -20,6 +20,9 @@ import { createWorkflow, createWorkflowWithTrigger } from '../shared/db/workflow import { createTag } from '../shared/db/tags'; import { mockInstance } from '../../shared/mocking'; import type { SuperAgentTest } from '../shared/types'; +import { Telemetry } from '@/telemetry'; + +mockInstance(Telemetry); let owner: User; let ownerPersonalProject: Project; diff --git a/packages/cli/test/integration/users.api.test.ts b/packages/cli/test/integration/users.api.test.ts index 9e968caca4..26183cba39 100644 --- a/packages/cli/test/integration/users.api.test.ts +++ b/packages/cli/test/integration/users.api.test.ts @@ -27,7 +27,9 @@ import * as testDb from './shared/testDb'; import { mockInstance } from '../shared/mocking'; import type { SuperAgentTest } from './shared/types'; import { createTeamProject, getPersonalProject, linkUserToProject } from './shared/db/projects'; +import { Telemetry } from '@/telemetry'; +mockInstance(Telemetry); mockInstance(ExecutionService); const testServer = utils.setupTestServer({ diff --git a/packages/cli/test/integration/webhooks.api.test.ts b/packages/cli/test/integration/webhooks.api.test.ts index 4858d1f7b4..64cb760c8e 100644 --- a/packages/cli/test/integration/webhooks.api.test.ts +++ b/packages/cli/test/integration/webhooks.api.test.ts @@ -15,6 +15,9 @@ import * as testDb from './shared/testDb'; import { createUser } from './shared/db/users'; import { createWorkflow } from './shared/db/workflows'; import type { SuperAgentTest } from './shared/types'; +import { Telemetry } from '@/telemetry'; + +mockInstance(Telemetry); describe('Webhook API', () => { mockInstance(ExternalHooks); diff --git a/packages/cli/test/integration/workflows/workflows.controller-with-active-workflow-manager.ee.test.ts b/packages/cli/test/integration/workflows/workflows.controller-with-active-workflow-manager.ee.test.ts index 42f3a49e06..607639b091 100644 --- a/packages/cli/test/integration/workflows/workflows.controller-with-active-workflow-manager.ee.test.ts +++ b/packages/cli/test/integration/workflows/workflows.controller-with-active-workflow-manager.ee.test.ts @@ -5,6 +5,10 @@ import * as testDb from '../shared/testDb'; import { createUser } from '../shared/db/users'; import { createWorkflowWithTrigger } from '../shared/db/workflows'; import { createTeamProject } from '../shared/db/projects'; +import { mockInstance } from '@test/mocking'; +import { Telemetry } from '@/telemetry'; + +mockInstance(Telemetry); let member: User; let anotherMember: User; diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 892d976b4b..b2454de87c 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -6,6 +6,8 @@ import { createDeferredPromise } from 'n8n-workflow'; import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import type { ExecutionRepository } from '@db/repositories/execution.repository'; import { mock } from 'jest-mock-extended'; +import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; +import { mockInstance } from '@test/mocking'; const FAKE_EXECUTION_ID = '15'; const FAKE_SECOND_EXECUTION_ID = '20'; @@ -18,11 +20,13 @@ const executionRepository = mock({ createNewExecution, }); +const concurrencyControl = mockInstance(ConcurrencyControlService, { isEnabled: false }); + describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; beforeEach(() => { - activeExecutions = new ActiveExecutions(mock(), executionRepository); + activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl); }); afterEach(() => { diff --git a/packages/cli/test/unit/WorkflowRunner.test.ts b/packages/cli/test/unit/WorkflowRunner.test.ts index 72f3da9fd2..c972d6bb73 100644 --- a/packages/cli/test/unit/WorkflowRunner.test.ts +++ b/packages/cli/test/unit/WorkflowRunner.test.ts @@ -9,12 +9,16 @@ import { setupTestServer } from '../integration/shared/utils'; import { createUser } from '../integration/shared/db/users'; import { createWorkflow } from '../integration/shared/db/workflows'; import { createExecution } from '../integration/shared/db/executions'; +import { mockInstance } from '@test/mocking'; +import { Telemetry } from '@/telemetry'; let owner: User; let runner: WorkflowRunner; let hookFunctions: IWorkflowExecuteHooks; setupTestServer({ endpointGroups: [] }); +mockInstance(Telemetry); + class Watchers { workflowExecuteAfter = jest.fn(); } diff --git a/packages/cli/test/unit/controllers/executions.controller.test.ts b/packages/cli/test/unit/controllers/executions.controller.test.ts index 046a5a0231..2a4c733c5a 100644 --- a/packages/cli/test/unit/controllers/executions.controller.test.ts +++ b/packages/cli/test/unit/controllers/executions.controller.test.ts @@ -79,13 +79,13 @@ describe('ExecutionsController', () => { 'should fetch executions per query', async (rangeQuery) => { workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']); - executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS); + executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS); const req = mock({ rangeQuery }); await executionsController.getMany(req); - expect(executionService.findAllRunningAndLatest).not.toHaveBeenCalled(); + expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled(); expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery); }, ); @@ -96,13 +96,13 @@ describe('ExecutionsController', () => { 'should fetch executions per query', async (rangeQuery) => { workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']); - executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS); + executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS); const req = mock({ rangeQuery }); await executionsController.getMany(req); - expect(executionService.findAllRunningAndLatest).toHaveBeenCalled(); + expect(executionService.findLatestCurrentAndCompleted).toHaveBeenCalled(); expect(executionService.findRangeWithCount).not.toHaveBeenCalled(); }, ); @@ -111,7 +111,7 @@ describe('ExecutionsController', () => { describe('if both status and range provided', () => { it('should fetch executions per query', async () => { workflowSharingService.getSharedWorkflowIds.mockResolvedValue(['123']); - executionService.findAllRunningAndLatest.mockResolvedValue(NO_EXECUTIONS); + executionService.findLatestCurrentAndCompleted.mockResolvedValue(NO_EXECUTIONS); const rangeQuery: ExecutionSummaries.RangeQuery = { kind: 'range', @@ -124,7 +124,7 @@ describe('ExecutionsController', () => { await executionsController.getMany(req); - expect(executionService.findAllRunningAndLatest).not.toHaveBeenCalled(); + expect(executionService.findLatestCurrentAndCompleted).not.toHaveBeenCalled(); expect(executionService.findRangeWithCount).toHaveBeenCalledWith(rangeQuery); }); }); diff --git a/packages/workflow/src/ExecutionStatus.ts b/packages/workflow/src/ExecutionStatus.ts index 715abbaeab..24fbdbf1ed 100644 --- a/packages/workflow/src/ExecutionStatus.ts +++ b/packages/workflow/src/ExecutionStatus.ts @@ -1,13 +1,13 @@ export const ExecutionStatusList = [ - 'canceled' as const, - 'crashed' as const, - 'error' as const, - 'new' as const, - 'running' as const, - 'success' as const, - 'unknown' as const, - 'waiting' as const, - 'warning' as const, -]; + 'canceled', + 'crashed', + 'error', + 'new', + 'running', + 'success', + 'unknown', + 'waiting', + 'warning', +] as const; export type ExecutionStatus = (typeof ExecutionStatusList)[number];