Added checks to Redis and exiting process if connection is unavailable

This commit is contained in:
Omar Ajoue 2021-01-12 12:46:00 +01:00
parent bf9402b07b
commit 1ee7a47976
4 changed files with 98 additions and 2 deletions

View file

@ -5,6 +5,7 @@ import {
} from 'n8n-core';
import { Command, flags } from '@oclif/command';
const open = require('open');
import * as Redis from 'ioredis';
import * as config from '../config';
import {
@ -20,6 +21,7 @@ import {
Server,
TestWebhooks,
} from "../src";
import { IDataObject } from 'n8n-workflow';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
@ -156,6 +158,61 @@ export class Start extends Command {
// Wait till the database is ready
await startDbInitPromise;
if (config.get('executions.mode') === 'queue') {
const redisHost = config.get('queue.bull.redis.host');
const redisPassword = config.get('queue.bull.redis.password');
const redisPort = config.get('queue.bull.redis.port');
const redisDB = config.get('queue.bull.redis.db');
const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold');
let lastTimer = 0, cumulativeTimeout = 0;
const settings = {
retryStrategy: (times: number): number | null => {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
return 500;
},
} as IDataObject;
if (redisHost) {
settings.host = redisHost;
}
if (redisPassword) {
settings.password = redisPassword;
}
if (redisPort) {
settings.port = redisPort;
}
if (redisDB) {
settings.db = redisDB;
}
// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using the retryStrategy above
// to control how and when to exit.
const redis = new Redis(settings);
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
console.warn('Redis unavailable - trying to reconnect...');
} else {
console.warn('Error with Redis: ', error);
}
});
}
if (flags.tunnel === true) {
this.log('\nWaiting for tunnel ...');

View file

@ -7,6 +7,7 @@ import {
} from 'n8n-core';
import {
IDataObject,
INodeTypes,
IRun,
IWorkflowExecuteHooks,
@ -220,7 +221,9 @@ export class Worker extends Command {
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
const redisOptions = config.get('queue.bull.redis') as IDataObject;
const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold');
redisOptions.enableReadyCheck = false;
Worker.jobQueue = new Bull('jobs', { prefix, redis: redisOptions });
Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes));
@ -244,6 +247,34 @@ export class Worker extends Command {
}
}
});
let lastTimer = 0, cumulativeTimeout = 0;
Worker.jobQueue.on('error', (error: Error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
const now = Date.now();
if (now - lastTimer > 30000) {
// Means we had no timeout at all or last timeout was temporary and we recovered
lastTimer = now;
cumulativeTimeout = 0;
} else {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
console.warn('Redis unavailable - trying to reconnect...');
} else if (error.toString().includes('Error initializing Lua scripts') === true) {
// This is a non-recoverable error
// Happens when worker starts and Redis is unavailable
// Even if Redis comes back online, worker will be zombie
console.error('Error initializing worker.');
process.exit(2);
} else {
console.error('Error from queue: ', error);
}
});
} catch (error) {
this.error(`There was an error: ${error.message}`);

View file

@ -279,6 +279,12 @@ const config = convict({
default: 6379,
env: 'QUEUE_BULL_REDIS_PORT',
},
timeoutThreshold: {
doc: 'Redis timeout threshold',
format: Number,
default: 10000,
env: 'QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD',
},
},
},
},

View file

@ -23,6 +23,7 @@ import {
} from 'n8n-core';
import {
IDataObject,
IExecutionError,
IRun,
Workflow,
@ -55,7 +56,8 @@ export class WorkflowRunner {
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions });
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
}
}