mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
perf(core): Make execution queries faster (#9817)
This commit is contained in:
parent
3428f28a73
commit
dc7dc995d5
|
@ -1,5 +1,5 @@
|
|||
import type { QueryRunner } from '@n8n/typeorm';
|
||||
import { TableIndex } from '@n8n/typeorm';
|
||||
import { TableIndex, TypeORMError } from '@n8n/typeorm';
|
||||
import LazyPromise from 'p-lazy';
|
||||
|
||||
abstract class IndexOperation extends LazyPromise<void> {
|
||||
|
@ -48,10 +48,29 @@ export class CreateIndex extends IndexOperation {
|
|||
}
|
||||
|
||||
export class DropIndex extends IndexOperation {
|
||||
constructor(
|
||||
tableName: string,
|
||||
columnNames: string[],
|
||||
tablePrefix: string,
|
||||
queryRunner: QueryRunner,
|
||||
customIndexName?: string,
|
||||
protected skipIfMissing = false,
|
||||
) {
|
||||
super(tableName, columnNames, tablePrefix, queryRunner, customIndexName);
|
||||
}
|
||||
|
||||
async execute(queryRunner: QueryRunner) {
|
||||
return await queryRunner.dropIndex(
|
||||
this.fullTableName,
|
||||
this.customIndexName ?? this.fullIndexName,
|
||||
);
|
||||
return await queryRunner
|
||||
.dropIndex(this.fullTableName, this.customIndexName ?? this.fullIndexName)
|
||||
.catch((error) => {
|
||||
if (
|
||||
error instanceof TypeORMError &&
|
||||
error.message.includes('not found') &&
|
||||
this.skipIfMissing
|
||||
) {
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,8 +32,14 @@ export const createSchemaBuilder = (tablePrefix: string, queryRunner: QueryRunne
|
|||
customIndexName?: string,
|
||||
) => new CreateIndex(tableName, columnNames, isUnique, tablePrefix, queryRunner, customIndexName),
|
||||
|
||||
dropIndex: (tableName: string, columnNames: string[], customIndexName?: string) =>
|
||||
new DropIndex(tableName, columnNames, tablePrefix, queryRunner, customIndexName),
|
||||
dropIndex: (
|
||||
tableName: string,
|
||||
columnNames: string[],
|
||||
{ customIndexName, skipIfMissing }: { customIndexName?: string; skipIfMissing?: boolean } = {
|
||||
skipIfMissing: false,
|
||||
},
|
||||
) =>
|
||||
new DropIndex(tableName, columnNames, tablePrefix, queryRunner, customIndexName, skipIfMissing),
|
||||
|
||||
addForeignKey: (
|
||||
tableName: string,
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
|
||||
|
||||
/**
|
||||
* Add new indices:
|
||||
*
|
||||
* - `workflowId, startedAt` for `ExecutionRepository.findManyByRangeQuery` (default query) and for `ExecutionRepository.findManyByRangeQuery` (filter query)
|
||||
* - `waitTill, status, deletedAt` for `ExecutionRepository.getWaitingExecutions`
|
||||
* - `stoppedAt, status, deletedAt` for `ExecutionRepository.softDeletePrunableExecutions`
|
||||
*
|
||||
* Remove unused indices in sqlite:
|
||||
*
|
||||
* - `stoppedAt` (duplicate with different casing)
|
||||
* - `waitTill`
|
||||
* - `status, workflowId`
|
||||
*
|
||||
* Remove unused indices in MySQL:
|
||||
*
|
||||
* - `status`
|
||||
*
|
||||
* Remove unused indices in all DBs:
|
||||
*
|
||||
* - `waitTill, id`
|
||||
* - `workflowId, id`
|
||||
*
|
||||
* Remove incomplete index in all DBs:
|
||||
*
|
||||
* - `stopped_at` (replaced with composite index)
|
||||
*
|
||||
* Keep index as is:
|
||||
*
|
||||
* - `deletedAt` for query at `ExecutionRepository.hardDeleteSoftDeletedExecutions`
|
||||
*/
|
||||
export class RefactorExecutionIndices1723796243146 implements ReversibleMigration {
|
||||
async up({ schemaBuilder, isPostgres, isSqlite, isMysql, runQuery, escape }: MigrationContext) {
|
||||
if (isSqlite || isPostgres) {
|
||||
const executionEntity = escape.tableName('execution_entity');
|
||||
|
||||
const workflowId = escape.columnName('workflowId');
|
||||
const startedAt = escape.columnName('startedAt');
|
||||
const waitTill = escape.columnName('waitTill');
|
||||
const status = escape.columnName('status');
|
||||
const deletedAt = escape.columnName('deletedAt');
|
||||
const stoppedAt = escape.columnName('stoppedAt');
|
||||
|
||||
await runQuery(`
|
||||
CREATE INDEX idx_execution_entity_workflow_id_started_at
|
||||
ON ${executionEntity} (${workflowId}, ${startedAt})
|
||||
WHERE ${startedAt} IS NOT NULL AND ${deletedAt} IS NULL;
|
||||
`);
|
||||
|
||||
await runQuery(`
|
||||
CREATE INDEX idx_execution_entity_wait_till_status_deleted_at
|
||||
ON ${executionEntity} (${waitTill}, ${status}, ${deletedAt})
|
||||
WHERE ${waitTill} IS NOT NULL AND ${deletedAt} IS NULL;
|
||||
`);
|
||||
|
||||
await runQuery(`
|
||||
CREATE INDEX idx_execution_entity_stopped_at_status_deleted_at
|
||||
ON ${executionEntity} (${stoppedAt}, ${status}, ${deletedAt})
|
||||
WHERE ${stoppedAt} IS NOT NULL AND ${deletedAt} IS NULL;
|
||||
`);
|
||||
} else if (isMysql) {
|
||||
await schemaBuilder.createIndex('execution_entity', ['workflowId', 'startedAt']);
|
||||
await schemaBuilder.createIndex('execution_entity', ['waitTill', 'status', 'deletedAt']);
|
||||
await schemaBuilder.createIndex('execution_entity', ['stoppedAt', 'status', 'deletedAt']);
|
||||
}
|
||||
|
||||
if (isSqlite) {
|
||||
await schemaBuilder.dropIndex('execution_entity', ['waitTill'], {
|
||||
customIndexName: 'idx_execution_entity_wait_till',
|
||||
skipIfMissing: true,
|
||||
});
|
||||
|
||||
await schemaBuilder.dropIndex('execution_entity', ['status', 'workflowId'], {
|
||||
customIndexName: 'IDX_8b6f3f9ae234f137d707b98f3bf43584',
|
||||
skipIfMissing: true,
|
||||
});
|
||||
}
|
||||
|
||||
if (isMysql) {
|
||||
await schemaBuilder.dropIndex('execution_entity', ['status'], {
|
||||
customIndexName: 'IDX_8b6f3f9ae234f137d707b98f3bf43584',
|
||||
skipIfMissing: true,
|
||||
});
|
||||
}
|
||||
|
||||
// all DBs
|
||||
|
||||
await schemaBuilder.dropIndex(
|
||||
'execution_entity',
|
||||
['stoppedAt'],
|
||||
isSqlite ? { customIndexName: 'idx_execution_entity_stopped_at', skipIfMissing: true } : {},
|
||||
);
|
||||
await schemaBuilder.dropIndex('execution_entity', ['waitTill', 'id'], {
|
||||
customIndexName: isPostgres
|
||||
? 'IDX_85b981df7b444f905f8bf50747'
|
||||
: 'IDX_b94b45ce2c73ce46c54f20b5f9',
|
||||
skipIfMissing: true,
|
||||
});
|
||||
await schemaBuilder.dropIndex('execution_entity', ['workflowId', 'id'], {
|
||||
customIndexName:
|
||||
isPostgres || isMysql
|
||||
? 'idx_execution_entity_workflow_id_id'
|
||||
: 'IDX_81fc04c8a17de15835713505e4',
|
||||
skipIfMissing: true,
|
||||
});
|
||||
}
|
||||
|
||||
async down({ schemaBuilder }: MigrationContext) {
|
||||
await schemaBuilder.dropIndex('execution_entity', ['workflowId', 'startedAt']);
|
||||
await schemaBuilder.dropIndex('execution_entity', ['waitTill', 'status']);
|
||||
await schemaBuilder.dropIndex('execution_entity', ['stoppedAt', 'deletedAt', 'status']);
|
||||
|
||||
await schemaBuilder.createIndex('execution_entity', ['waitTill', 'id']);
|
||||
await schemaBuilder.createIndex('execution_entity', ['stoppedAt']);
|
||||
await schemaBuilder.createIndex('execution_entity', ['workflowId', 'id']);
|
||||
}
|
||||
}
|
|
@ -58,6 +58,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
|
||||
|
||||
|
@ -123,4 +124,5 @@ export const mysqlMigrations: Migration[] = [
|
|||
AddActivatedAtUserSetting1717498465931,
|
||||
AddConstraintToExecutionMetadata1720101653148,
|
||||
CreateInvalidAuthTokenTable1723627610222,
|
||||
RefactorExecutionIndices1723796243146,
|
||||
];
|
||||
|
|
|
@ -57,6 +57,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
import { FixExecutionMetadataSequence1721377157740 } from './1721377157740-FixExecutionMetadataSequence';
|
||||
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
|
||||
|
@ -123,4 +124,5 @@ export const postgresMigrations: Migration[] = [
|
|||
AddConstraintToExecutionMetadata1720101653148,
|
||||
FixExecutionMetadataSequence1721377157740,
|
||||
CreateInvalidAuthTokenTable1723627610222,
|
||||
RefactorExecutionIndices1723796243146,
|
||||
];
|
||||
|
|
|
@ -55,6 +55,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
|
||||
|
||||
|
@ -117,6 +118,7 @@ const sqliteMigrations: Migration[] = [
|
|||
AddActivatedAtUserSetting1717498465931,
|
||||
AddConstraintToExecutionMetadata1720101653148,
|
||||
CreateInvalidAuthTokenTable1723627610222,
|
||||
RefactorExecutionIndices1723796243146,
|
||||
];
|
||||
|
||||
export { sqliteMigrations };
|
||||
|
|
|
@ -784,8 +784,8 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
|||
if (firstId) qb.andWhere('execution.id > :firstId', { firstId });
|
||||
if (lastId) qb.andWhere('execution.id < :lastId', { lastId });
|
||||
|
||||
if (query.order?.stoppedAt === 'DESC') {
|
||||
qb.orderBy({ 'execution.stoppedAt': 'DESC' });
|
||||
if (query.order?.startedAt === 'DESC') {
|
||||
qb.orderBy({ 'execution.startedAt': 'DESC' });
|
||||
} else if (query.order?.top) {
|
||||
qb.orderBy(`(CASE WHEN execution.status = '${query.order.top}' THEN 0 ELSE 1 END)`);
|
||||
} else {
|
||||
|
|
|
@ -11,6 +11,8 @@ export interface MigrationContext {
|
|||
tablePrefix: string;
|
||||
dbType: DatabaseType;
|
||||
isMysql: boolean;
|
||||
isSqlite: boolean;
|
||||
isPostgres: boolean;
|
||||
dbName: string;
|
||||
migrationName: string;
|
||||
nodeTypes: INodeTypes;
|
||||
|
|
|
@ -93,6 +93,8 @@ function parseJson<T>(data: string | T): T {
|
|||
const globalConfig = Container.get(GlobalConfig);
|
||||
const dbType = globalConfig.database.type;
|
||||
const isMysql = ['mariadb', 'mysqldb'].includes(dbType);
|
||||
const isSqlite = dbType === 'sqlite';
|
||||
const isPostgres = dbType === 'postgresdb';
|
||||
const dbName = globalConfig.database[dbType === 'mariadb' ? 'mysqldb' : dbType].database;
|
||||
const tablePrefix = globalConfig.database.tablePrefix;
|
||||
|
||||
|
@ -101,6 +103,8 @@ const createContext = (queryRunner: QueryRunner, migration: Migration): Migratio
|
|||
tablePrefix,
|
||||
dbType,
|
||||
isMysql,
|
||||
isSqlite,
|
||||
isPostgres,
|
||||
dbName,
|
||||
migrationName: migration.name,
|
||||
queryRunner,
|
||||
|
|
|
@ -361,7 +361,7 @@ export class ExecutionService {
|
|||
/**
|
||||
* Return:
|
||||
*
|
||||
* - the latest summaries of current and completed executions that satisfy a query,
|
||||
* - the summaries of latest current and completed executions that satisfy a query,
|
||||
* - the total count of all completed executions that satisfy the query, and
|
||||
* - whether the total of completed executions is an estimate.
|
||||
*
|
||||
|
@ -382,7 +382,7 @@ export class ExecutionService {
|
|||
this.findRangeWithCount({
|
||||
...query,
|
||||
status: completedStatuses,
|
||||
order: { stoppedAt: 'DESC' },
|
||||
order: { startedAt: 'DESC' },
|
||||
}),
|
||||
]);
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ export namespace ExecutionSummaries {
|
|||
type OrderFields = {
|
||||
order?: {
|
||||
top?: ExecutionStatus;
|
||||
stoppedAt?: 'DESC';
|
||||
startedAt?: 'DESC';
|
||||
};
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue