mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
wip: addressing PR feedback
This commit is contained in:
parent
ac54d349e9
commit
37bd0aa655
|
@ -16,15 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue';
|
||||||
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
|
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
|
||||||
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
|
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
|
||||||
|
|
||||||
export type ConcurrencyType = 'production' | 'evaluation';
|
export type ConcurrencyQueueType = 'production' | 'evaluation';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ConcurrencyControlService {
|
export class ConcurrencyControlService {
|
||||||
private isEnabled: boolean;
|
private isEnabled: boolean;
|
||||||
|
|
||||||
private readonly limits: Map<ConcurrencyType, number>;
|
private readonly limits: Map<ConcurrencyQueueType, number>;
|
||||||
|
|
||||||
private readonly queues: Map<ConcurrencyType, ConcurrencyQueue>;
|
private readonly queues: Map<ConcurrencyQueueType, ConcurrencyQueue>;
|
||||||
|
|
||||||
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
|
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
|
||||||
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
|
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
|
||||||
|
@ -77,7 +77,7 @@ export class ConcurrencyControlService {
|
||||||
if (this.shouldReport(capacity)) {
|
if (this.shouldReport(capacity)) {
|
||||||
this.telemetry.track('User hit concurrency limit', {
|
this.telemetry.track('User hit concurrency limit', {
|
||||||
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
|
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
|
||||||
concurrencyType: type,
|
concurrencyQueue: type,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -87,7 +87,7 @@ export class ConcurrencyControlService {
|
||||||
this.eventService.emit('execution-throttled', { executionId, type });
|
this.eventService.emit('execution-throttled', { executionId, type });
|
||||||
});
|
});
|
||||||
|
|
||||||
queue.on('execution-released', async (executionId) => {
|
queue.on('execution-released', (executionId) => {
|
||||||
this.logger.debug('Execution released', { executionId, type });
|
this.logger.debug('Execution released', { executionId, type });
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -108,11 +108,7 @@ export class ConcurrencyControlService {
|
||||||
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
const queue = this.getQueue(mode);
|
await this.getQueue(mode)?.enqueue(executionId);
|
||||||
|
|
||||||
if (queue) {
|
|
||||||
await queue.enqueue(executionId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -121,11 +117,7 @@ export class ConcurrencyControlService {
|
||||||
release({ mode }: { mode: ExecutionMode }) {
|
release({ mode }: { mode: ExecutionMode }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
const queue = this.getQueue(mode);
|
this.getQueue(mode)?.dequeue();
|
||||||
|
|
||||||
if (queue) {
|
|
||||||
queue.dequeue();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -134,11 +126,7 @@ export class ConcurrencyControlService {
|
||||||
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) {
|
||||||
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
if (!this.isEnabled || this.isUnlimited(mode)) return;
|
||||||
|
|
||||||
const queue = this.getQueue(mode);
|
this.getQueue(mode)?.remove(executionId);
|
||||||
|
|
||||||
if (queue) {
|
|
||||||
queue.remove(executionId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -190,9 +178,7 @@ export class ConcurrencyControlService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private isUnlimited(mode: ExecutionMode) {
|
private isUnlimited(mode: ExecutionMode) {
|
||||||
const queue = this.getQueue(mode);
|
return this.getQueue(mode) === undefined;
|
||||||
|
|
||||||
return queue === undefined;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private shouldReport(capacity: number) {
|
private shouldReport(capacity: number) {
|
||||||
|
|
|
@ -73,6 +73,10 @@ const isAnnotationEnabled = computed(
|
||||||
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
|
() => settingsStore.isEnterpriseFeatureEnabled[EnterpriseEditionFeature.AdvancedExecutionFilters],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the number of executions counted towards the production executions concurrency limit.
|
||||||
|
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
|
||||||
|
*/
|
||||||
const runningExecutionsCount = computed(() => {
|
const runningExecutionsCount = computed(() => {
|
||||||
return props.executions.filter(
|
return props.executions.filter(
|
||||||
(execution) =>
|
(execution) =>
|
||||||
|
|
|
@ -54,6 +54,10 @@ const executionListRef = ref<HTMLElement | null>(null);
|
||||||
|
|
||||||
const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);
|
const workflowPermissions = computed(() => getResourcePermissions(props.workflow?.scopes).workflow);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the number of executions counted towards the production executions concurrency limit.
|
||||||
|
* Evaluation executions are not counted towards this limit and the evaluation limit isn't shown in the UI.
|
||||||
|
*/
|
||||||
const runningExecutionsCount = computed(() => {
|
const runningExecutionsCount = computed(() => {
|
||||||
return props.executions.filter(
|
return props.executions.filter(
|
||||||
(execution) =>
|
(execution) =>
|
||||||
|
|
Loading…
Reference in a new issue