mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 21:07:28 -08:00
0c779de704
* Implemented timeout for workers and corrected timeout for subworkflows
* Fixed issue with timeouts when running on separate processes
* Standardized timeouts across all n8n
Now the maxTimeout setting takes effect whenever a timeout is set
for any workflow.
This causes local timeouts (either set on a per-workflow basis or
via global settings) to be capped by the maximum timeout. This
behavior already existed but was not applied to all places.
Also changed the way n8n enforces timeouts for subworkflows, making
it work always.
In effect, with this change, if you have one workflow that calls others,
only the main workflow's timeout is taken into consideration, limiting
the maximum time that the other workflows combined can run.
* ⚡ Fix timeout problem in "own" mode
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
281 lines
9.1 KiB
TypeScript
281 lines
9.1 KiB
TypeScript
import * as PCancelable from 'p-cancelable';
|
|
|
|
import { Command, flags } from '@oclif/command';
|
|
import {
|
|
UserSettings,
|
|
WorkflowExecute,
|
|
} from 'n8n-core';
|
|
|
|
import {
|
|
IDataObject,
|
|
INodeTypes,
|
|
IRun,
|
|
IWorkflowExecuteHooks,
|
|
Workflow,
|
|
WorkflowHooks,
|
|
} from 'n8n-workflow';
|
|
|
|
import {
|
|
FindOneOptions,
|
|
} from 'typeorm';
|
|
|
|
import {
|
|
ActiveExecutions,
|
|
CredentialsOverwrites,
|
|
CredentialTypes,
|
|
Db,
|
|
ExternalHooks,
|
|
GenericHelpers,
|
|
IBullJobData,
|
|
IBullJobResponse,
|
|
IExecutionFlattedDb,
|
|
IExecutionResponse,
|
|
LoadNodesAndCredentials,
|
|
NodeTypes,
|
|
ResponseHelper,
|
|
WorkflowCredentials,
|
|
WorkflowExecuteAdditionalData,
|
|
} from '../src';
|
|
|
|
import * as config from '../config';
|
|
import * as Bull from 'bull';
|
|
import * as Queue from '../src/Queue';
|
|
|
|
/**
 * CLI command that starts an n8n worker: a process that takes workflow
 * execution jobs from the Bull queue and runs them.
 */
export class Worker extends Command {
	static description = '\nStarts a n8n worker';

	static examples = [
		`$ n8n worker --concurrency=5`,
	];

	static flags = {
		help: flags.help({ char: 'h' }),
		concurrency: flags.integer({
			default: 10,
			description: 'How many jobs can run in parallel.',
		}),
	};

	/** Executions currently running on this worker, keyed by the Bull job ID. */
	static runningJobs: {
		[key: string]: PCancelable<IRun>;
	} = {};

	/** Queue the worker consumes jobs from. Only assigned once `run()` got that far. */
	static jobQueue: Bull.Queue;

	/** Exit code the process terminates with when it is stopped via `stopProcess()`. */
	static processExistCode = 0;
	// static activeExecutions = ActiveExecutions.getInstance();

	/**
	 * Stops n8n in a graceful way.
	 * Stops accepting new jobs, runs the "n8n.stop" external hooks and then
	 * waits for the running executions to finish before exiting. Once the
	 * hooks have run, the process is killed after at most 30 seconds.
	 */
	static async stopProcess() {
		console.log(`\nStopping n8n...`);

		try {
			// Stop accepting new jobs.
			// The queue does not exist yet when the shutdown got triggered by a
			// failure during startup, so it must not be accessed blindly.
			// The returned promise is deliberately not awaited so that it can not
			// delay the hooks and the kill timer below; failures only get logged.
			if (Worker.jobQueue !== undefined) {
				Worker.jobQueue.pause(true).catch((error) => {
					console.error('There was an error pausing the job queue.', error);
				});
			}

			const externalHooks = ExternalHooks();
			await externalHooks.run('n8n.stop', []);

			const maxStopTime = 30000;

			const stopTime = new Date().getTime() + maxStopTime;

			setTimeout(() => {
				// In case that something goes wrong with shutdown we
				// kill after max. 30 seconds no matter what
				process.exit(Worker.processExistCode);
			}, maxStopTime);

			// Wait for active workflow executions to finish
			let count = 0;
			while (Object.keys(Worker.runningJobs).length !== 0) {
				// Polls every 500ms but only logs every 4th round (every 2 seconds)
				if (count++ % 4 === 0) {
					const waitLeft = Math.ceil((stopTime - new Date().getTime()) / 1000);
					console.log(`Waiting for ${Object.keys(Worker.runningJobs).length} active executions to finish... (wait ${waitLeft} more seconds)`);
				}
				await new Promise((resolve) => {
					setTimeout(resolve, 500);
				});
			}

		} catch (error) {
			console.error('There was an error shutting down n8n.', error);
		}

		process.exit(Worker.processExistCode);
	}

