2023-04-12 01:59:14 -07:00
|
|
|
import { Service } from 'typedi';
|
2023-09-20 06:21:42 -07:00
|
|
|
import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
|
|
|
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
2023-06-20 10:13:18 -07:00
|
|
|
import type {
|
|
|
|
FindManyOptions,
|
|
|
|
FindOneOptions,
|
|
|
|
FindOptionsWhere,
|
|
|
|
SelectQueryBuilder,
|
|
|
|
} from 'typeorm';
|
|
|
|
import { parse, stringify } from 'flatted';
|
2023-09-20 06:21:42 -07:00
|
|
|
import { LoggerProxy as Logger } from 'n8n-workflow';
|
|
|
|
import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow';
|
|
|
|
import { BinaryDataManager } from 'n8n-core';
|
2023-06-20 10:13:18 -07:00
|
|
|
import type {
|
2023-09-20 06:21:42 -07:00
|
|
|
ExecutionPayload,
|
2023-06-20 10:13:18 -07:00
|
|
|
IExecutionBase,
|
|
|
|
IExecutionFlattedDb,
|
|
|
|
IExecutionResponse,
|
|
|
|
} from '@/Interfaces';
|
2023-09-20 06:21:42 -07:00
|
|
|
|
|
|
|
import config from '@/config';
|
2023-06-20 10:13:18 -07:00
|
|
|
import type { IGetExecutionsQueryFilter } from '@/executions/executions.service';
|
|
|
|
import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers';
|
2023-09-20 06:21:42 -07:00
|
|
|
import type { ExecutionData } from '../entities/ExecutionData';
|
|
|
|
import { ExecutionEntity } from '../entities/ExecutionEntity';
|
2023-06-20 10:13:18 -07:00
|
|
|
import { ExecutionMetadata } from '../entities/ExecutionMetadata';
|
2023-09-20 06:21:42 -07:00
|
|
|
import { ExecutionDataRepository } from './executionData.repository';
|
|
|
|
import { TIME } from '@/constants';
|
2023-06-20 10:13:18 -07:00
|
|
|
|
|
|
|
function parseFiltersToQueryBuilder(
|
|
|
|
qb: SelectQueryBuilder<ExecutionEntity>,
|
|
|
|
filters?: IGetExecutionsQueryFilter,
|
|
|
|
) {
|
|
|
|
if (filters?.status) {
|
|
|
|
qb.andWhere('execution.status IN (:...workflowStatus)', {
|
|
|
|
workflowStatus: filters.status,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
if (filters?.finished) {
|
|
|
|
qb.andWhere({ finished: filters.finished });
|
|
|
|
}
|
|
|
|
if (filters?.metadata && isAdvancedExecutionFiltersEnabled()) {
|
|
|
|
qb.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
|
|
|
|
for (const md of filters.metadata) {
|
|
|
|
qb.andWhere('md.key = :key AND md.value = :value', md);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (filters?.startedAfter) {
|
|
|
|
qb.andWhere({
|
|
|
|
startedAt: MoreThanOrEqual(
|
|
|
|
DateUtils.mixedDateToUtcDatetimeString(new Date(filters.startedAfter)),
|
|
|
|
),
|
|
|
|
});
|
|
|
|
}
|
|
|
|
if (filters?.startedBefore) {
|
|
|
|
qb.andWhere({
|
|
|
|
startedAt: LessThanOrEqual(
|
|
|
|
DateUtils.mixedDateToUtcDatetimeString(new Date(filters.startedBefore)),
|
|
|
|
),
|
|
|
|
});
|
|
|
|
}
|
|
|
|
if (filters?.workflowId) {
|
|
|
|
qb.andWhere({
|
|
|
|
workflowId: filters.workflowId,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
2023-04-12 01:59:14 -07:00
|
|
|
|
|
|
|
@Service()
|
|
|
|
export class ExecutionRepository extends Repository<ExecutionEntity> {
|
2023-09-20 06:21:42 -07:00
|
|
|
private logger = Logger;
|
|
|
|
|
|
|
|
deletionBatchSize = 100;
|
|
|
|
|
|
|
|
private intervals: Record<string, NodeJS.Timer | undefined> = {
|
|
|
|
softDeletion: undefined,
|
|
|
|
hardDeletion: undefined,
|
|
|
|
};
|
|
|
|
|
|
|
|
private rates: Record<string, number> = {
|
|
|
|
softDeletion: 1 * TIME.HOUR,
|
|
|
|
hardDeletion: 15 * TIME.MINUTE,
|
|
|
|
};
|
|
|
|
|
|
|
|
private isMainInstance = config.get('generic.instanceType') === 'main';
|
|
|
|
|
|
|
|
private isPruningEnabled = config.getEnv('executions.pruneData');
|
|
|
|
|
2023-07-18 01:51:22 -07:00
|
|
|
constructor(
|
|
|
|
dataSource: DataSource,
|
|
|
|
private readonly executionDataRepository: ExecutionDataRepository,
|
|
|
|
) {
|
2023-04-12 01:59:14 -07:00
|
|
|
super(ExecutionEntity, dataSource.manager);
|
2023-09-20 06:21:42 -07:00
|
|
|
|
|
|
|
if (!this.isMainInstance) return;
|
|
|
|
|
|
|
|
if (this.isPruningEnabled) this.setSoftDeletionInterval();
|
|
|
|
|
|
|
|
this.setHardDeletionInterval();
|
|
|
|
}
|
|
|
|
|
|
|
|
clearTimers() {
|
|
|
|
if (!this.isMainInstance) return;
|
|
|
|
|
|
|
|
this.logger.debug('Clearing soft-deletion and hard-deletion intervals for executions');
|
|
|
|
|
|
|
|
clearInterval(this.intervals.softDeletion);
|
|
|
|
clearInterval(this.intervals.hardDeletion);
|
|
|
|
}
|
|
|
|
|
|
|
|
setSoftDeletionInterval() {
|
|
|
|
this.logger.debug('Setting soft-deletion interval (pruning) for executions');
|
|
|
|
|
|
|
|
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion);
|
|
|
|
}
|
|
|
|
|
|
|
|
setHardDeletionInterval() {
|
|
|
|
this.logger.debug('Setting hard-deletion interval for executions');
|
|
|
|
|
|
|
|
this.intervals.hardDeletion = setInterval(
|
|
|
|
async () => this.hardDelete(),
|
|
|
|
this.rates.hardDeletion,
|
|
|
|
);
|
2023-06-20 10:13:18 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
async findMultipleExecutions(
|
|
|
|
queryParams: FindManyOptions<ExecutionEntity>,
|
|
|
|
options?: {
|
|
|
|
unflattenData: true;
|
|
|
|
includeData?: true;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionResponse[]>;
|
|
|
|
async findMultipleExecutions(
|
|
|
|
queryParams: FindManyOptions<ExecutionEntity>,
|
|
|
|
options?: {
|
|
|
|
unflattenData?: false | undefined;
|
|
|
|
includeData?: true;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionFlattedDb[]>;
|
|
|
|
async findMultipleExecutions(
|
|
|
|
queryParams: FindManyOptions<ExecutionEntity>,
|
|
|
|
options?: {
|
|
|
|
unflattenData?: boolean;
|
|
|
|
includeData?: boolean;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionBase[]>;
|
|
|
|
async findMultipleExecutions(
|
|
|
|
queryParams: FindManyOptions<ExecutionEntity>,
|
|
|
|
options?: {
|
|
|
|
unflattenData?: boolean;
|
|
|
|
includeData?: boolean;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionFlattedDb[] | IExecutionResponse[] | IExecutionBase[]> {
|
|
|
|
if (options?.includeData) {
|
|
|
|
if (!queryParams.relations) {
|
|
|
|
queryParams.relations = [];
|
|
|
|
}
|
|
|
|
(queryParams.relations as string[]).push('executionData');
|
|
|
|
}
|
|
|
|
|
|
|
|
const executions = await this.find(queryParams);
|
|
|
|
|
|
|
|
if (options?.includeData && options?.unflattenData) {
|
|
|
|
return executions.map((execution) => {
|
|
|
|
const { executionData, ...rest } = execution;
|
|
|
|
return {
|
|
|
|
...rest,
|
|
|
|
data: parse(executionData.data) as IRunExecutionData,
|
|
|
|
workflowData: executionData.workflowData,
|
|
|
|
} as IExecutionResponse;
|
|
|
|
});
|
|
|
|
} else if (options?.includeData) {
|
|
|
|
return executions.map((execution) => {
|
|
|
|
const { executionData, ...rest } = execution;
|
|
|
|
return {
|
|
|
|
...rest,
|
|
|
|
data: execution.executionData.data,
|
|
|
|
workflowData: execution.executionData.workflowData,
|
|
|
|
} as IExecutionFlattedDb;
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
return executions.map((execution) => {
|
|
|
|
const { executionData, ...rest } = execution;
|
|
|
|
return rest;
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
async findSingleExecution(
|
|
|
|
id: string,
|
|
|
|
options?: {
|
|
|
|
includeData: true;
|
|
|
|
unflattenData: true;
|
|
|
|
where?: FindOptionsWhere<ExecutionEntity>;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionResponse | undefined>;
|
|
|
|
async findSingleExecution(
|
|
|
|
id: string,
|
|
|
|
options?: {
|
|
|
|
includeData: true;
|
|
|
|
unflattenData?: false | undefined;
|
|
|
|
where?: FindOptionsWhere<ExecutionEntity>;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionFlattedDb | undefined>;
|
|
|
|
async findSingleExecution(
|
|
|
|
id: string,
|
|
|
|
options?: {
|
|
|
|
includeData?: boolean;
|
|
|
|
unflattenData?: boolean;
|
|
|
|
where?: FindOptionsWhere<ExecutionEntity>;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionBase | undefined>;
|
|
|
|
async findSingleExecution(
|
|
|
|
id: string,
|
|
|
|
options?: {
|
|
|
|
includeData?: boolean;
|
|
|
|
unflattenData?: boolean;
|
|
|
|
where?: FindOptionsWhere<ExecutionEntity>;
|
|
|
|
},
|
|
|
|
): Promise<IExecutionFlattedDb | IExecutionResponse | IExecutionBase | undefined> {
|
|
|
|
const whereClause: FindOneOptions<ExecutionEntity> = {
|
|
|
|
where: {
|
|
|
|
id,
|
|
|
|
...options?.where,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
if (options?.includeData) {
|
|
|
|
whereClause.relations = ['executionData'];
|
|
|
|
}
|
|
|
|
|
|
|
|
const execution = await this.findOne(whereClause);
|
|
|
|
|
|
|
|
if (!execution) {
|
|
|
|
return undefined;
|
|
|
|
}
|
|
|
|
|
|
|
|
const { executionData, ...rest } = execution;
|
|
|
|
|
|
|
|
if (options?.includeData && options?.unflattenData) {
|
|
|
|
return {
|
|
|
|
...rest,
|
|
|
|
data: parse(execution.executionData.data) as IRunExecutionData,
|
|
|
|
workflowData: execution.executionData.workflowData,
|
|
|
|
} as IExecutionResponse;
|
|
|
|
} else if (options?.includeData) {
|
|
|
|
return {
|
|
|
|
...rest,
|
|
|
|
data: execution.executionData.data,
|
|
|
|
workflowData: execution.executionData.workflowData,
|
|
|
|
} as IExecutionFlattedDb;
|
|
|
|
}
|
|
|
|
|
|
|
|
return rest;
|
|
|
|
}
|
|
|
|
|
2023-09-20 06:21:42 -07:00
|
|
|
async createNewExecution(execution: ExecutionPayload) {
|
2023-06-20 10:13:18 -07:00
|
|
|
const { data, workflowData, ...rest } = execution;
|
|
|
|
|
|
|
|
const newExecution = await this.save(rest);
|
|
|
|
await this.executionDataRepository.save({
|
|
|
|
execution: newExecution,
|
|
|
|
workflowData,
|
|
|
|
data: stringify(data),
|
|
|
|
});
|
|
|
|
|
|
|
|
return newExecution;
|
|
|
|
}
|
|
|
|
|
2023-09-04 10:58:36 -07:00
|
|
|
async markAsCrashed(executionIds: string[]) {
|
|
|
|
await this.update(
|
|
|
|
{ id: In(executionIds) },
|
|
|
|
{
|
|
|
|
status: 'crashed',
|
|
|
|
stoppedAt: new Date(),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
|
|
|
|
// Se isolate startedAt because it must be set when the execution starts and should never change.
|
|
|
|
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
|
|
|
|
// are resumed after waiting for some time, as a new startedAt is set)
|
|
|
|
const { id, data, workflowData, startedAt, ...executionInformation } = execution;
|
|
|
|
if (Object.keys(executionInformation).length > 0) {
|
|
|
|
await this.update({ id: executionId }, executionInformation);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (data || workflowData) {
|
|
|
|
const executionData: Partial<ExecutionData> = {};
|
|
|
|
if (workflowData) {
|
|
|
|
executionData.workflowData = workflowData;
|
|
|
|
}
|
|
|
|
if (data) {
|
|
|
|
executionData.data = stringify(data);
|
|
|
|
}
|
|
|
|
// @ts-ignore
|
|
|
|
await this.executionDataRepository.update({ executionId }, executionData);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async countExecutions(
|
|
|
|
filters: IGetExecutionsQueryFilter | undefined,
|
|
|
|
accessibleWorkflowIds: string[],
|
|
|
|
currentlyRunningExecutions: string[],
|
|
|
|
isOwner: boolean,
|
|
|
|
): Promise<{ count: number; estimated: boolean }> {
|
|
|
|
const dbType = config.getEnv('database.type');
|
|
|
|
if (dbType !== 'postgresdb' || (filters && Object.keys(filters).length > 0) || !isOwner) {
|
|
|
|
const query = this.createQueryBuilder('execution').andWhere(
|
|
|
|
'execution.workflowId IN (:...accessibleWorkflowIds)',
|
|
|
|
{ accessibleWorkflowIds },
|
|
|
|
);
|
|
|
|
if (currentlyRunningExecutions.length > 0) {
|
|
|
|
query.andWhere('execution.id NOT IN (:...currentlyRunningExecutions)', {
|
|
|
|
currentlyRunningExecutions,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
parseFiltersToQueryBuilder(query, filters);
|
|
|
|
|
|
|
|
const count = await query.getCount();
|
|
|
|
return { count, estimated: false };
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
// Get an estimate of rows count.
|
|
|
|
const estimateRowsNumberSql =
|
|
|
|
"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
|
|
|
|
const rows = (await this.query(estimateRowsNumberSql)) as Array<{ n_live_tup: string }>;
|
|
|
|
|
|
|
|
const estimate = parseInt(rows[0].n_live_tup, 10);
|
|
|
|
// If over 100k, return just an estimate.
|
|
|
|
if (estimate > 100_000) {
|
|
|
|
// if less than 100k, we get the real count as even a full
|
|
|
|
// table scan should not take so long.
|
|
|
|
return { count: estimate, estimated: true };
|
|
|
|
}
|
|
|
|
} catch (error) {
|
|
|
|
if (error instanceof Error) {
|
2023-09-20 06:21:42 -07:00
|
|
|
Logger.warn(`Failed to get executions count from Postgres: ${error.message}`, {
|
2023-06-20 10:13:18 -07:00
|
|
|
error,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const count = await this.count({
|
|
|
|
where: {
|
|
|
|
workflowId: In(accessibleWorkflowIds),
|
|
|
|
},
|
|
|
|
});
|
|
|
|
|
|
|
|
return { count, estimated: false };
|
|
|
|
}
|
|
|
|
|
|
|
|
async searchExecutions(
|
|
|
|
filters: IGetExecutionsQueryFilter | undefined,
|
|
|
|
limit: number,
|
|
|
|
excludedExecutionIds: string[],
|
|
|
|
accessibleWorkflowIds: string[],
|
|
|
|
additionalFilters?: { lastId?: string; firstId?: string },
|
|
|
|
): Promise<IExecutionsSummary[]> {
|
|
|
|
if (accessibleWorkflowIds.length === 0) {
|
|
|
|
return [];
|
|
|
|
}
|
|
|
|
const query = this.createQueryBuilder('execution')
|
|
|
|
.select([
|
|
|
|
'execution.id',
|
|
|
|
'execution.finished',
|
|
|
|
'execution.mode',
|
|
|
|
'execution.retryOf',
|
|
|
|
'execution.retrySuccessId',
|
|
|
|
'execution.status',
|
|
|
|
'execution.startedAt',
|
|
|
|
'execution.stoppedAt',
|
|
|
|
'execution.workflowId',
|
|
|
|
'execution.waitTill',
|
|
|
|
'workflow.name',
|
|
|
|
])
|
|
|
|
.innerJoin('execution.workflow', 'workflow')
|
|
|
|
.limit(limit)
|
|
|
|
// eslint-disable-next-line @typescript-eslint/naming-convention
|
|
|
|
.orderBy({ 'execution.id': 'DESC' })
|
|
|
|
.andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
|
|
|
|
if (excludedExecutionIds.length > 0) {
|
|
|
|
query.andWhere('execution.id NOT IN (:...excludedExecutionIds)', { excludedExecutionIds });
|
|
|
|
}
|
|
|
|
|
|
|
|
if (additionalFilters?.lastId) {
|
|
|
|
query.andWhere('execution.id < :lastId', { lastId: additionalFilters.lastId });
|
|
|
|
}
|
|
|
|
if (additionalFilters?.firstId) {
|
|
|
|
query.andWhere('execution.id > :firstId', { firstId: additionalFilters.firstId });
|
|
|
|
}
|
|
|
|
|
|
|
|
parseFiltersToQueryBuilder(query, filters);
|
|
|
|
|
|
|
|
const executions = await query.getMany();
|
|
|
|
|
|
|
|
return executions.map((execution) => {
|
|
|
|
const { workflow, waitTill, ...rest } = execution;
|
|
|
|
return {
|
|
|
|
...rest,
|
|
|
|
waitTill: waitTill ?? undefined,
|
|
|
|
workflowName: workflow.name,
|
|
|
|
};
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-09-20 06:21:42 -07:00
|
|
|
async deleteExecutionsByFilter(
|
2023-06-20 10:13:18 -07:00
|
|
|
filters: IGetExecutionsQueryFilter | undefined,
|
|
|
|
accessibleWorkflowIds: string[],
|
|
|
|
deleteConditions: {
|
|
|
|
deleteBefore?: Date;
|
|
|
|
ids?: string[];
|
|
|
|
},
|
|
|
|
) {
|
|
|
|
if (!deleteConditions?.deleteBefore && !deleteConditions?.ids) {
|
|
|
|
throw new Error('Either "deleteBefore" or "ids" must be present in the request body');
|
|
|
|
}
|
|
|
|
|
|
|
|
const query = this.createQueryBuilder('execution')
|
|
|
|
.select(['execution.id'])
|
|
|
|
.andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds });
|
|
|
|
|
|
|
|
if (deleteConditions.deleteBefore) {
|
|
|
|
// delete executions by date, if user may access the underlying workflows
|
|
|
|
query.andWhere('execution.startedAt <= :deleteBefore', {
|
|
|
|
deleteBefore: deleteConditions.deleteBefore,
|
|
|
|
});
|
|
|
|
// Filters are only used when filtering by date
|
|
|
|
parseFiltersToQueryBuilder(query, filters);
|
|
|
|
} else if (deleteConditions.ids) {
|
|
|
|
// delete executions by IDs, if user may access the underlying workflows
|
|
|
|
query.andWhere('execution.id IN (:...executionIds)', { executionIds: deleteConditions.ids });
|
|
|
|
}
|
|
|
|
|
|
|
|
const executions = await query.getMany();
|
|
|
|
|
|
|
|
if (!executions.length) {
|
|
|
|
if (deleteConditions.ids) {
|
2023-09-20 06:21:42 -07:00
|
|
|
Logger.error('Failed to delete an execution due to insufficient permissions', {
|
2023-06-20 10:13:18 -07:00
|
|
|
executionIds: deleteConditions.ids,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2023-07-12 01:08:29 -07:00
|
|
|
const executionIds = executions.map(({ id }) => id);
|
2023-06-20 10:13:18 -07:00
|
|
|
do {
|
|
|
|
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
|
2023-09-20 06:21:42 -07:00
|
|
|
const batch = executionIds.splice(0, this.deletionBatchSize);
|
|
|
|
await this.softDelete(batch);
|
2023-07-12 01:08:29 -07:00
|
|
|
} while (executionIds.length > 0);
|
2023-04-12 01:59:14 -07:00
|
|
|
}
|
2023-09-20 06:21:42 -07:00
|
|
|
|
|
|
|
async prune() {
|
|
|
|
Logger.verbose('Soft-deleting (pruning) execution data from database');
|
|
|
|
|
|
|
|
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
|
|
|
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
|
|
|
|
|
|
|
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
|
|
|
|
const date = new Date();
|
|
|
|
date.setHours(date.getHours() - maxAge);
|
|
|
|
|
|
|
|
const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
|
|
|
|
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
|
|
|
|
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
|
|
|
|
];
|
|
|
|
|
|
|
|
if (maxCount > 0) {
|
|
|
|
const executions = await this.find({
|
|
|
|
select: ['id'],
|
|
|
|
skip: maxCount,
|
|
|
|
take: 1,
|
|
|
|
order: { id: 'DESC' },
|
|
|
|
});
|
|
|
|
|
|
|
|
if (executions[0]) {
|
|
|
|
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const [timeBasedWhere, countBasedWhere] = toPrune;
|
|
|
|
|
|
|
|
await this.createQueryBuilder()
|
|
|
|
.update(ExecutionEntity)
|
|
|
|
.set({ deletedAt: new Date() })
|
|
|
|
.where(
|
|
|
|
new Brackets((qb) =>
|
|
|
|
countBasedWhere
|
|
|
|
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
|
|
|
: qb.where(timeBasedWhere),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
.execute();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Permanently delete all soft-deleted executions and their binary data, in batches.
|
|
|
|
*/
|
|
|
|
private async hardDelete() {
|
|
|
|
// Find ids of all executions that were deleted over an hour ago
|
|
|
|
const date = new Date();
|
|
|
|
date.setHours(date.getHours() - 1);
|
|
|
|
|
|
|
|
const executionIds = (
|
|
|
|
await this.find({
|
|
|
|
select: ['id'],
|
|
|
|
where: {
|
|
|
|
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
|
|
|
},
|
|
|
|
take: this.deletionBatchSize,
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @important This ensures soft-deleted executions are included,
|
|
|
|
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
|
|
|
|
*/
|
|
|
|
withDeleted: true,
|
|
|
|
})
|
|
|
|
).map(({ id }) => id);
|
|
|
|
|
|
|
|
const binaryDataManager = BinaryDataManager.getInstance();
|
|
|
|
await binaryDataManager.deleteBinaryDataByExecutionIds(executionIds);
|
|
|
|
|
|
|
|
this.logger.debug(`Hard-deleting ${executionIds.length} executions from database`, {
|
|
|
|
executionIds,
|
|
|
|
});
|
|
|
|
|
|
|
|
// Actually delete these executions
|
|
|
|
await this.delete({ id: In(executionIds) });
|
|
|
|
|
|
|
|
/**
|
|
|
|
* If the volume of executions to prune is as high as the batch size, there is a risk
|
|
|
|
* that the pruning process is unable to catch up to the creation of new executions,
|
|
|
|
* with high concurrency possibly leading to errors from duplicate deletions.
|
|
|
|
*
|
|
|
|
* Therefore, in this high-volume case we speed up the hard deletion cycle, until
|
|
|
|
* the number of executions to prune is low enough to fit in a single batch.
|
|
|
|
*/
|
|
|
|
if (executionIds.length === this.deletionBatchSize) {
|
|
|
|
clearInterval(this.intervals.hardDeletion);
|
|
|
|
|
|
|
|
setTimeout(async () => this.hardDelete(), 1 * TIME.SECOND);
|
|
|
|
} else {
|
|
|
|
if (this.intervals.hardDeletion) return;
|
|
|
|
|
|
|
|
this.setHardDeletionInterval();
|
|
|
|
}
|
|
|
|
}
|
2023-04-12 01:59:14 -07:00
|
|
|
}
|