Mirror of https://github.com/n8n-io/n8n.git (synced 2024-12-24 04:04:06 -08:00)
refactor(core): Remove watchdog interval (#11295)

Parent: c79aa01a48
Commit: 83ca7f8e90
@@ -82,10 +82,6 @@ class BullConfig {
 	@Nested
 	redis: RedisConfig;
 
-	/** How often (in seconds) to poll the Bull queue to identify executions finished during a Redis crash. `0` to disable. May increase Redis traffic significantly. */
-	@Env('QUEUE_RECOVERY_INTERVAL')
-	queueRecoveryInterval: number = 60; // watchdog interval
-
 	/** @deprecated How long (in seconds) a worker must wait for active executions to finish before exiting. Use `N8N_GRACEFUL_SHUTDOWN_TIMEOUT` instead */
 	@Env('QUEUE_WORKER_TIMEOUT')
 	gracefulShutdownTimeout: number = 30;
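In this config class, the `@Env` decorator binds a field to an environment variable with a typed default, so deleting the field is all it takes to retire `QUEUE_RECOVERY_INTERVAL`. A plain-TypeScript sketch of what the removed binding amounted to (the real decorator lives in `@n8n/config`; this stand-in only mirrors its effect):

// Plain-TypeScript equivalent of the removed @Env-bound field: use the env
// var when present, otherwise the declared default of 60 seconds. This is a
// sketch of the binding's effect, not n8n's actual decorator implementation.
class BullConfigSketch {
	queueRecoveryInterval: number = Number(process.env.QUEUE_RECOVERY_INTERVAL ?? 60);
}

With the field gone, `Container.get(GlobalConfig).queue.bull` no longer exposes `queueRecoveryInterval`, which is why the defaults test and the `WorkflowRunner` change in the hunks below.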
@@ -211,7 +211,6 @@ describe('GlobalConfig', () => {
 				clusterNodes: '',
 				tls: false,
 			},
-			queueRecoveryInterval: 60,
 			gracefulShutdownTimeout: 30,
 			prefix: 'bull',
 			settings: {
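This test pins the resolved defaults of `GlobalConfig`, so the expected object simply drops `queueRecoveryInterval: 60`. A minimal Jest-style sketch of that kind of assertion (the `typedi` `Container` import and per-field checks are assumptions; the real test compares the whole defaults object at once):

// Sketch of a defaults test in the spirit of the hunk above; the real test
// is more exhaustive and asserts the entire configuration object.
import { Container } from 'typedi';
import { GlobalConfig } from '@n8n/config';

describe('GlobalConfig', () => {
	it('resolves documented defaults when no env vars are set', () => {
		const { bull } = Container.get(GlobalConfig).queue;
		expect(bull.gracefulShutdownTimeout).toBe(30);
		expect(bull.prefix).toBe('bull');
		// queueRecoveryInterval no longer exists on the config object.
		expect('queueRecoveryInterval' in bull).toBe(false);
	});
});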
@@ -2,6 +2,16 @@
 
 This list shows all the versions which include breaking changes and how to upgrade.
 
+# 1.65.0
+
+### What changed?
+
+Queue polling via the env var `QUEUE_RECOVERY_INTERVAL` has been removed.
+
+### When is action necessary?
+
+If you have set the env var `QUEUE_RECOVERY_INTERVAL`, you can remove it, as it no longer has any effect.
+
 # 1.63.0
 
 ### What changed?
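Because the variable is now silently ignored, operators who want an explicit signal can check for it in their own bootstrap code. A small hypothetical helper, not something n8n ships:

// Hypothetical operator-side check: warn if the env var removed in 1.65.0 is
// still set. Not part of n8n; call this from your own startup script.
function warnOnRemovedQueueRecoveryInterval(): void {
	if (process.env.QUEUE_RECOVERY_INTERVAL !== undefined) {
		console.warn('QUEUE_RECOVERY_INTERVAL has no effect since n8n 1.65.0 and can be removed.');
	}
}

warnOnRemovedQueueRecoveryInterval();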
@@ -2,7 +2,6 @@
 /* eslint-disable @typescript-eslint/no-unsafe-member-access */
 /* eslint-disable @typescript-eslint/no-shadow */
 /* eslint-disable @typescript-eslint/no-unsafe-assignment */
-import { GlobalConfig } from '@n8n/config';
 import { InstanceSettings, WorkflowExecute } from 'n8n-core';
 import type {
 	ExecutionError,
@@ -30,7 +29,7 @@ import { ExternalHooks } from '@/external-hooks';
 import { Logger } from '@/logging/logger.service';
 import { NodeTypes } from '@/node-types';
 import type { ScalingService } from '@/scaling/scaling.service';
-import type { Job, JobData, JobResult } from '@/scaling/scaling.types';
+import type { Job, JobData } from '@/scaling/scaling.types';
 import { PermissionChecker } from '@/user-management/permission-checker';
 import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
 import * as WorkflowHelpers from '@/workflow-helpers';
@@ -439,54 +438,8 @@ export class WorkflowRunner {
 				reject(error);
 			});
 
-			const jobData: Promise<JobResult> = job.finished();
-
-			const { queueRecoveryInterval } = Container.get(GlobalConfig).queue.bull;
-
-			const racingPromises: Array<Promise<JobResult>> = [jobData];
-
-			let clearWatchdogInterval;
-			if (queueRecoveryInterval > 0) {
-				/** ***********************************************
-				 * Long explanation about what this solves:       *
-				 * This only happens in a very specific scenario  *
-				 * when Redis crashes and recovers shortly        *
-				 * but during this time, some execution(s)        *
-				 * finished. The end result is that the main      *
-				 * process will wait indefinitely and never       *
-				 * get a response. This adds an active polling to *
-				 * the queue that allows us to identify that the  *
-				 * execution finished and get information from    *
-				 * the database.                                  *
-				 ************************************************ */
-				let watchDogInterval: NodeJS.Timeout | undefined;
-
-				const watchDog: Promise<JobResult> = new Promise((res) => {
-					watchDogInterval = setInterval(async () => {
-						const currentJob = await this.scalingService.getJob(job.id);
-						// When null means job is finished (not found in queue)
-						if (currentJob === null) {
-							// Mimic worker's success message
-							res({ success: true });
-						}
-					}, queueRecoveryInterval * 1000);
-				});
-
-				racingPromises.push(watchDog);
-
-				clearWatchdogInterval = () => {
-					if (watchDogInterval) {
-						clearInterval(watchDogInterval);
-						watchDogInterval = undefined;
-					}
-				};
-			}
-
 			try {
-				await Promise.race(racingPromises);
-				if (clearWatchdogInterval !== undefined) {
-					clearWatchdogInterval();
-				}
+				await job.finished();
 			} catch (error) {
 				// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
 				// "workflowExecuteAfter" which we require.
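The removed block is easier to follow in isolation: it raced the job's completion promise against a poller that treated the job's disappearance from the queue as success, so a Redis crash that swallowed the completion event could not leave the main process waiting forever. A self-contained sketch of that pattern, with simplified types and `getJob` standing in for `this.scalingService.getJob`:

// Distilled version of the removed watchdog: race normal completion against
// periodic queue polling. Types are simplified stand-ins for n8n's own.
type JobResult = { success: boolean };

async function waitForJob(
	finished: Promise<JobResult>,
	getJob: () => Promise<unknown | null>,
	recoveryIntervalSecs: number,
): Promise<JobResult> {
	// `0` disables polling, matching the removed config field's documented behavior.
	if (recoveryIntervalSecs <= 0) return await finished;

	let timer: NodeJS.Timeout | undefined;
	const watchdog = new Promise<JobResult>((resolve) => {
		timer = setInterval(async () => {
			// A job missing from the queue means it finished, even if the
			// completion event was lost; mimic the worker's success message.
			if ((await getJob()) === null) resolve({ success: true });
		}, recoveryIntervalSecs * 1000);
	});

	try {
		return await Promise.race([finished, watchdog]);
	} finally {
		if (timer) clearInterval(timer); // always stop polling, on success or error
	}
}

A call site would look like `await waitForJob(job.finished(), async () => await this.scalingService.getJob(job.id), 60)`. After this commit the runner simply awaits `job.finished()`, and the per-execution polling, along with its cleanup in the next hunk, is gone.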
@@ -497,9 +450,6 @@ export class WorkflowRunner {
 					{ retryOf: data.retryOf ? data.retryOf.toString() : undefined },
 				);
 				this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
-				if (clearWatchdogInterval !== undefined) {
-					clearWatchdogInterval();
-				}
 				await this.processError(error, new Date(), data.executionMode, executionId, hooks);
 
 				reject(error);