	/**
	 * Runs the workflow execution a queue job refers to.
	 *
	 * @param job The Bull job; its data contains the ID of the execution to run
	 * @param nodeTypes The node types the workflow gets instantiated with
	 * @returns `{ success: true }` once the execution has finished
	 * @throws If the execution, or the workflow whose static data should get
	 *         loaded, can not be found in the database
	 */
	async runJob(job: Bull.Job, nodeTypes: INodeTypes): Promise<IBullJobResponse> {
		const jobData = job.data as IBullJobData;
		const executionDb = await Db.collections.Execution!.findOne(jobData.executionId) as IExecutionFlattedDb | undefined;
		if (executionDb === undefined) {
			// Fail with a clear message instead of an opaque error while unflattening
			throw new Error(`The execution with the ID "${jobData.executionId}" could not be found`);
		}

		const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
		const workflowData = currentExecutionDb.workflowData;
		console.log(`Start job: ${job.id} (Workflow ID: ${workflowData.id} | Execution: ${jobData.executionId})`);

		let staticData = workflowData.staticData;
		if (jobData.loadStaticData === true) {
			// Use the static data currently saved on the workflow instead of the
			// copy which got stored with the execution
			const findOptions = {
				select: ['id', 'staticData'],
			} as FindOneOptions;
			const workflowDb = await Db.collections!.Workflow!.findOne(workflowData.id, findOptions);
			if (workflowDb === undefined) {
				throw new Error(`The workflow with the ID "${workflowData.id}" could not be found`);
			}
			staticData = workflowDb.staticData;
		}

		let workflowTimeout = config.get('executions.timeout') as number; // initialize with default
		if (workflowData.settings && workflowData.settings.executionTimeout) {
			workflowTimeout = workflowData.settings.executionTimeout as number; // preference on workflow setting
		}

		let executionTimeoutTimestamp: number | undefined;
		if (workflowTimeout > 0) {
			// A timeout is set, so cap it by the maximum timeout and convert the
			// seconds into the millisecond timestamp at which the execution times out
			workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number);
			executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000;
		}

		const workflow = new Workflow({
			id: workflowData.id as string,
			name: workflowData.name,
			nodes: workflowData.nodes,
			connections: workflowData.connections,
			active: workflowData.active,
			nodeTypes,
			staticData,
			settings: workflowData.settings,
		});

		const credentials = await WorkflowCredentials(workflowData.nodes);

