fix(core): Filter out certain executions from crash recovery (#9904)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Iván Ovejero 2024-07-02 17:07:07 +02:00 committed by GitHub
parent 61c20d1ae3
commit 7044d1ca28
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 217 additions and 20 deletions

View file

@ -204,6 +204,12 @@ export class ActiveExecutions {
async shutdown(cancelAll = false) {
let executionIds = Object.keys(this.activeExecutions);
if (config.getEnv('executions.mode') === 'regular') {
// removal of active executions will no longer release capacity back,
// so that throttled executions cannot resume during shutdown
this.concurrencyControl.disable();
}
if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);

View file

@ -32,6 +32,7 @@ import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { EventRelay } from '@/eventbus/event-relay.service';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
@ -375,6 +376,10 @@ export class Start extends BaseCommand {
projectId: project.id,
};
Container.get(EventRelay).emit('execution-started-during-bootup', {
executionId: execution.id,
});
// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}

View file

@ -12,11 +12,13 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutingWorkflowData } from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
import type { EventRelay } from '@/eventbus/event-relay.service';
describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>();
const eventRelay = mock<EventRelay>();
afterEach(() => {
config.set('executions.concurrency.productionLimit', -1);
@ -35,7 +37,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Assert
@ -56,7 +63,7 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
new ConcurrencyControlService(logger, executionRepository, telemetry);
new ConcurrencyControlService(logger, executionRepository, telemetry, eventRelay);
} catch (error) {
/**
* Assert
@ -74,7 +81,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Assert
@ -92,7 +104,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Act
@ -111,7 +128,12 @@ describe('ConcurrencyControlService', () => {
/**
* Act
*/
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Assert
@ -135,7 +157,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
@ -156,7 +183,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
@ -180,7 +212,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
@ -201,7 +238,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
@ -225,7 +267,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
@ -248,7 +295,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
@ -271,7 +323,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', 2);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
jest
.spyOn(ConcurrencyQueue.prototype, 'getAll')
@ -310,7 +367,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue');
/**
@ -333,7 +395,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue');
/**
@ -355,7 +422,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', -1);
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove');
/**
@ -385,7 +457,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Act
@ -410,7 +487,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Act
@ -437,7 +519,12 @@ describe('ConcurrencyControlService', () => {
*/
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
config.set('deployment.type', 'cloud');
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
const service = new ConcurrencyControlService(
logger,
executionRepository,
telemetry,
eventRelay,
);
/**
* Act

View file

@ -8,13 +8,14 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { IExecutingWorkflowData } from '@/Interfaces';
import { Telemetry } from '@/telemetry';
import { EventRelay } from '@/eventbus/event-relay.service';
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
@Service()
export class ConcurrencyControlService {
private readonly isEnabled: boolean;
private isEnabled: boolean;
private readonly productionLimit: number;
@ -28,6 +29,7 @@ export class ConcurrencyControlService {
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly telemetry: Telemetry,
private readonly eventRelay: EventRelay,
) {
this.productionLimit = config.getEnv('executions.concurrency.productionLimit');
@ -61,6 +63,7 @@ export class ConcurrencyControlService {
this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => {
this.log('Execution throttled', { executionId });
this.eventRelay.emit('execution-throttled', { executionId });
});
this.productionQueue.on('execution-released', async (executionId: string) => {
@ -130,6 +133,10 @@ export class ConcurrencyControlService {
this.logger.info('Canceled enqueued executions with response promises', { executionIds });
}
disable() {
this.isEnabled = false;
}
// ----------------------------------
// private
// ----------------------------------

View file

@ -49,7 +49,7 @@ export class ConcurrencyQueue extends EventEmitter {
}
getAll() {
return new Set(...this.queue.map((item) => item.executionId));
return new Set(this.queue.map((item) => item.executionId));
}
private resolveNext() {

View file

@ -0,0 +1,45 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage';
import type { JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions';
import type { AbstractEventPayload } from './AbstractEventPayload';
import type { EventNamesExecutionType } from '.';
export interface EventPayloadExecution extends AbstractEventPayload {
executionId: string;
}
export interface EventMessageExecutionOptions extends AbstractEventMessageOptions {
eventName: EventNamesExecutionType;
payload?: EventPayloadExecution;
}
export class EventMessageExecution extends AbstractEventMessage {
readonly __type = EventMessageTypeNames.execution;
eventName: EventNamesExecutionType;
payload: EventPayloadExecution;
constructor(options: EventMessageExecutionOptions) {
super(options);
if (options.payload) this.setPayload(options.payload);
if (options.anonymize) {
this.anonymize();
}
}
setPayload(payload: EventPayloadExecution): this {
this.payload = payload;
return this;
}
deserialize(data: JsonObject): this {
if (isEventMessageOptionsWithType(data, this.__type)) {
this.setOptionsOrDefault(data);
if (data.payload) this.setPayload(data.payload as EventPayloadExecution);
}
return this;
}
}

View file

@ -1,5 +1,6 @@
import type { EventMessageAiNode } from './EventMessageAiNode';
import type { EventMessageAudit } from './EventMessageAudit';
import type { EventMessageExecution } from './EventMessageExecution';
import type { EventMessageGeneric } from './EventMessageGeneric';
import type { EventMessageNode } from './EventMessageNode';
import type { EventMessageWorkflow } from './EventMessageWorkflow';
@ -13,6 +14,10 @@ export const eventNamesWorkflow = [
] as const;
export const eventNamesGeneric = ['n8n.worker.started', 'n8n.worker.stopped'] as const;
export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const;
export const eventNamesExecution = [
'n8n.execution.throttled',
'n8n.execution.started-during-bootup',
] as const;
export const eventNamesAudit = [
'n8n.audit.user.login.success',
'n8n.audit.user.login.failed',
@ -42,12 +47,14 @@ export const eventNamesAudit = [
export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number];
export type EventNamesAuditType = (typeof eventNamesAudit)[number];
export type EventNamesNodeType = (typeof eventNamesNode)[number];
export type EventNamesExecutionType = (typeof eventNamesExecution)[number];
export type EventNamesGenericType = (typeof eventNamesGeneric)[number];
export type EventNamesTypes =
| EventNamesAuditType
| EventNamesWorkflowType
| EventNamesNodeType
| EventNamesExecutionType
| EventNamesGenericType
| EventNamesAiNodesType
| 'n8n.destination.test';
@ -65,4 +72,5 @@ export type EventMessageTypes =
| EventMessageWorkflow
| EventMessageAudit
| EventMessageNode
| EventMessageExecution
| EventMessageAiNode;

View file

@ -35,6 +35,8 @@ import {
type EventMessageAiNodeOptions,
} from '../EventMessageClasses/EventMessageAiNode';
import { License } from '@/License';
import type { EventMessageExecutionOptions } from '../EventMessageClasses/EventMessageExecution';
import { EventMessageExecution } from '../EventMessageClasses/EventMessageExecution';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@ -397,4 +399,8 @@ export class MessageEventBus extends EventEmitter {
async sendAiNodeEvent(options: EventMessageAiNodeOptions) {
await this.send(new EventMessageAiNode(options));
}
async sendExecutionEvent(options: EventMessageExecutionOptions) {
await this.send(new EventMessageExecution(options));
}
}

View file

@ -222,6 +222,8 @@ export class MessageEventBusLogWriter {
case 'n8n.workflow.success':
case 'n8n.workflow.failed':
case 'n8n.workflow.crashed':
case 'n8n.execution.throttled':
case 'n8n.execution.started-during-bootup':
delete results.unfinishedExecutions[executionId];
break;
case 'n8n.node.started':

View file

@ -51,6 +51,10 @@ export class AuditEventRelay {
);
this.eventRelay.on('community-package-updated', (event) => this.communityPackageUpdated(event));
this.eventRelay.on('community-package-deleted', (event) => this.communityPackageDeleted(event));
this.eventRelay.on('execution-throttled', (event) => this.executionThrottled(event));
this.eventRelay.on('execution-started-during-bootup', (event) =>
this.executionStartedDuringBootup(event),
);
}
/**
@ -339,4 +343,22 @@ export class AuditEventRelay {
payload: { ...user, ...rest },
});
}
/**
* Execution
*/
private executionThrottled({ executionId }: Event['execution-throttled']) {
void this.eventBus.sendExecutionEvent({
eventName: 'n8n.execution.throttled',
payload: { executionId },
});
}
private executionStartedDuringBootup({ executionId }: Event['execution-started-during-bootup']) {
void this.eventBus.sendExecutionEvent({
eventName: 'n8n.execution.started-during-bootup',
payload: { executionId },
});
}
}

View file

@ -182,4 +182,12 @@ export type Event = {
packageAuthor?: string;
packageAuthorEmail?: string;
};
'execution-throttled': {
executionId: string;
};
'execution-started-during-bootup': {
executionId: string;
};
};

View file

@ -11,6 +11,7 @@ export const enum EventMessageTypeNames {
confirm = '$$EventMessageConfirm',
workflow = '$$EventMessageWorkflow',
node = '$$EventMessageNode',
execution = '$$EventMessageExecution',
aiNode = '$$EventMessageAiNode',
}