refactor(core): Separate execution startedAt from createdAt (#10810)

This commit is contained in:
Iván Ovejero 2024-09-27 13:32:12 +02:00 committed by GitHub
parent bf7392a878
commit afda049491
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 163 additions and 38 deletions

View file

@ -13,7 +13,7 @@ import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
@ -52,11 +52,10 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB
const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData.executionData!,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
status: executionStatus,
workflowId: executionData.workflowData.id,
@ -74,7 +73,10 @@ export class ActiveExecutions {
executionId = await this.executionRepository.createNewExecution(fullExecutionData);
assert(executionId);
await this.concurrencyControl.throttle({ mode, executionId });
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.throttle({ mode, executionId });
await this.executionRepository.setRunning(executionId);
}
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB
@ -86,6 +88,7 @@ export class ActiveExecutions {
data: executionData.executionData!,
waitTill: null,
status: executionStatus,
// this is resuming, so keep `startedAt` as it was
};
await this.executionRepository.updateExistingExecution(executionId, execution);

View file

@ -70,7 +70,6 @@ export class ConcurrencyControlService {
this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
await this.executionRepository.resetStartedAt(executionId);
});
}

View file

@ -47,7 +47,14 @@ export class ExecutionEntity {
status: ExecutionStatus;
@Column(datetimeColumnType)
startedAt: Date;
createdAt: Date;
/**
* Time when the processing of the execution actually started. This column
* is `null` when an execution is enqueued but has not started yet.
*/
@Column({ type: datetimeColumnType, nullable: true })
startedAt: Date | null;
@Index()
@Column({ type: datetimeColumnType, nullable: true })

View file

@ -0,0 +1,27 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';
export class SeparateExecutionCreationFromStart1727427440136 implements ReversibleMigration {
async up({
schemaBuilder: { addColumns, column, dropNotNull },
runQuery,
escape,
}: MigrationContext) {
await addColumns('execution_entity', [
column('createdAt').notNull.timestamp().default('NOW()'),
]);
await dropNotNull('execution_entity', 'startedAt');
const executionEntity = escape.tableName('execution_entity');
const createdAt = escape.columnName('createdAt');
const startedAt = escape.columnName('startedAt');
// inaccurate for pre-migration rows but prevents `createdAt` from being nullable
await runQuery(`UPDATE ${executionEntity} SET ${createdAt} = ${startedAt};`);
}
async down({ schemaBuilder: { dropColumns, addNotNull } }: MigrationContext) {
await dropColumns('execution_entity', ['createdAt']);
await addNotNull('execution_entity', 'startedAt');
}
}

View file

@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
@ -130,4 +131,5 @@ export const mysqlMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];

View file

@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
@ -130,4 +131,5 @@ export const postgresMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];

View file

@ -61,6 +61,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@ -124,6 +125,7 @@ const sqliteMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];
export { sqliteMigrations };

View file