		const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials, undefined, executionTimeoutTimestamp);
		additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(currentExecutionDb.mode, jobData.executionId, workflowData, { retryOf: currentExecutionDb.retryOf as string });

		let workflowRun: PCancelable<IRun>;
		if (currentExecutionDb.data !== undefined) {
			// Execution data is stored already so run based on it
			const workflowExecute = new WorkflowExecute(additionalData, currentExecutionDb.mode, currentExecutionDb.data);
			workflowRun = workflowExecute.processRunExecutionData(workflow);
		} else {
			// Execute all nodes
			// Can execute without webhook so go on
			const workflowExecute = new WorkflowExecute(additionalData, currentExecutionDb.mode);
			workflowRun = workflowExecute.run(workflow);
		}

		Worker.runningJobs[job.id] = workflowRun;

		try {
			// Wait till the execution is finished
			await workflowRun;
		} finally {
			// Always deregister the job, also if the execution failed. Otherwise
			// a graceful shutdown would wait for a job which is not running anymore.
			delete Worker.runningJobs[job.id];
		}

		return {
			success: true,
		};
	}

	/**
	 * Starts the worker: initializes the database, settings, nodes,
	 * credentials and external hooks, and then starts processing jobs from
	 * the queue with the concurrency given via the "--concurrency" flag.
	 */
	async run() {
		console.log('Starting n8n worker...');

		// Make sure that n8n shuts down gracefully if possible
		process.on('SIGTERM', Worker.stopProcess);
		process.on('SIGINT', Worker.stopProcess);

		// Wrap that the process does not close but we can still use async
		await (async () => {
			try {
				const { flags } = this.parse(Worker);

				// Start directly with the init of the database to improve startup time
				const startDbInitPromise = Db.init().catch(error => {
					console.error(`There was an error initializing DB: ${error.message}`);

					Worker.processExistCode = 1;
					// @ts-ignore
					process.emit('SIGINT');
				});

				// Make sure the settings exist
				await UserSettings.prepareUserSettings();

				// Load all node and credential types
				const loadNodesAndCredentials = LoadNodesAndCredentials();
				await loadNodesAndCredentials.init();

				// Load the credentials overwrites if any exist
				const credentialsOverwrites = CredentialsOverwrites();
				await credentialsOverwrites.init();

				// Load all external hooks
				const externalHooks = ExternalHooks();
				await externalHooks.init();

				// Add the found types to an instance other parts of the application can use
				const nodeTypes = NodeTypes();
				await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
				const credentialTypes = CredentialTypes();
				await credentialTypes.init(loadNodesAndCredentials.credentialTypes);

				// Wait till the database is ready
				await startDbInitPromise;

				const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold');

				Worker.jobQueue = Queue.getInstance().getBullObjectInstance();
				Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes));

				const versions = await GenericHelpers.getVersions();

				console.log('\nn8n worker is now ready');
				console.log(` * Version: ${versions.cli}`);
				console.log(` * Concurrency: ${flags.concurrency}`);
				console.log('');

				Worker.jobQueue.on('global:progress', (jobId, progress) => {
					// Progress of a job got updated which does get used
					// to communicate that a job got canceled.

					if (progress === -1) {
						// Job has to get canceled
						if (Worker.runningJobs[jobId] !== undefined) {
							// Job is processed by current worker so cancel
							Worker.runningJobs[jobId].cancel();
							delete Worker.runningJobs[jobId];
						}
					}
				});

				// Timestamp of the last connection error and the time which got
				// accumulated by connection errors following closely on each other
				let lastTimer = 0, cumulativeTimeout = 0;
				Worker.jobQueue.on('error', (error: Error) => {
					if (error.toString().includes('ECONNREFUSED') === true) {
						const now = Date.now();
						if (now - lastTimer > 30000) {
							// Means we had no timeout at all or last timeout was temporary and we recovered
							lastTimer = now;
							cumulativeTimeout = 0;
						} else {
							cumulativeTimeout += now - lastTimer;
							lastTimer = now;
							if (cumulativeTimeout > redisConnectionTimeoutLimit) {
								console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + '. Exiting process.');
								process.exit(1);
							}
						}
						console.warn('Redis unavailable - trying to reconnect...');
					} else if (error.toString().includes('Error initializing Lua scripts') === true) {
						// This is a non-recoverable error
						// Happens when worker starts and Redis is unavailable
						// Even if Redis comes back online, worker will be zombie
						console.error('Error initializing worker.');
						process.exit(2);
					} else {
						console.error('Error from queue: ', error);
					}
				});
			} catch (error) {
				// "this.error" throws by default, which would skip the shutdown
				// below. So only print the error and then stop via "stopProcess".
				this.error(`There was an error: ${error.message}`, { exit: false });

				Worker.processExistCode = 1;
				// @ts-ignore
				process.emit('SIGINT');
			}
		})();

	}
}
|