diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index bcc9db18ff..b2928fc3b9 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -15,6 +15,7 @@ import { } from 'n8n-workflow'; import { + ActiveExecutions, CredentialsOverwrites, CredentialTypes, Db, @@ -41,15 +42,65 @@ export class Worker extends Command { static flags = { help: flags.help({ char: 'h' }), concurrency: flags.integer({ - default: 10, + // default: 10, + // TODO: Change setting + default: 1, description: 'How many jobs can run in parallel.', }), }; - runningJobs: { + static runningJobs: { [key: string]: PCancelable; } = {}; + static jobQueue: Bull.Queue; + + static processExistCode = 0; + // static activeExecutions = ActiveExecutions.getInstance(); + + /** + * Stoppes the n8n in a graceful way. + * Make for example sure that all the webhooks from third party services + * get removed. + */ + static async stopProcess() { + console.log(`\nStopping n8n...`); + + // Stop accepting new jobs + Worker.jobQueue.pause(true); + + try { + const externalHooks = ExternalHooks(); + await externalHooks.run('n8n.stop', []); + + const maxStopTime = 30000; + + const stopTime = new Date().getTime() + maxStopTime; + + setTimeout(() => { + // In case that something goes wrong with shutdown we + // kill after max. 30 seconds no matter what + process.exit(Worker.processExistCode); + }, maxStopTime); + + // Wait for active workflow executions to finish + let count = 0; + while (Object.keys(Worker.runningJobs).length !== 0) { + if (count++ % 4 === 0) { + let waitLeft = Math.ceil((stopTime - new Date().getTime()) / 1000); + console.log(`Waiting for ${Object.keys(Worker.runningJobs).length} active executions to finish... (wait ${waitLeft} more seconds)`); + } + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + } + + } catch (error) { + console.error('There was an error shutting down n8n.', error); + } + + process.exit(Worker.processExistCode); + } getWorkflowHooks(jobData: IBullJobData, executionId: string): WorkflowHooks { const hookFunctions: IWorkflowExecuteHooks = {}; @@ -72,7 +123,6 @@ export class Worker extends Command { console.log(`Start job: ${job.id} (Workflow ID: ${jobData.workflowData.id})`); // TODO: Can in the future query most of that data from the DB to lighten redis load - const workflow = new Workflow({ id: jobData.workflowData.id as string, name: jobData.workflowData.name, nodes: jobData.workflowData!.nodes, connections: jobData.workflowData!.connections, active: jobData.workflowData!.active, nodeTypes, staticData: jobData.workflowData!.staticData, settings: jobData.workflowData!.settings }); const credentials = await WorkflowCredentials(jobData.workflowData.nodes); @@ -97,12 +147,12 @@ export class Worker extends Command { workflowRun = workflowExecute.runPartialWorkflow(workflow, jobData.runData, jobData.startNodes, jobData.destinationNode); } - this.runningJobs[job.id] = workflowRun; + Worker.runningJobs[job.id] = workflowRun; // Wait till the execution is finished const runData = await workflowRun; - delete this.runningJobs[job.id]; + delete Worker.runningJobs[job.id]; return { runData, @@ -112,79 +162,82 @@ export class Worker extends Command { async run() { console.log('Starting n8n worker...'); - try { - const { flags } = this.parse(Worker); + // Make sure that n8n shuts down gracefully if possible + process.on('SIGTERM', Worker.stopProcess); + process.on('SIGINT', Worker.stopProcess); - // Start directly with the init of the database to improve startup time - const startDbInitPromise = Db.init().catch(error => { - console.error(`There was an error initializing DB: ${error.message}`); + // Wrap that the process does not close but we can still use async + await (async () => { + try { + const { flags } = this.parse(Worker); + // Start directly with the init of the database to improve startup time + const startDbInitPromise = Db.init().catch(error => { + console.error(`There was an error initializing DB: ${error.message}`); + + Worker.processExistCode = 1; + // @ts-ignore + process.emit('SIGINT'); + }); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); + + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + await loadNodesAndCredentials.init(); + + // Load the credentials overwrites if any exist + const credentialsOverwrites = CredentialsOverwrites(); + await credentialsOverwrites.init(); + + // Load all external hooks + const externalHooks = ExternalHooks(); + await externalHooks.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(); + await nodeTypes.init(loadNodesAndCredentials.nodeTypes); + const credentialTypes = CredentialTypes(); + await credentialTypes.init(loadNodesAndCredentials.credentialTypes); + + // Wait till the database is ready + await startDbInitPromise; + + // Connect to bull-queue + const prefix = config.get('queue.bull.prefix') as string; + const redisOptions = config.get('queue.bull.redis') as object; + Worker.jobQueue = new Bull('jobs', { prefix, redis: redisOptions }); + Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes)); + + const versions = await GenericHelpers.getVersions(); + + console.log('\nn8n worker is now ready'); + console.log(` * Version: ${versions.cli}`); + console.log(` * Concurrency: ${flags.concurrency}`); + console.log(''); + + Worker.jobQueue.on('global:progress', (jobId, progress) => { + // Progress of a job got updated which does get used + // to communicate that a job got canceled. + + if (progress === -1) { + // Job has to get canceled + if (Worker.runningJobs[jobId] !== undefined) { + // Job is processed by current worker so cancel + Worker.runningJobs[jobId].cancel(); + delete Worker.runningJobs[jobId]; + } + } + }); + } catch (error) { + this.error(`There was an error: ${error.message}`); + + Worker.processExistCode = 1; // @ts-ignore process.emit('SIGINT'); - }); + } + })(); - // Make sure the settings exist - await UserSettings.prepareUserSettings(); - - // Load all node and credential types - const loadNodesAndCredentials = LoadNodesAndCredentials(); - await loadNodesAndCredentials.init(); - - // Load the credentials overwrites if any exist - const credentialsOverwrites = CredentialsOverwrites(); - await credentialsOverwrites.init(); - - // Load all external hooks - const externalHooks = ExternalHooks(); - await externalHooks.init(); - - // Add the found types to an instance other parts of the application can use - const nodeTypes = NodeTypes(); - await nodeTypes.init(loadNodesAndCredentials.nodeTypes); - const credentialTypes = CredentialTypes(); - await credentialTypes.init(loadNodesAndCredentials.credentialTypes); - - // Wait till the database is ready - await startDbInitPromise; - - // Connect to bull-queue - const prefix = config.get('queue.bull.prefix') as string; - const redisOptions = config.get('queue.bull.redis') as object; - const jobQueue = new Bull('jobs', { prefix, redis: redisOptions }); - jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes)); - - const versions = await GenericHelpers.getVersions(); - - console.log('\nn8n worker is now ready'); - console.log(` * Version: ${versions.cli}`); - console.log(` * Concurrency: ${flags.concurrency}`); - console.log(''); - - jobQueue.on('global:progress', (jobId, progress) => { - // Progress of a job got updated which does get used - // to communicate that a job got canceled. - - if (progress === -1) { - // Job has to get canceled - if (this.runningJobs[jobId] !== undefined) { - // Job is processed by current worker so cancel - this.runningJobs[jobId].cancel(); - delete this.runningJobs[jobId]; - } - } - }); - - - } catch (e) { - // TODO: Do some testing here how to really display the error - // this.error(`There was an error: ${error.message}`); - console.error('\nGOT ERROR'); - console.log('===================================='); - console.error(e.message); - console.error(e.stack); - - // @ts-ignore - process.emit('SIGINT'); - } } }