@ -42,7 +42,7 @@ import { ExecutionAnnotation } from '@/databases/entities/execution-annotation.e
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
import type { ExecutionSummaries } from '@/executions/execution.types';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionBase,
IExecutionFlattedDb,
IExecutionResponse,
@ -198,7 +198,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return executions.map((execution) => {
const { executionData, ...rest } = execution;
return rest;
});
}) as IExecutionFlattedDb[] | IExecutionResponse[] | IExecutionBase[];
}
reportInvalidExecutions(executions: ExecutionEntity[]) {
@ -297,15 +297,15 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}),
...(options?.includeAnnotation &&
serializedAnnotation && { annotation: serializedAnnotation }),
};
} as IExecutionFlattedDb | IExecutionResponse | IExecutionBase;
}
/**
* Insert a new execution and its execution data using a transaction.
*/
async createNewExecution(execution: ExecutionPayload): Promise<string> {
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert(rest);
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
@ -344,16 +344,25 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.update({ id: executionId }, { status });
}
async resetStartedAt(executionId: string) {
await this.update({ id: executionId }, { startedAt: new Date() });
async setRunning(executionId: string) {
const startedAt = new Date();
await this.update({ id: executionId }, { status: 'running', startedAt });
return startedAt;
}
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
// are resumed after waiting for some time, as a new startedAt is set)
const { id, data, workflowId, workflowData, startedAt, customData, ...executionInformation } =
execution;
const {
id,
data,
workflowId,
workflowData,
createdAt, // must never change
startedAt, // must never change
customData,
...executionInformation
} = execution;
if (Object.keys(executionInformation).length > 0) {
await this.update({ id: executionId }, executionInformation);
}
@ -721,6 +730,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
mode: true,
retryOf: true,
status: true,
createdAt: true,
startedAt: true,
stoppedAt: true,
};
@ -806,6 +816,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
// @tech_debt: These transformations should not be needed
private toSummary(execution: {
id: number | string;
createdAt?: Date | string;
startedAt?: Date | string;
stoppedAt?: Date | string;
waitTill?: Date | string | null;
@ -817,6 +828,13 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return date;
};
if (execution.createdAt) {
execution.createdAt =
execution.createdAt instanceof Date
? execution.createdAt.toISOString()
: normalizeDateString(execution.createdAt);
}
if (execution.startedAt) {
execution.startedAt =
execution.startedAt instanceof Date

View file

@ -3,7 +3,7 @@ import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionPayload, IExecutionDb } from '@/interfaces';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { Logger } from '@/logger';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { isWorkflowIdValid } from '@/utils';
@ -46,7 +46,7 @@ export function prepareExecutionDataForDbUpdate(parameters: {
'pinData',
]);
const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,

View file

@ -32,7 +32,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionFlattedResponse,
IExecutionResponse,
IWorkflowDb,
@ -321,11 +321,10 @@ export class ExecutionService {
},
};
const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData,
mode,
finished: false,
startedAt: new Date(),
workflowData,
workflowId: workflow.id,
stoppedAt: new Date(),

View file

