Added logging to n8n (#1381)

* Added logging to n8n

This commit adds logging to n8n using the Winston library.

For now, this commit only allows logging to console (default behavior)
or file (need to pass in config via environment variables).

Other logging methods can be further implemented using hooks. These were
skipped for now as it would require adding more dependencies.

Logging level is notice by default, meaning no additional messages would
be displayed at the moment. Logging level can be set to info or debug as
well to enrich the generated logs.

The ILogger interface was added to the workflow project as it would make
it available for all other projects but the implementation was done on
the cli project.

* Lint fixes and logging level naming. Also fixed the way we use the logger as it was not working previously

* Improvements to logging framework

Using appropriate single quotes
Improving the way the logger is declared

* Improved naming for Log Types

* Removed logger global variable, replacing it by a proxy

* Add logging to CLI commands

* Remove unused GenericHelpers

* Changed back some messages to console instead of logger and added npm
shortcuts for worker and webhook

* Fix typos

* Adding basic file rotation to logs as suggested by @mutdmour

* Fixed linting issues

* Correcting comment to correctly reflect space usage

* Added settings for log files rotation

* Correcting config type from String to Number

* Changed default file settings to number

To reflect previous changes to the type

* Changed the way log messages are added to be called statically. Also minor naming improvements

* Applying latest corrections sent by @ivov

*  Some logging improvements

* Saving logs to a folder inside n8n home instead of root

* Fixed broken tests and linting

* Changed some log messages to improve formatting

* Adding quotes to names  on log messages

* Added execution and session IDs to logs. Also removed unnecessary line breaks

*  Added file caller to log messages (#1657)

This is done using callsites library which already existed
in the project as another library's dependency. So in fact
it does not add any new dependency.

* Adding logs to help debug Salesforce node

*  Add function name to logs and add more logs

*  Improve some error messages

*  Improve some more log messages

*  Rename logging env variables to match others

Co-authored-by: dali <servfrdali@yahoo.fr>
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue 2021-05-02 05:43:01 +02:00 committed by GitHub
parent 0b69310bed
commit c972f3dd50
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 548 additions and 129 deletions

View file

@ -13,7 +13,9 @@
"start:windows": "cd packages/cli/bin && n8n",
"test": "lerna run test",
"tslint": "lerna exec npm run tslint",
"watch": "lerna run --parallel watch"
"watch": "lerna run --parallel watch",
"webhook": "./packages/cli/bin/n8n webhook",
"worker": "./packages/cli/bin/n8n worker"
},
"devDependencies": {
"lerna": "^3.13.1",

View file

@ -13,7 +13,6 @@ import {
CredentialTypes,
Db,
ExternalHooks,
GenericHelpers,
IWorkflowBase,
IWorkflowExecutionDataProcess,
LoadNodesAndCredentials,
@ -23,6 +22,13 @@ import {
WorkflowRunner,
} from '../src';
import {
getLogger,
} from '../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
export class Execute extends Command {
static description = '\nExecutes a given workflow';
@ -44,6 +50,9 @@ export class Execute extends Command {
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(Execute);
// Start directly with the init of the database to improve startup time
@ -54,12 +63,12 @@ export class Execute extends Command {
const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init();
if (!flags.id && !flags.file) {
GenericHelpers.logOutput(`Either option "--id" or "--file" have to be set!`);
console.info(`Either option "--id" or "--file" have to be set!`);
return;
}
if (flags.id && flags.file) {
GenericHelpers.logOutput(`Either "id" or "file" can be set never both!`);
console.info(`Either "id" or "file" can be set never both!`);
return;
}
@ -71,7 +80,7 @@ export class Execute extends Command {
workflowData = JSON.parse(await fs.readFile(flags.file, 'utf8'));
} catch (error) {
if (error.code === 'ENOENT') {
GenericHelpers.logOutput(`The file "${flags.file}" could not be found.`);
console.info(`The file "${flags.file}" could not be found.`);
return;
}
@ -81,7 +90,7 @@ export class Execute extends Command {
// Do a basic check if the data in the file looks right
// TODO: Later check with the help of TypeScript data if it is valid or not
if (workflowData === undefined || workflowData.nodes === undefined || workflowData.connections === undefined) {
GenericHelpers.logOutput(`The file "${flags.file}" does not contain valid workflow data.`);
console.info(`The file "${flags.file}" does not contain valid workflow data.`);
return;
}
workflowId = workflowData.id!.toString();
@ -95,8 +104,8 @@ export class Execute extends Command {
workflowId = flags.id;
workflowData = await Db.collections!.Workflow!.findOne(workflowId);
if (workflowData === undefined) {
GenericHelpers.logOutput(`The workflow with the id "${workflowId}" does not exist.`);
return;
console.info(`The workflow with the id "${workflowId}" does not exist.`);
process.exit(1);
}
}
@ -138,7 +147,7 @@ export class Execute extends Command {
if (startNode === undefined) {
// If the workflow does not contain a start-node we can not know what
// should be executed and with which data to start.
GenericHelpers.logOutput(`The workflow does not contain a "Start" node. So it can not be executed.`);
console.info(`The workflow does not contain a "Start" node. So it can not be executed.`);
return Promise.resolve();
}
@ -163,9 +172,10 @@ export class Execute extends Command {
}
if (data.data.resultData.error) {
this.log('Execution was NOT successfull:');
this.log('====================================');
this.log(JSON.stringify(data, null, 2));
console.info('Execution was NOT successful. See log message for details.');
logger.info('Execution error:');
logger.info('====================================');
logger.info(JSON.stringify(data, null, 2));
const { error } = data.data.resultData;
throw {
@ -174,14 +184,15 @@ export class Execute extends Command {
};
}
this.log('Execution was successfull:');
this.log('====================================');
this.log(JSON.stringify(data, null, 2));
console.info('Execution was successful:');
console.info('====================================');
console.info(JSON.stringify(data, null, 2));
} catch (e) {
console.error('\nGOT ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
console.error('Error executing workflow. See log messages for details.');
logger.error('\nExecution error:');
logger.info('====================================');
logger.error(e.message);
logger.error(e.stack);
this.exit(1);
}

View file

@ -14,10 +14,17 @@ import {
import {
Db,
GenericHelpers,
ICredentialsDecryptedDb,
} from '../../src';
import {
getLogger,
} from '../../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
import * as fs from 'fs';
import * as path from 'path';
@ -59,6 +66,9 @@ export class ExportCredentialsCommand extends Command {
};
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(ExportCredentialsCommand);
if (flags.backup) {
@ -68,41 +78,42 @@ export class ExportCredentialsCommand extends Command {
}
if (!flags.all && !flags.id) {
GenericHelpers.logOutput(`Either option "--all" or "--id" have to be set!`);
console.info(`Either option "--all" or "--id" have to be set!`);
return;
}
if (flags.all && flags.id) {
GenericHelpers.logOutput(`You should either use "--all" or "--id" but never both!`);
console.info(`You should either use "--all" or "--id" but never both!`);
return;
}
if (flags.separate) {
try {
if (!flags.output) {
GenericHelpers.logOutput(`You must inform an output directory via --output when using --separate`);
console.info(`You must inform an output directory via --output when using --separate`);
return;
}
if (fs.existsSync(flags.output)) {
if (!fs.lstatSync(flags.output).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --output must be a directory`);
console.info(`The paramenter --output must be a directory`);
return;
}
} else {
fs.mkdirSync(flags.output, { recursive: true });
}
} catch (e) {
console.error('\nFILESYSTEM ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
console.error('Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.');
logger.error('\nFILESYSTEM ERROR');
logger.info('====================================');
logger.error(e.message);
logger.error(e.stack);
this.exit(1);
}
} else if (flags.output) {
if (fs.existsSync(flags.output)) {
if (fs.lstatSync(flags.output).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --output must be a writeble file`);
console.info(`The paramenter --output must be a writeble file`);
return;
}
}
@ -143,18 +154,21 @@ export class ExportCredentialsCommand extends Command {
const filename = (flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) + credentials[i].id + '.json';
fs.writeFileSync(filename, fileContents);
}
console.log('Successfully exported', i, 'credentials.');
console.info(`Successfully exported ${i} credentials.`);
} else {
const fileContents = JSON.stringify(credentials, null, flags.pretty ? 2 : undefined);
if (flags.output) {
fs.writeFileSync(flags.output!, fileContents);
console.log('Successfully exported', credentials.length, 'credentials.');
console.info(`Successfully exported ${credentials.length} credentials.`);
} else {
console.log(fileContents);
console.info(fileContents);
}
}
// Force exit as process won't exit using MySQL or Postgres.
process.exit(0);
} catch (error) {
this.error(error.message);
console.error('Error exporting credentials. See log messages for details.');
logger.error(error.message);
this.exit(1);
}
}

View file

@ -9,9 +9,16 @@ import {
import {
Db,
GenericHelpers,
} from '../../src';
import {
getLogger,
} from '../../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
import * as fs from 'fs';
import * as path from 'path';
@ -49,6 +56,9 @@ export class ExportWorkflowsCommand extends Command {
};
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(ExportWorkflowsCommand);
if (flags.backup) {
@ -58,41 +68,42 @@ export class ExportWorkflowsCommand extends Command {
}
if (!flags.all && !flags.id) {
GenericHelpers.logOutput(`Either option "--all" or "--id" have to be set!`);
console.info(`Either option "--all" or "--id" have to be set!`);
return;
}
if (flags.all && flags.id) {
GenericHelpers.logOutput(`You should either use "--all" or "--id" but never both!`);
console.info(`You should either use "--all" or "--id" but never both!`);
return;
}
if (flags.separate) {
try {
if (!flags.output) {
GenericHelpers.logOutput(`You must inform an output directory via --output when using --separate`);
console.info(`You must inform an output directory via --output when using --separate`);
return;
}
if (fs.existsSync(flags.output)) {
if (!fs.lstatSync(flags.output).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --output must be a directory`);
console.info(`The paramenter --output must be a directory`);
return;
}
} else {
fs.mkdirSync(flags.output, { recursive: true });
}
} catch (e) {
console.error('\nFILESYSTEM ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
console.error('Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.');
logger.error('\nFILESYSTEM ERROR');
logger.info('====================================');
logger.error(e.message);
logger.error(e.stack);
this.exit(1);
}
} else if (flags.output) {
if (fs.existsSync(flags.output)) {
if (fs.lstatSync(flags.output).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --output must be a writeble file`);
console.info(`The paramenter --output must be a writeble file`);
return;
}
}
@ -119,18 +130,21 @@ export class ExportWorkflowsCommand extends Command {
const filename = (flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) + workflows[i].id + '.json';
fs.writeFileSync(filename, fileContents);
}
console.log('Successfully exported', i, 'workflows.');
console.info(`Successfully exported ${i} workflows.`);
} else {
const fileContents = JSON.stringify(workflows, null, flags.pretty ? 2 : undefined);
if (flags.output) {
fs.writeFileSync(flags.output!, fileContents);
console.log('Successfully exported', workflows.length, workflows.length === 1 ? 'workflow.' : 'workflows.');
console.info(`Successfully exported ${workflows.length} ${workflows.length === 1 ? 'workflow.' : 'workflows.'}`);
} else {
console.log(fileContents);
console.info(fileContents);
}
}
// Force exit as process won't exit using MySQL or Postgres.
process.exit(0);
} catch (error) {
this.error(error.message);
console.error('Error exporting workflows. See log messages for details.');
logger.error(error.message);
this.exit(1);
}
}

View file

@ -10,9 +10,16 @@ import {
import {
Db,
GenericHelpers,
} from '../../src';
import {
getLogger,
} from '../../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
import * as fs from 'fs';
import * as glob from 'glob-promise';
import * as path from 'path';
@ -37,17 +44,20 @@ export class ImportCredentialsCommand extends Command {
};
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(ImportCredentialsCommand);
if (!flags.input) {
GenericHelpers.logOutput(`An input file or directory with --input must be provided`);
console.info(`An input file or directory with --input must be provided`);
return;
}
if (flags.separate) {
if (fs.existsSync(flags.input)) {
if (!fs.lstatSync(flags.input).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --input must be a directory`);
console.info(`The paramenter --input must be a directory`);
return;
}
}
@ -89,9 +99,11 @@ export class ImportCredentialsCommand extends Command {
await Db.collections.Credentials!.save(fileContents[i]);
}
}
console.log('Successfully imported', i, 'credentials.');
console.info(`Successfully imported ${i} ${i === 1 ? 'credential.' : 'credentials.'}`);
process.exit(0);
} catch (error) {
this.error(error.message);
console.error('An error occurred while exporting credentials. See log messages for details.');
logger.error(error.message);
this.exit(1);
}
}

View file

@ -5,9 +5,16 @@ import {
import {
Db,
GenericHelpers,
} from '../../src';
import {
getLogger,
} from '../../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
import * as fs from 'fs';
import * as glob from 'glob-promise';
import * as path from 'path';
@ -32,17 +39,20 @@ export class ImportWorkflowsCommand extends Command {
};
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(ImportWorkflowsCommand);
if (!flags.input) {
GenericHelpers.logOutput(`An input file or directory with --input must be provided`);
console.info(`An input file or directory with --input must be provided`);
return;
}
if (flags.separate) {
if (fs.existsSync(flags.input)) {
if (!fs.lstatSync(flags.input).isDirectory()) {
GenericHelpers.logOutput(`The paramenter --input must be a directory`);
console.info(`The paramenter --input must be a directory`);
return;
}
}
@ -69,9 +79,11 @@ export class ImportWorkflowsCommand extends Command {
}
}
console.log('Successfully imported', i, i === 1 ? 'workflow.' : 'workflows.');
console.info(`Successfully imported ${i} ${i === 1 ? 'workflow.' : 'workflows.'}`);
process.exit(0);
} catch (error) {
this.error(error.message);
console.error('An error occurred while exporting workflows. See log messages for details.');
logger.error(error.message);
this.exit(1);
}
}

View file

@ -25,11 +25,17 @@ import {
} from '../src';
import { IDataObject } from 'n8n-workflow';
import {
getLogger,
} from '../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
let processExistCode = 0;
export class Start extends Command {
static description = 'Starts n8n. Makes Web-UI available and starts active workflows';
@ -71,7 +77,7 @@ export class Start extends Command {
* get removed.
*/
static async stopProcess() {
console.log(`\nStopping n8n...`);
getLogger().info('\nStopping n8n...');
try {
const externalHooks = ExternalHooks();
@ -132,13 +138,18 @@ export class Start extends Command {
// Wrap that the process does not close but we can still use async
await (async () => {
try {
const logger = getLogger();
LoggerProxy.init(logger);
logger.info('Initializing n8n process');
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch((error: Error) => {
console.error(`There was an error initializing DB: ${error.message}`);
logger.error(`There was an error initializing DB: "${error.message}"`);
processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
});
// Make sure the settings exist
@ -184,7 +195,7 @@ export class Start extends Command {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + '. Exiting process.');
logger.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
@ -213,9 +224,9 @@ export class Start extends Command {
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
console.warn('Redis unavailable - trying to reconnect...');
logger.warn('Redis unavailable - trying to reconnect...');
} else {
console.warn('Error with Redis: ', error);
logger.warn('Error with Redis: ', error);
}
});
}

View file

@ -11,6 +11,13 @@ import {
GenericHelpers,
} from '../../src';
import {
getLogger,
} from '../../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
export class UpdateWorkflowCommand extends Command {
static description = '\Update workflows';
@ -34,25 +41,28 @@ export class UpdateWorkflowCommand extends Command {
};
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
const { flags } = this.parse(UpdateWorkflowCommand);
if (!flags.all && !flags.id) {
GenericHelpers.logOutput(`Either option "--all" or "--id" have to be set!`);
console.info(`Either option "--all" or "--id" have to be set!`);
return;
}
if (flags.all && flags.id) {
GenericHelpers.logOutput(`Either something else on top should be "--all" or "--id" can be set never both!`);
console.info(`Either something else on top should be "--all" or "--id" can be set never both!`);
return;
}
const updateQuery: IDataObject = {};
if (flags.active === undefined) {
GenericHelpers.logOutput(`No update flag like "--active=true" has been set!`);
console.info(`No update flag like "--active=true" has been set!`);
return;
} else {
if (!['false', 'true'].includes(flags.active)) {
GenericHelpers.logOutput(`Valid values for flag "--active" are only "false" or "true"!`);
console.info(`Valid values for flag "--active" are only "false" or "true"!`);
return;
}
updateQuery.active = flags.active === 'true';
@ -63,20 +73,21 @@ export class UpdateWorkflowCommand extends Command {
const findQuery: IDataObject = {};
if (flags.id) {
console.log(`Deactivating workflow with ID: ${flags.id}`);
console.info(`Deactivating workflow with ID: ${flags.id}`);
findQuery.id = flags.id;
} else {
console.log('Deactivating all workflows');
console.info('Deactivating all workflows');
findQuery.active = true;
}
await Db.collections.Workflow!.update(findQuery, updateQuery);
console.log('Done');
console.info('Done');
} catch (e) {
console.error('\nGOT ERROR');
console.log('====================================');
console.error(e.message);
console.error(e.stack);
console.error('Error updating database. See log messages for details.');
logger.error('\nGOT ERROR');
logger.info('====================================');
logger.error(e.message);
logger.error(e.stack);
this.exit(1);
}

View file

@ -20,6 +20,13 @@ import {
} from '../src';
import { IDataObject } from 'n8n-workflow';
import {
getLogger,
} from '../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
let processExistCode = 0;
@ -42,7 +49,7 @@ export class Webhook extends Command {
* get removed.
*/
static async stopProcess() {
console.log(`\nStopping n8n...`);
LoggerProxy.info(`\nStopping n8n...`);
try {
const externalHooks = ExternalHooks();
@ -72,7 +79,7 @@ export class Webhook extends Command {
let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
LoggerProxy.info(`Waiting for ${executingWorkflows.length} active executions to finish...`);
}
await new Promise((resolve) => {
setTimeout(resolve, 500);
@ -81,7 +88,7 @@ export class Webhook extends Command {
}
} catch (error) {
console.error('There was an error shutting down n8n.', error);
LoggerProxy.error('There was an error shutting down n8n.', error);
}
process.exit(processExistCode);
@ -89,6 +96,9 @@ export class Webhook extends Command {
async run() {
const logger = getLogger();
LoggerProxy.init(logger);
// Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Webhook.stopProcess);
process.on('SIGINT', Webhook.stopProcess);
@ -116,11 +126,12 @@ export class Webhook extends Command {
try {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch(error => {
console.error(`There was an error initializing DB: ${error.message}`);
logger.error(`There was an error initializing DB: "${error.message}"`);
processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
});
// Make sure the settings exist
@ -166,7 +177,7 @@ export class Webhook extends Command {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + '. Exiting process.');
logger.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
@ -195,9 +206,9 @@ export class Webhook extends Command {
redis.on('error', (error) => {
if (error.toString().includes('ECONNREFUSED') === true) {
console.warn('Redis unavailable - trying to reconnect...');
logger.warn('Redis unavailable - trying to reconnect...');
} else {
console.warn('Error with Redis: ', error);
logger.warn('Error with Redis: ', error);
}
});
}
@ -209,14 +220,16 @@ export class Webhook extends Command {
await activeWorkflowRunner.initWebhooks();
const editorUrl = GenericHelpers.getBaseUrl();
this.log('Webhook listener waiting for requests.');
console.info('Webhook listener waiting for requests.');
} catch (error) {
this.error(`There was an error: ${error.message}`);
console.error('Exiting due to error. See log message for details.');
logger.error(`Webhook process cannot continue. "${error.message}"`);
processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
}
})();
}

View file

@ -37,6 +37,14 @@ import {
WorkflowExecuteAdditionalData,
} from '../src';
import {
getLogger,
} from '../src/Logger';
import {
LoggerProxy,
} from 'n8n-workflow';
import * as config from '../config';
import * as Bull from 'bull';
import * as Queue from '../src/Queue';
@ -71,7 +79,7 @@ export class Worker extends Command {
* get removed.
*/
static async stopProcess() {
console.log(`\nStopping n8n...`);
LoggerProxy.info(`Stopping n8n...`);
// Stop accepting new jobs
Worker.jobQueue.pause(true);
@ -95,7 +103,7 @@ export class Worker extends Command {
while (Object.keys(Worker.runningJobs).length !== 0) {
if (count++ % 4 === 0) {
const waitLeft = Math.ceil((stopTime - new Date().getTime()) / 1000);
console.log(`Waiting for ${Object.keys(Worker.runningJobs).length} active executions to finish... (wait ${waitLeft} more seconds)`);
LoggerProxy.info(`Waiting for ${Object.keys(Worker.runningJobs).length} active executions to finish... (wait ${waitLeft} more seconds)`);
}
await new Promise((resolve) => {
setTimeout(resolve, 500);
@ -103,7 +111,7 @@ export class Worker extends Command {
}
} catch (error) {
console.error('There was an error shutting down n8n.', error);
LoggerProxy.error('There was an error shutting down n8n.', error);
}
process.exit(Worker.processExistCode);
@ -113,7 +121,7 @@ export class Worker extends Command {
const jobData = job.data as IBullJobData;
const executionDb = await Db.collections.Execution!.findOne(jobData.executionId) as IExecutionFlattedDb;
const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
console.log(`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`);
LoggerProxy.info(`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`);
let staticData = currentExecutionDb.workflowData!.staticData;
if (jobData.loadStaticData === true) {
@ -170,7 +178,10 @@ export class Worker extends Command {
}
async run() {
console.log('Starting n8n worker...');
const logger = getLogger();
LoggerProxy.init(logger);
console.info('Starting n8n worker...');
// Make sure that n8n shuts down gracefully if possible
process.on('SIGTERM', Worker.stopProcess);
@ -183,11 +194,12 @@ export class Worker extends Command {
// Start directly with the init of the database to improve startup time
const startDbInitPromise = Db.init().catch(error => {
console.error(`There was an error initializing DB: ${error.message}`);
logger.error(`There was an error initializing DB: "${error.message}"`);
Worker.processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
});
// Make sure the settings exist
@ -221,10 +233,10 @@ export class Worker extends Command {
const versions = await GenericHelpers.getVersions();
console.log('\nn8n worker is now ready');
console.log(` * Version: ${versions.cli}`);
console.log(` * Concurrency: ${flags.concurrency}`);
console.log('');
console.info('\nn8n worker is now ready');
console.info(` * Version: ${versions.cli}`);
console.info(` * Concurrency: ${flags.concurrency}`);
console.info('');
Worker.jobQueue.on('global:progress', (jobId, progress) => {
// Progress of a job got updated which does get used
@ -252,27 +264,28 @@ export class Worker extends Command {
cumulativeTimeout += now - lastTimer;
lastTimer = now;
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + '. Exiting process.');
logger.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process.");
process.exit(1);
}
}
console.warn('Redis unavailable - trying to reconnect...');
logger.warn('Redis unavailable - trying to reconnect...');
} else if (error.toString().includes('Error initializing Lua scripts') === true) {
// This is a non-recoverable error
// Happens when worker starts and Redis is unavailable
// Even if Redis comes back online, worker will be zombie
console.error('Error initializing worker.');
logger.error('Error initializing worker.');
process.exit(2);
} else {
console.error('Error from queue: ', error);
logger.error('Error from queue: ', error);
}
});
} catch (error) {
this.error(`There was an error: ${error.message}`);
logger.error(`Worker process cannot continue. "${error.message}"`);
Worker.processExistCode = 1;
// @ts-ignore
process.emit('SIGINT');
process.exit(1);
}
})();

View file

@ -1,5 +1,7 @@
import * as convict from 'convict';
import * as dotenv from 'dotenv';
import * as path from 'path';
import * as core from 'n8n-core';
dotenv.config();
@ -572,6 +574,41 @@ const config = convict({
},
},
logs: {
level: {
doc: 'Log output level. Options are error, warn, info, verbose and debug.',
format: String,
default: 'info',
env: 'N8N_LOG_LEVEL',
},
output: {
doc: 'Where to output logs. Options are: console, file. Multiple can be separated by comma (",")',
format: String,
default: 'console',
env: 'N8N_LOG_OUTPUT',
},
file: {
fileCountMax: {
doc: 'Maximum number of files to keep.',
format: Number,
default: 100,
env: 'N8N_LOG_FILE_COUNT_MAX',
},
fileSizeMax: {
doc: 'Maximum size for each log file in MB.',
format: Number,
default: 16,
env: 'N8N_LOG_FILE_SIZE_MAX',
},
location: {
doc: 'Log file location; only used if log output is set to file.',
format: String,
default: path.join(core.UserSettings.getUserN8nFolderPath(), 'logs/n8n.log'),
env: 'N8N_LOG_FILE_LOCATION',
},
},
},
});
// Overwrite default configuration with settings which got defined in

View file

@ -87,6 +87,7 @@
"body-parser": "^1.18.3",
"body-parser-xml": "^1.1.0",
"bull": "^3.19.0",
"callsites": "^3.1.0",
"client-oauth2": "^4.2.5",
"compression": "^1.7.4",
"connect-history-api-fallback": "^1.6.0",

View file

@ -35,6 +35,9 @@ import {
} from 'n8n-workflow';
import * as express from 'express';
import {
LoggerProxy as Logger,
} from 'n8n-workflow';
export class ActiveWorkflowRunner {
private activeWorkflows: ActiveWorkflows | null = null;
@ -43,7 +46,6 @@ export class ActiveWorkflowRunner {
[key: string]: IActivationError;
} = {};
async init() {
// Get the active workflows from database
@ -59,20 +61,24 @@ export class ActiveWorkflowRunner {
this.activeWorkflows = new ActiveWorkflows();
if (workflowsData.length !== 0) {
console.log('\n ================================');
console.log(' Start Active Workflows:');
console.log(' ================================');
console.info(' ================================');
console.info(' Start Active Workflows:');
console.info(' ================================');
for (const workflowData of workflowsData) {
console.log(` - ${workflowData.name}`);
Logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, {workflowName: workflowData.name, workflowId: workflowData.id});
try {
await this.add(workflowData.id.toString(), 'init', workflowData);
Logger.verbose(`Successfully started workflow "${workflowData.name}"`, {workflowName: workflowData.name, workflowId: workflowData.id});
console.log(` => Started`);
} catch (error) {
console.log(` => ERROR: Workflow could not be activated:`);
console.log(` ${error.message}`);
Logger.error(`Unable to initialize workflow "${workflowData.name}" (startup)`, {workflowName: workflowData.name, workflowId: workflowData.id});
}
}
Logger.verbose('Finished initializing active workflows (startup)');
}
}
@ -88,6 +94,7 @@ export class ActiveWorkflowRunner {
*/
async removeAll(): Promise<void> {
const activeWorkflowId: string[] = [];
Logger.verbose('Call to remove all active workflows received (removeAll)');
if (this.activeWorkflows !== null) {
// TODO: This should be renamed!
@ -117,6 +124,7 @@ export class ActiveWorkflowRunner {
* @memberof ActiveWorkflowRunner
*/
async executeWebhook(httpMethod: WebhookHttpMethod, path: string, req: express.Request, res: express.Response): Promise<IResponseCallbackData> {
Logger.debug(`Received webhoook "${httpMethod}" for path "${path}"`);
if (this.activeWorkflows === null) {
throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
}
@ -437,6 +445,7 @@ export class ActiveWorkflowRunner {
return ((workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecutePollFunctions(workflow, node, additionalData, mode, activation);
returnFunctions.__emit = (data: INodeExecutionData[][]): void => {
Logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`);
this.runWorkflow(workflowData, node, data, additionalData, mode);
};
return returnFunctions;
@ -458,6 +467,7 @@ export class ActiveWorkflowRunner {
return ((workflow: Workflow, node: INode) => {
const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode, activation);
returnFunctions.emit = (data: INodeExecutionData[][]): void => {
Logger.debug(`Received trigger for workflow "${workflow.name}"`);
WorkflowHelpers.saveStaticData(workflow);
this.runWorkflow(workflowData, node, data, additionalData, mode).catch((err) => console.error(err));
};
@ -492,6 +502,7 @@ export class ActiveWorkflowRunner {
const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(['n8n-nodes-base.start']);
if (canBeActivated === false) {
Logger.error(`Unable to activate workflow "${workflowData.name}"`);
throw new Error(`The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.`);
}
@ -507,6 +518,7 @@ export class ActiveWorkflowRunner {
if (workflowInstance.getTriggerNodes().length !== 0
|| workflowInstance.getPollNodes().length !== 0) {
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, mode, activation, getTriggerFunctions, getPollFunctions);
Logger.info(`Successfully activated workflow "${workflowData.name}"`);
}
if (this.activationErrors[workflowId] !== undefined) {

114
packages/cli/src/Logger.ts Normal file
View file

@ -0,0 +1,114 @@
import config = require('../config');
import * as winston from 'winston';
import {
IDataObject,
ILogger,
LogTypes,
} from 'n8n-workflow';
import * as callsites from 'callsites';
import { basename } from 'path';
class Logger implements ILogger {
private logger: winston.Logger;
constructor() {
const level = config.get('logs.level');
const output = (config.get('logs.output') as string).split(',').map(output => output.trim());
this.logger = winston.createLogger({
level,
});
if (output.includes('console')) {
let format: winston.Logform.Format;
if (['debug', 'verbose'].includes(level)) {
format = winston.format.combine(
winston.format.metadata(),
winston.format.timestamp(),
winston.format.colorize({ all: true }),
winston.format.printf(({ level, message, timestamp, metadata }) => {
return `${timestamp} | ${level.padEnd(18)} | ${message}` + (Object.keys(metadata).length ? ` ${JSON.stringify(metadata)}` : '');
}) as winston.Logform.Format
);
} else {
format = winston.format.printf(({ message }) => message) as winston.Logform.Format;
}
this.logger.add(
new winston.transports.Console({
format,
})
);
}
if (output.includes('file')) {
const fileLogFormat = winston.format.combine(
winston.format.timestamp(),
winston.format.metadata(),
winston.format.json()
);
this.logger.add(
new winston.transports.File({
filename: config.get('logs.file.location'),
format: fileLogFormat,
maxsize: config.get('logs.file.fileSizeMax') as number * 1048576, // config * 1mb
maxFiles: config.get('logs.file.fileCountMax'),
})
);
}
}
log(type: LogTypes, message: string, meta: object = {}) {
const callsite = callsites();
// We are using the third array element as the structure is as follows:
// [0]: this file
// [1]: Should be LoggerProxy
// [2]: Should point to the caller.
// Note: getting line number is useless because at this point
// We are in runtime, so it means we are looking at compiled js files
const logDetails = {} as IDataObject;
if (callsite[2] !== undefined) {
logDetails.file = basename(callsite[2].getFileName() || '');
const functionName = callsite[2].getFunctionName();
if (functionName) {
logDetails.function = functionName;
}
}
this.logger.log(type, message, {...meta, ...logDetails});
}
// Convenience methods below
debug(message: string, meta: object = {}) {
this.log('debug', message, meta);
}
info(message: string, meta: object = {}) {
this.log('info', message, meta);
}
error(message: string, meta: object = {}) {
this.log('error', message, meta);
}
verbose(message: string, meta: object = {}) {
this.log('verbose', message, meta);
}
warn(message: string, meta: object = {}) {
this.log('warn', message, meta);
}
}
let activeLoggerInstance: Logger | undefined;
export function getLogger() {
if (activeLoggerInstance === undefined) {
activeLoggerInstance = new Logger();
}
return activeLoggerInstance;
}

View file

@ -7,6 +7,10 @@ import {
IPushDataType,
} from '.';
import {
LoggerProxy as Logger,
} from 'n8n-workflow';
export class Push {
private channel: sseChannel;
private connections: {
@ -24,6 +28,7 @@ export class Push {
this.channel.on('disconnect', (channel: string, res: express.Response) => {
if (res.req !== undefined) {
Logger.debug(`Remove editor-UI session`, { sessionId: res.req.query.sessionId });
delete this.connections[res.req.query.sessionId as string];
}
});
@ -39,6 +44,8 @@ export class Push {
* @memberof Push
*/
add(sessionId: string, req: express.Request, res: express.Response) {
Logger.debug(`Add editor-UI session`, { sessionId });
if (this.connections[sessionId] !== undefined) {
// Make sure to remove existing connection with the same session
// id if one exists already
@ -64,11 +71,12 @@ export class Push {
send(type: IPushDataType, data: any, sessionId?: string) { // tslint:disable-line:no-any
if (sessionId !== undefined && this.connections[sessionId] === undefined) {
// TODO: Log that properly!
console.error(`The session "${sessionId}" is not registred.`);
Logger.error(`The session "${sessionId}" is not registred.`, { sessionId });
return;
}
Logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId });
const sendData: IPushData = {
type,
data,

View file

@ -30,13 +30,14 @@ import {
IWebhookData,
IWebhookResponseData,
IWorkflowExecuteAdditionalData,
LoggerProxy as Logger,
NodeHelpers,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
const activeExecutions = ActiveExecutions.getInstance();
const activeExecutions = ActiveExecutions.getInstance();
/**
* Returns all the webhooks which should be created for the give workflow
@ -286,6 +287,8 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
const workflowRunner = new WorkflowRunner();
const executionId = await workflowRunner.run(runData, true, !didSendResponse);
Logger.verbose(`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`, { executionId });
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise<IExecutionDb | undefined>;
executePromise.then((data) => {

View file

@ -37,6 +37,7 @@ import {
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
WorkflowHooks,
@ -44,11 +45,10 @@ import {
import * as config from '../config';
import { LessThanOrEqual } from "typeorm";
import { LessThanOrEqual } from 'typeorm';
const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string;
/**
* Checks if there was an error and if errorWorkflow or a trigger is defined. If so it collects
* all the data and executes it
@ -85,9 +85,11 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
// Run the error workflow
// To avoid an infinite loop do not run the error workflow again if the error-workflow itself failed and it is its own error-workflow.
if (workflowData.settings !== undefined && workflowData.settings.errorWorkflow && !(mode === 'error' && workflowData.id && workflowData.settings.errorWorkflow.toString() === workflowData.id.toString())) {
Logger.verbose(`Start external error workflow`, { executionId: this.executionId, errorWorkflowId: workflowData.settings.errorWorkflow.toString(), workflowId: this.workflowData.id });
// If a specific error workflow is set run only that one
WorkflowHelpers.executeErrorWorkflow(workflowData.settings.errorWorkflow as string, workflowErrorData);
} else if (mode !== 'error' && workflowData.id !== undefined && workflowData.nodes.some((node) => node.type === ERROR_TRIGGER_TYPE)) {
Logger.verbose(`Start internal error workflow`, { executionId: this.executionId, workflowId: this.workflowData.id });
// If the workflow contains
WorkflowHelpers.executeErrorWorkflow(workflowData.id.toString(), workflowErrorData);
}
@ -102,6 +104,8 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
let throttling = false;
function pruneExecutionData(): void {
if (!throttling) {
Logger.verbose('Pruning execution data from database');
throttling = true;
const timeout = config.get('executions.pruneDataTimeout') as number; // in seconds
const maxAge = config.get('executions.pruneDataMaxAge') as number; // in h
@ -133,6 +137,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
if (this.sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteBefore', {
@ -147,6 +152,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
if (this.sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteAfter', {
@ -158,6 +164,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
@ -168,13 +175,14 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId: this.workflowData.id as string,
workflowId: this.workflowData.id, sessionId: this.sessionId as string,
workflowName: this.workflowData.name,
}, this.sessionId);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsPush)`, { executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
// Push data to session which started the workflow
if (this.sessionId === undefined) {
return;
@ -195,6 +203,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
};
// Push data to editor-ui once workflow finished
Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, { executionId: this.executionId, workflowId: this.workflowData.id });
// TODO: Look at this again
const sendData: IPushDataExecutionFinished = {
executionId: this.executionId,
@ -232,6 +241,8 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
}
try {
Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, { executionId: this.executionId, nodeName });
const execution = await Db.collections.Execution!.findOne(this.executionId);
if (execution === undefined) {
@ -286,7 +297,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
// For busy machines, we may get "Database is locked" errors.
// We do this to prevent crashes and executions ending in `unknown` state.
console.log(`Failed saving execution progress to database for execution ID ${this.executionId}`, err);
Logger.error(`Failed saving execution progress to database for execution ID ${this.executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`, { ...err, executionId: this.executionId, sessionId: this.sessionId, workflowId: this.workflowData.id });
}
},
@ -307,6 +318,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
Logger.debug(`Executing hook (hookFunctionsSave)`, { executionId: this.executionId, workflowId: this.workflowData.id });
// Prune old execution data
if (config.get('executions.pruneData')) {
@ -321,8 +333,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
try {
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
// TODO: Add proper logging!
console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`);
Logger.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`, { executionId: this.executionId, workflowId: this.workflowData.id });
}
}
@ -375,6 +386,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
Logger.debug(`Save execution data to database for execution ID ${this.executionId}`, { executionId: this.executionId, workflowId: this.workflowData.id });
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
@ -420,8 +434,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
try {
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
// TODO: Add proper logging!
console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`);
Logger.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`, { sessionId: this.sessionId, workflowId: this.workflowData.id });
}
}

View file

@ -18,8 +18,8 @@ import {
IRunExecutionData,
ITaskData,
IWorkflowCredentials,
Workflow,
} from 'n8n-workflow';
LoggerProxy as Logger,
Workflow,} from 'n8n-workflow';
import * as config from '../config';
@ -86,7 +86,7 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData
if (workflowData === undefined) {
// The error workflow could not be found
console.error(`ERROR: Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find error workflow "${workflowId}"`);
Logger.error(`Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find error workflow "${workflowId}"`, { workflowId });
return;
}
@ -105,7 +105,7 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData
}
if (workflowStartNode === undefined) {
console.error(`ERROR: Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find "${ERROR_TRIGGER_TYPE}" in workflow "${workflowId}"`);
Logger.error(`Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find "${ERROR_TRIGGER_TYPE}" in workflow "${workflowId}"`);
return;
}
@ -153,7 +153,7 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData
const workflowRunner = new WorkflowRunner();
await workflowRunner.run(runData);
} catch (error) {
console.error(`ERROR: Calling Error Workflow for "${workflowErrorData.workflow.id}": ${error.message}`);
Logger.error(`Calling Error Workflow for "${workflowErrorData.workflow.id}": "${error.message}"`, { workflowId: workflowErrorData.workflow.id });
}
}
@ -315,8 +315,7 @@ export async function saveStaticData(workflow: Workflow): Promise <void> {
await saveStaticDataById(workflow.id!, workflow.staticData);
workflow.staticData.__dataChanged = false;
} catch (e) {
// TODO: Add proper logging!
console.error(`There was a problem saving the workflow with id "${workflow.id}" to save changed staticData: ${e.message}`);
Logger.error(`There was a problem saving the workflow with id "${workflow.id}" to save changed staticData: "${e.message}"`, { workflowId: workflow.id });
}
}
}

View file

@ -29,6 +29,7 @@ import {
import {
ExecutionError,
IRun,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
WorkflowHooks,
@ -177,20 +178,24 @@ export class WorkflowRunner {
// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined);
Logger.verbose(`Execution for workflow ${data.workflowData.name} was assigned id ${executionId}`, {executionId});
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId, true);
let workflowExecution: PCancelable<IRun>;
if (data.executionData !== undefined) {
Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {executionId});
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode, data.executionData);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
Logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {executionId});
// Execute all nodes
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, {executionId});
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.runPartialWorkflow(workflow, data.runData, data.startNodes, data.destinationNode);
@ -450,6 +455,7 @@ export class WorkflowRunner {
// Listen to data from the subprocess
subprocess.on('message', async (message: IProcessMessage) => {
Logger.debug(`Received child process message of type ${message.type} for execution ID ${executionId}.`, {executionId});
if (message.type === 'start') {
// Now that the execution actually started set the timeout again so that does not time out to early.
startedAt = new Date();
@ -491,11 +497,13 @@ export class WorkflowRunner {
// Also get informed when the processes does exit especially when it did crash or timed out
subprocess.on('exit', async (code, signal) => {
if (signal === 'SIGTERM'){
Logger.debug(`Subprocess for execution ID ${executionId} timed out.`, {executionId});
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (code !== 0) {
Logger.debug(`Subprocess for execution ID ${executionId} finished with error code ${code}.`, {executionId});
// Process did exit with error code, so something went wrong.
const executionError = new WorkflowOperationError('Workflow execution process did crash for an unknown reason!');

View file

@ -20,23 +20,29 @@ import {
ExecutionError,
IDataObject,
IExecuteWorkflowInfo,
ILogger,
INodeExecutionData,
INodeType,
INodeTypeData,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
LoggerProxy,
Workflow,
WorkflowHooks,
WorkflowOperationError,
} from 'n8n-workflow';
import {
getLogger,
} from '../src/Logger';
import * as config from '../config';
export class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined;
logger: ILogger;
startedAt = new Date();
workflow: Workflow | undefined;
workflowExecute: WorkflowExecute | undefined;
@ -57,7 +63,13 @@ export class WorkflowRunnerProcess {
process.on('SIGTERM', WorkflowRunnerProcess.stopProcess);
process.on('SIGINT', WorkflowRunnerProcess.stopProcess);
const logger = this.logger = getLogger();
LoggerProxy.init(logger);
this.data = inputData;
logger.verbose('Initializing n8n sub-process', { pid: process.pid, workflowId: this.data.workflowData.id });
let className: string;
let tempNode: INodeType;
let filePath: string;
@ -152,6 +164,8 @@ export class WorkflowRunnerProcess {
throw e;
}
await sendToParentProcess('finishExecution', { executionId, result });
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result);
return returnData!.data!.main;
};
@ -187,12 +201,7 @@ export class WorkflowRunnerProcess {
parameters,
});
} catch (error) {
// TODO: Add proper logging
console.error(`There was a problem sending hook: "${hook}"`);
console.error('Parameters:');
console.error(parameters);
console.error('Error:');
console.error(error);
this.logger.error(`There was a problem sending hook: "${hook}"`, { parameters, error});
}
}

View file

@ -7,6 +7,7 @@ import {
IPollResponse,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
LoggerProxy as Logger,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
@ -17,6 +18,7 @@ import {
IWorkflowData,
} from './';
export class ActiveWorkflows {
private workflowData: {
[key: string]: IWorkflowData;
@ -163,6 +165,7 @@ export class ActiveWorkflows {
// The trigger function to execute when the cron-time got reached
const executeTrigger = async () => {
Logger.info(`Polling trigger initiated for workflow "${workflow.name}"`, {workflowName: workflow.name, workflowId: workflow.id});
const pollResponse = await workflow.runPoll(node, pollFunctions);
if (pollResponse !== null) {

View file

@ -51,6 +51,9 @@ import * as requestPromise from 'request-promise-native';
import { createHmac } from 'crypto';
import { fromBuffer } from 'file-type';
import { lookup } from 'mime-types';
import {
LoggerProxy as Logger,
} from 'n8n-workflow';
const requestPromiseWithDefaults = requestPromise.defaults({
timeout: 300000, // 5 minutes
@ -188,8 +191,12 @@ export function requestOAuth2(this: IAllExecuteFunctions, credentialsType: strin
};
}
Logger.debug(`OAuth2 token for "${credentialsType}" used by node "${node.name}" expired. Should revalidate.`);
const newToken = await token.refresh(tokenRefreshOptions);
Logger.debug(`OAuth2 token for "${credentialsType}" used by node "${node.name}" has been renewed.`);
credentials.oauthTokenData = newToken.data;
// Find the name of the credentials
@ -201,6 +208,8 @@ export function requestOAuth2(this: IAllExecuteFunctions, credentialsType: strin
// Save the refreshed token
await additionalData.credentialsHelper.updateCredentials(name, credentialsType, credentials);
Logger.debug(`OAuth2 token for "${credentialsType}" used by node "${node.name}" has been saved to database successfully.`);
// Make the request again with the new token
const newRequestOptions = newToken.sign(requestOptions as clientOAuth2.RequestObject);

View file

@ -15,6 +15,7 @@ import {
ITaskDataConnections,
IWaitingForExecution,
IWorkflowExecuteAdditionalData,
LoggerProxy as Logger,
Workflow,
WorkflowExecuteMode,
WorkflowOperationError,
@ -482,6 +483,8 @@ export class WorkflowExecute {
* @memberof WorkflowExecute
*/
processRunExecutionData(workflow: Workflow): PCancelable<IRun> {
Logger.verbose('Workflow execution started', { workflowId: workflow.id });
const startedAt = new Date();
const workflowIssues = workflow.checkReadyForExecution();
@ -502,7 +505,6 @@ export class WorkflowExecute {
this.runExecutionData.startData = {};
}
let currentExecutionTry = '';
let lastExecutionTry = '';
@ -564,6 +566,7 @@ export class WorkflowExecute {
executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
executionNode = executionData.node;
Logger.debug(`Start processing node "${executionNode.name}"`, { node: executionNode.name, workflowId: workflow.id });
await this.executeHook('nodeExecuteBefore', [executionNode.name]);
// Get the index of the current run
@ -661,7 +664,9 @@ export class WorkflowExecute {
}
}
Logger.debug(`Running node "${executionNode.name}" started`, { node: executionNode.name, workflowId: workflow.id });
nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
Logger.debug(`Running node "${executionNode.name}" finished successfully`, { node: executionNode.name, workflowId: workflow.id });
if (nodeSuccessData === undefined) {
// Node did not get executed
@ -698,6 +703,8 @@ export class WorkflowExecute {
message: error.message,
stack: error.stack,
};
Logger.debug(`Running node "${executionNode.name}" finished with error`, { node: executionNode.name, workflowId: workflow.id });
}
}
@ -829,8 +836,10 @@ export class WorkflowExecute {
const fullRunData = this.getFullRunData(startedAt);
if (executionError !== undefined) {
Logger.verbose(`Workflow execution finished with error`, { error: executionError, workflowId: workflow.id });
fullRunData.data.resultData.error = executionError;
} else {
Logger.verbose(`Workflow execution finished successfully`, { workflowId: workflow.id });
fullRunData.finished = true;
}

View file

@ -1,8 +1,10 @@
import {
IConnections,
ILogger,
INode,
IRun,
LoggerProxy,
Workflow,
} from 'n8n-workflow';
@ -1154,9 +1156,19 @@ describe('WorkflowExecute', () => {
},
];
const fakeLogger = {
log: () => {},
debug: () => {},
verbose: () => {},
info: () => {},
warn: () => {},
error: () => {},
} as ILogger;
const executionMode = 'manual';
const nodeTypes = Helpers.NodeTypes();
LoggerProxy.init(fakeLogger);
for (const testData of tests) {
test(testData.description, async () => {

View file

@ -18,6 +18,10 @@ import * as moment from 'moment-timezone';
import * as jwt from 'jsonwebtoken';
import {
LoggerProxy as Logger
} from 'n8n-workflow';
export async function salesforceApiRequest(this: IExecuteFunctions | IExecuteSingleFunctions | ILoadOptionsFunctions, method: string, endpoint: string, body: any = {}, qs: IDataObject = {}, uri?: string, option: IDataObject = {}): Promise<any> { // tslint:disable-line:no-any
const authenticationMethod = this.getNodeParameter('authentication', 0, 'oAuth2') as string;
@ -29,6 +33,7 @@ export async function salesforceApiRequest(this: IExecuteFunctions | IExecuteSin
const response = await getAccessToken.call(this, credentials as IDataObject);
const { instance_url, access_token } = response;
const options = getOptions.call(this, method, (uri || endpoint), body, qs, instance_url as string);
Logger.debug(`Authentication for "Salesforce" node is using "jwt". Invoking URI ${options.uri}`);
options.headers!.Authorization = `Bearer ${access_token}`;
//@ts-ignore
return await this.helpers.request(options);
@ -38,6 +43,7 @@ export async function salesforceApiRequest(this: IExecuteFunctions | IExecuteSin
const credentials = this.getCredentials(credentialsType);
const subdomain = ((credentials!.accessTokenUrl as string).match(/https:\/\/(.+).salesforce\.com/) || [])[1];
const options = getOptions.call(this, method, (uri || endpoint), body, qs, `https://${subdomain}.salesforce.com`);
Logger.debug(`Authentication for "Salesforce" node is using "OAuth2". Invoking URI ${options.uri}`);
//@ts-ignore
return await this.helpers.requestOAuth2.call(this, credentialsType, options);
}

View file

@ -112,6 +112,10 @@ import {
userOperations,
} from './UserDescription';
import {
LoggerProxy as Logger,
} from 'n8n-workflow';
export class Salesforce implements INodeType {
description: INodeTypeDescription = {
displayName: 'Salesforce',
@ -923,6 +927,8 @@ export class Salesforce implements INodeType {
const resource = this.getNodeParameter('resource', 0) as string;
const operation = this.getNodeParameter('operation', 0) as string;
Logger.debug(`Running "Salesforce" node named "${this.getNode.name}" resource "${resource}" operation "${operation}"`);
for (let i = 0; i < items.length; i++) {
if (resource === 'lead') {
//https://developer.salesforce.com/docs/api-explorer/sobject/Lead/post-lead

View file

@ -764,6 +764,16 @@ export interface IWorkflowSettings {
[key: string]: IDataObject | string | number | boolean | undefined;
}
export type LogTypes = 'debug' | 'verbose' | 'info' | 'warn' | 'error';
export interface ILogger {
log: (type: LogTypes, message: string, meta?: object) => void;
debug: (message: string, meta?: object) => void;
verbose: (message: string, meta?: object) => void;
info: (message: string, meta?: object) => void;
warn: (message: string, meta?: object) => void;
error: (message: string, meta?: object) => void;
}
export interface IRawErrorObject {
[key: string]: string | object | number | boolean | undefined | null | string[] | object[] | number[] | boolean[];
}

View file

@ -0,0 +1,45 @@
import {
ILogger,
LogTypes,
} from './Interfaces';
let logger: ILogger | undefined;
export function init(loggerInstance: ILogger) {
logger = loggerInstance;
}
export function getInstance(): ILogger {
if (logger === undefined) {
throw new Error('LoggerProxy not initialized');
}
return logger;
}
export function log(type: LogTypes, message: string, meta: object = {}) {
getInstance().log(type, message, meta);
}
// Convenience methods below
export function debug(message: string, meta: object = {}) {
getInstance().log('debug', message, meta);
}
export function info(message: string, meta: object = {}) {
getInstance().log('info', message, meta);
}
export function error(message: string, meta: object = {}) {
getInstance().log('error', message, meta);
}
export function verbose(message: string, meta: object = {}) {
getInstance().log('verbose', message, meta);
}
export function warn(message: string, meta: object = {}) {
getInstance().log('warn', message, meta);
}

View file

@ -6,9 +6,11 @@ export * from './WorkflowDataProxy';
export * from './WorkflowErrors';
export * from './WorkflowHooks';
import * as LoggerProxy from './LoggerProxy';
import * as NodeHelpers from './NodeHelpers';
import * as ObservableObject from './ObservableObject';
export {
LoggerProxy,
NodeHelpers,
ObservableObject,
};