diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 80eb337eaf..4bbf3dd026 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -4,11 +4,6 @@ import { WorkflowOperationError, } from 'n8n-workflow'; import { Container, Service } from 'typedi'; -import type { FindManyOptions, ObjectLiteral } from 'typeorm'; -import { Not, LessThanOrEqual } from 'typeorm'; -import { DateUtils } from 'typeorm/util/DateUtils'; - -import config from '@/config'; import * as ResponseHelper from '@/ResponseHelper'; import type { IExecutionResponse, @@ -18,7 +13,6 @@ import type { import { WorkflowRunner } from '@/WorkflowRunner'; import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents'; import { ExecutionRepository } from '@db/repositories/execution.repository'; -import type { ExecutionEntity } from '@db/entities/ExecutionEntity'; import { OwnershipService } from './services/ownership.service'; import { Logger } from '@/Logger'; @@ -48,28 +42,8 @@ export class WaitTracker { async getWaitingExecutions() { this.logger.debug('Wait tracker querying database for waiting executions'); - // Find all the executions which should be triggered in the next 70 seconds - const findQuery: FindManyOptions = { - select: ['id', 'waitTill'], - where: { - waitTill: LessThanOrEqual(new Date(Date.now() + 70000)), - status: Not('crashed'), - }, - order: { - waitTill: 'ASC', - }, - }; - const dbType = config.getEnv('database.type'); - if (dbType === 'sqlite') { - // This is needed because of issue in TypeORM <> SQLite: - // https://github.com/typeorm/typeorm/issues/2286 - (findQuery.where! as ObjectLiteral).waitTill = LessThanOrEqual( - DateUtils.mixedDateToUtcDatetimeString(new Date(Date.now() + 70000)), - ); - } - - const executions = await this.executionRepository.findMultipleExecutions(findQuery); + const executions = await this.executionRepository.getWaitingExecutions(); if (executions.length === 0) { return; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 54a9f2ece6..89b032ab06 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -516,4 +516,28 @@ export class ExecutionRepository extends Repository { async deleteByIds(executionIds: string[]) { return this.delete({ id: In(executionIds) }); } + + async getWaitingExecutions() { + // Find all the executions which should be triggered in the next 70 seconds + const waitTill = new Date(Date.now() + 70000); + const where: FindOptionsWhere = { + waitTill: LessThanOrEqual(waitTill), + status: Not('crashed'), + }; + + const dbType = config.getEnv('database.type'); + if (dbType === 'sqlite') { + // This is needed because of issue in TypeORM <> SQLite: + // https://github.com/typeorm/typeorm/issues/2286 + where.waitTill = LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(waitTill)); + } + + return this.findMultipleExecutions({ + select: ['id', 'waitTill'], + where, + order: { + waitTill: 'ASC', + }, + }); + } } diff --git a/packages/cli/test/unit/repositories/execution.repository.test.ts b/packages/cli/test/unit/repositories/execution.repository.test.ts new file mode 100644 index 0000000000..e101e77557 --- /dev/null +++ b/packages/cli/test/unit/repositories/execution.repository.test.ts @@ -0,0 +1,52 @@ +import { mock } from 'jest-mock-extended'; +import Container from 'typedi'; +import type { EntityMetadata } from 'typeorm'; +import { EntityManager, DataSource, Not, LessThanOrEqual } from 'typeorm'; + +import config from '@/config'; +import { ExecutionEntity } from '@db/entities/ExecutionEntity'; +import { ExecutionRepository } from '@db/repositories/execution.repository'; + +import { mockInstance } from '../../shared/mocking'; + +describe('ExecutionRepository', () => { + const entityManager = mockInstance(EntityManager); + const dataSource = mockInstance(DataSource, { manager: entityManager }); + dataSource.getMetadata.mockReturnValue(mock({ target: ExecutionEntity })); + Object.assign(entityManager, { connection: dataSource }); + + const executionRepository = Container.get(ExecutionRepository); + const mockDate = new Date('2023-12-28 12:34:56.789Z'); + + beforeAll(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(mockDate); + }); + + afterAll(() => jest.useRealTimers()); + + describe('getWaitingExecutions()', () => { + test.each(['sqlite', 'postgres'])( + 'on %s, should be called with expected args', + async (dbType) => { + jest.spyOn(config, 'getEnv').mockReturnValueOnce(dbType); + entityManager.find.mockResolvedValueOnce([]); + + await executionRepository.getWaitingExecutions(); + + expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, { + order: { waitTill: 'ASC' }, + select: ['id', 'waitTill'], + where: { + status: Not('crashed'), + waitTill: LessThanOrEqual( + dbType === 'sqlite' + ? '2023-12-28 12:36:06.789' + : new Date('2023-12-28T12:36:06.789Z'), + ), + }, + }); + }, + ); + }); +});