@ -115,6 +115,7 @@ export type SaveExecutionDataType = 'all' | 'none';
export interface IExecutionBase {
id: string;
mode: WorkflowExecuteMode;
createdAt: Date; // set by DB
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
workflowId: string;
@ -131,10 +132,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData: IWorkflowBase;
}
/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;
/** Payload for creating an execution. */
export type CreateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt' | 'startedAt'>;
/** Payload for updating an execution. */
export type UpdateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt'>;
export interface IExecutionResponse extends IExecutionBase {
id: string;

View file

@ -47,7 +47,7 @@ export class JobProcessor {
this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`);
await this.executionRepository.updateStatus(executionId, 'running');
const startedAt = await this.executionRepository.setRunning(executionId);
let { staticData } = execution.workflowData;
@ -137,7 +137,7 @@ export class JobProcessor {
workflowId: execution.workflowId,
workflowName: execution.workflowData.name,
mode: execution.mode,
startedAt: execution.startedAt,
startedAt,
retryOf: execution.retryOf ?? '',
status: execution.status,
};

View file

@ -41,7 +41,11 @@ import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowExecuteProcess, IWorkflowErrorData, ExecutionPayload } from '@/interfaces';
import type {
IWorkflowExecuteProcess,
IWorkflowErrorData,
UpdateExecutionPayload,
} from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
@ -865,7 +869,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this
const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,

View file

@ -22,7 +22,7 @@ import type { Project } from '@/databases/entities/project';
import type { User } from '@/databases/entities/user';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import type { CreateExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
@ -206,11 +206,10 @@ export class WorkflowExecutionService {
initialNode,
);
const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
startedAt: new Date(),
stoppedAt: new Date(),
workflowData,
waitTill: null,

View file

@ -70,6 +70,7 @@ describe('ExecutionService', () => {
mode: expect.any(String),
retryOf: null,
status: expect.any(String),
createdAt: expect.any(String),
startedAt: expect.any(String),
stoppedAt: expect.any(String),
waitTill: null,
@ -510,6 +511,7 @@ describe('ExecutionService', () => {
mode: expect.any(String),
retryOf: null,
status: expect.any(String),
createdAt: expect.any(String),
startedAt: expect.any(String),
stoppedAt: expect.any(String),
waitTill: null,

View file

@ -159,6 +159,7 @@ test('should report credential in not recently executed workflow', async () => {
const savedExecution = await Container.get(ExecutionRepository).save({
finished: true,
mode: 'manual',
createdAt: date,
startedAt: date,
stoppedAt: date,
workflowId: workflow.id,
@ -227,6 +228,7 @@ test('should not report credentials in recently executed workflow', async () =>
const savedExecution = await Container.get(ExecutionRepository).save({
finished: true,
mode: 'manual',
createdAt: date,
startedAt: date,
stoppedAt: date,
workflowId: workflow.id,

View file

@ -39,6 +39,7 @@ export async function createExecution(
const execution = await Container.get(ExecutionRepository).save({
finished: finished ?? true,
mode: mode ?? 'manual',
createdAt: new Date(),
startedAt: startedAt ?? new Date(),
...(workflow !== undefined && { workflowId: workflow.id }),
stoppedAt: stoppedAt ?? new Date(),

View file

@ -212,6 +212,7 @@
--execution-selector-background: var(--prim-gray-740);
--execution-selector-text: var(--color-text-base);
--execution-select-all-text: var(--color-text-base);
--execution-card-text-waiting: var(--prim-color-secondary-tint-100);
// NDV
--color-run-data-background: var(--prim-gray-800);

View file

@ -273,6 +273,7 @@
--execution-card-border-running: var(--prim-color-alt-b-tint-250);
--execution-card-border-unknown: var(--prim-gray-120);
--execution-card-background-hover: var(--color-foreground-light);
--execution-card-text-waiting: var(--color-secondary);
--execution-selector-background: var(--color-background-dark);
--execution-selector-text: var(--color-text-xlight);
--execution-select-all-text: var(--color-danger);

View file

@ -62,7 +62,9 @@ const classes = computed(() => {
});
const formattedStartedAtDate = computed(() => {
return props.execution.startedAt ? formatDate(props.execution.startedAt) : '';
return props.execution.startedAt
? formatDate(props.execution.startedAt)
: i18n.baseText('executionsList.startingSoon');
});
const formattedWaitTillDate = computed(() => {

View file

@ -150,4 +150,25 @@ describe('WorkflowExecutionsCard', () => {
}
},
);
test('displays correct text for new execution', () => {
const createdAt = new Date('2024-09-27T12:00:00Z');
const props = {
execution: {
id: '1',
mode: 'manual',
status: 'new',
createdAt: createdAt.toISOString(),
},
workflowPermissions: {
execute: true,
},
};
const { getByTestId } = renderComponent({ props });
const executionTimeElement = getByTestId('execution-time');
expect(executionTimeElement).toBeVisible();
expect(executionTimeElement.textContent).toBe('27 Sep - Starting soon');
});
});

View file

@ -11,6 +11,7 @@ import { useI18n } from '@/composables/useI18n';
import type { PermissionsRecord } from '@/permissions';
import { usePostHog } from '@/stores/posthog.store';
import { useSettingsStore } from '@/stores/settings.store';
import { toDayMonth, toTime } from '@/utils/formatters/dateFormatter';
const props = defineProps<{
execution: ExecutionSummary;
@ -87,7 +88,17 @@ function onRetryMenuItemSelect(action: string): void {
:data-test-execution-status="executionUIDetails.name"
>
<div :class="$style.description">
<N8nText color="text-dark" :bold="true" size="medium" data-test-id="execution-time">
<N8nText
v-if="executionUIDetails.name === 'new'"
color="text-dark"
:bold="true"
size="medium"
data-test-id="execution-time"
>
{{ toDayMonth(executionUIDetails.createdAt) }} -
{{ locale.baseText('executionDetails.startingSoon') }}
</N8nText>
<N8nText v-else color="text-dark" :bold="true" size="medium" data-test-id="execution-time">
{{ executionUIDetails.startTime }}
</N8nText>
<div :class="$style.executionStatus">
@ -106,6 +117,15 @@ function onRetryMenuItemSelect(action: string): void {
{{ locale.baseText('executionDetails.runningTimeRunning') }}
<ExecutionsTime :start-time="execution.startedAt" />
</N8nText>
<N8nText
v-if="executionUIDetails.name === 'new' && execution.createdAt"
:color="isActive ? 'text-dark' : 'text-base'"
size="small"
>
<span
>{{ locale.baseText('executionDetails.at') }} {{ toTime(execution.createdAt) }}</span
>
</N8nText>
<N8nText
v-else-if="executionUIDetails.runningTime !== ''"
:color="isActive ? 'text-dark' : 'text-base'"
@ -216,10 +236,10 @@ function onRetryMenuItemSelect(action: string): void {
&.new {
&,
& .executionLink {
border-left: var(--spacing-4xs) var(--border-style-base) var(--execution-card-border-new);
border-left: var(--spacing-4xs) var(--border-style-base) var(--execution-card-border-waiting);
}
.statusLabel {
color: var(--color-text-dark);
color: var(--execution-card-text-waiting);
}
}

View file

@ -5,6 +5,7 @@ import { useI18n } from '@/composables/useI18n';
export interface IExecutionUIData {
name: string;
label: string;
createdAt: string;
startTime: string;
runningTime: string;
showTimestamp: boolean;
@ -17,6 +18,7 @@ export function useExecutionHelpers() {
function getUIDetails(execution: ExecutionSummary): IExecutionUIData {
const status = {
name: 'unknown',
createdAt: execution.createdAt?.toString() ?? '',
startTime: formatDate(execution.startedAt),
label: 'Status unknown',
runningTime: '',

View file

@ -658,6 +658,7 @@
"executionDetails.executionWaiting": "Execution waiting",
"executionDetails.executionWasSuccessful": "Execution was successful",
"executionDetails.of": "of",
"executionDetails.at": "at",
"executionDetails.newMessage": "Execution waiting in the queue.",
"executionDetails.openWorkflow": "Open Workflow",
"executionDetails.readOnly.readOnly": "Read only",
@ -666,6 +667,7 @@
"executionDetails.runningTimeFinished": "in {time}",
"executionDetails.runningTimeRunning": "for",
"executionDetails.runningMessage": "Execution is running. It will show here once finished.",
"executionDetails.startingSoon": "Starting soon",
"executionDetails.workflow": "workflow",
"executionsLandingPage.emptyState.noTrigger.heading": "Set up the first step. Then execute your workflow",
"executionsLandingPage.emptyState.noTrigger.buttonText": "Add first step...",
@ -730,6 +732,7 @@
"executionsList.showMessage.stopExecution.message": "Execution ID {activeExecutionId}",
"executionsList.showMessage.stopExecution.title": "Execution stopped",
"executionsList.startedAt": "Started At",
"executionsList.startingSoon": "Starting soon",
"executionsList.started": "{date} at {time}",
"executionsList.id": "Execution ID",
"executionsList.status": "Status",

View file

@ -22,3 +22,7 @@ export function convertToDisplayDate(fullDate: Date | string | number): {
const [date, time] = formattedDate.split('#');
return { date, time };
}
export const toDayMonth = (fullDate: Date | string) => dateformat(fullDate, 'd mmm');
export const toTime = (fullDate: Date | string) => dateformat(fullDate, 'HH:MM:ss');

View file

@ -2122,6 +2122,7 @@ export interface IWorkflowBase {
name: string;
active: boolean;
createdAt: Date;
startedAt?: Date;
updatedAt: Date;
nodes: INode[];
connections: IConnections;
@ -2463,6 +2464,7 @@ export interface ExecutionSummary {
retryOf?: string | null;
retrySuccessId?: string | null;
waitTill?: Date;
createdAt?: Date;
startedAt: Date;
stoppedAt?: Date;
workflowId: string;