mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
* Added logging to n8n This commit adds logging to n8n using the Winston library. For now, this commit only allows logging to console (default behavior) or file (need to pass in config via environment variables). Other logging methods can be further implemented using hooks. These were skipped for now as it would require adding more dependencies. Logging level is notice by default, meaning no additional messages would be displayed at the moment. Logging level can be set to info or debug as well to enrich the generated logs. The ILogger interface was added to the workflow project as it would make it available for all other projects but the implementation was done on the cli project. * Lint fixes and logging level naming. Also fixed the way we use the logger as it was not working previously * Improvements to logging framework Using appropriate single quotes Improving the way the logger is declared * Improved naming for Log Types * Removed logger global variable, replacing it by a proxy * Add logging to CLI commands * Remove unused GenericHelpers * Changed back some messages to console instead of logger and added npm shortcuts for worker and webhook * Fix typos * Adding basic file rotation to logs as suggested by @mutdmour * Fixed linting issues * Correcting comment to correctly reflect space usage * Added settings for log files rotation * Correcting config type from String to Number * Changed default file settings to number To reflect previous changes to the type * Changed the way log messages are added to be called statically. Also minor naming improvements * Applying latest corrections sent by @ivov * ⚡ Some logging improvements * Saving logs to a folder inside n8n home instead of root * Fixed broken tests and linting * Changed some log messages to improve formatting * Adding quotes to names on log messages * Added execution and session IDs to logs. 
Also removed unnecessary line breaks * ⚡ Added file caller to log messages (#1657) This is done using callsites library which already existed in the project as another library's dependency. So in fact it does not add any new dependency. * Adding logs to help debug Salesforce node * ⚡ Add function name to logs and add more logs * ⚡ Improve some error messages * ⚡ Improve some more log messages * ⚡ Rename logging env variables to match others Co-authored-by: dali <servfrdali@yahoo.fr> Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
362 lines
12 KiB
TypeScript
362 lines
12 KiB
TypeScript
|
|
import {
|
|
CredentialsOverwrites,
|
|
CredentialTypes,
|
|
Db,
|
|
ExternalHooks,
|
|
IWorkflowExecuteProcess,
|
|
IWorkflowExecutionDataProcessWithExecution,
|
|
NodeTypes,
|
|
WorkflowExecuteAdditionalData,
|
|
WorkflowHelpers,
|
|
} from './';
|
|
|
|
import {
|
|
IProcessMessage,
|
|
WorkflowExecute,
|
|
} from 'n8n-core';
|
|
|
|
import {
|
|
ExecutionError,
|
|
IDataObject,
|
|
IExecuteWorkflowInfo,
|
|
ILogger,
|
|
INodeExecutionData,
|
|
INodeType,
|
|
INodeTypeData,
|
|
IRun,
|
|
ITaskData,
|
|
IWorkflowExecuteAdditionalData,
|
|
IWorkflowExecuteHooks,
|
|
LoggerProxy,
|
|
Workflow,
|
|
WorkflowHooks,
|
|
WorkflowOperationError,
|
|
} from 'n8n-workflow';
|
|
|
|
import {
|
|
getLogger,
|
|
} from '../src/Logger';
|
|
|
|
import * as config from '../config';
|
|
|
|
/**
 * Runs a single workflow inside a forked sub-process and reports progress,
 * hooks and results back to the parent process via `sendToParentProcess`.
 */
export class WorkflowRunnerProcess {
	// Execution payload received from the parent; set at the start of `runWorkflow`
	data: IWorkflowExecutionDataProcessWithExecution | undefined;
	// Logger obtained via `getLogger()` in `runWorkflow`; also handed to `LoggerProxy`
	logger: ILogger;
	// Start time of the current run; reset when `runWorkflow` begins
	startedAt = new Date();
	workflow: Workflow | undefined;
	workflowExecute: WorkflowExecute | undefined;
	// Resolves the pending "wait for execution id" promise of a sub-workflow call.
	// NOTE(review): this type parses as "function returning `void | undefined`",
	// not as an optional function - confirm the intent before changing it.
	executionIdCallback: (executionId: string) => void | undefined;
	// Sub-workflow executions that are currently running, keyed by execution id
	childExecutions: {
		[key: string]: IWorkflowExecuteProcess,
	} = {};

	/**
	 * Handler registered for SIGTERM and SIGINT. Schedules the process to
	 * exit after 30 seconds.
	 */
	static async stopProcess() {
		setTimeout(() => {
			// Attempt a graceful shutdown, giving executions 30 seconds to finish
			process.exit(0);
		}, 30000);
	}


	/**
	 * Initializes node types, credential types, credential overwrites and
	 * external hooks from the given data and then executes the workflow.
	 *
	 * @param {IWorkflowExecutionDataProcessWithExecution} inputData The data sent by the parent process
	 * @returns {Promise<IRun>} The run data of the execution
	 * @memberof WorkflowRunnerProcess
	 */
	async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
		process.on('SIGTERM', WorkflowRunnerProcess.stopProcess);
		process.on('SIGINT', WorkflowRunnerProcess.stopProcess);

		const logger = this.logger = getLogger();
		LoggerProxy.init(logger);

		this.data = inputData;

		logger.verbose('Initializing n8n sub-process', { pid: process.pid, workflowId: this.data.workflowData.id });

		let className: string;
		let tempNode: INodeType;
		let filePath: string;

		this.startedAt = new Date();

		// Instantiate every node type the workflow needs from its source file
		const nodeTypesData: INodeTypeData = {};
		for (const nodeTypeName of Object.keys(this.data.nodeTypeData)) {
			className = this.data.nodeTypeData[nodeTypeName].className;

			filePath = this.data.nodeTypeData[nodeTypeName].sourcePath;
			const tempModule = require(filePath);

			try {
				tempNode = new tempModule[className]() as INodeType;
			} catch (error) {
				throw new Error(`Error loading node "${nodeTypeName}" from: "${filePath}"`);
			}

			nodeTypesData[nodeTypeName] = {
				type: tempNode,
				sourcePath: filePath,
			};
		}

		const nodeTypes = NodeTypes();
		await nodeTypes.init(nodeTypesData);

		// Init credential types the workflow uses (is needed to apply default values to credentials)
		const credentialTypes = CredentialTypes();
		await credentialTypes.init(inputData.credentialsTypeData);

		// Load the credentials overwrites if any exist
		const credentialsOverwrites = CredentialsOverwrites();
		await credentialsOverwrites.init(inputData.credentialsOverwrite);

		// Load all external hooks
		const externalHooks = ExternalHooks();
		await externalHooks.init();

		// The database is only initialized when execution progress gets saved:
		// either the workflow setting is explicitly `true`, or it is not
		// explicitly `false` and the configured default enables it.
		const workflowSettings = inputData.workflowData.settings;
		const saveProgressSetting = workflowSettings !== undefined ? workflowSettings.saveExecutionProgress : undefined;
		if (saveProgressSetting === true || (saveProgressSetting !== false && (config.get('executions.saveExecutionProgress') as boolean))) {
			await Db.init();
		}

		// Start timeout for the execution
		let workflowTimeout = config.get('executions.timeout') as number; // initialize with default
		if (this.data.workflowData.settings && this.data.workflowData.settings.executionTimeout) {
			workflowTimeout = this.data.workflowData.settings!.executionTimeout as number; // preference on workflow setting
		}

		if (workflowTimeout > 0) {
			// Never allow more than the configured maximum
			workflowTimeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number);
		}

		this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings });
		// A timeout <= 0 means "no timeout"; otherwise pass the absolute deadline in milliseconds
		const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials, undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000);
		additionalData.hooks = this.getProcessForwardHooks();

		// Wrap the default sub-workflow executor so that the parent process gets
		// informed about the start and the end of every sub-workflow execution
		const executeWorkflowFunction = additionalData.executeWorkflow;
		additionalData.executeWorkflow = async (workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[] | undefined): Promise<Array<INodeExecutionData[] | null> | IRun> => {
			const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData(workflowInfo);
			const runData = await WorkflowExecuteAdditionalData.getRunData(workflowData, inputData);

			// Register the callback before sending "startExecution" so it is
			// guaranteed to be in place when the "executionId" message arrives
			const executionIdPromise = new Promise<string>((resolve) => {
				this.executionIdCallback = (executionId: string) => {
					resolve(executionId);
				};
			});
			await sendToParentProcess('startExecution', { runData });
			const executionId: string = await executionIdPromise;

			let result: IRun;
			try {
				const executeWorkflowFunctionOutput = await executeWorkflowFunction(workflowInfo, additionalData, inputData, executionId, workflowData, runData) as {workflowExecute: WorkflowExecute, workflow: Workflow} as IWorkflowExecuteProcess;
				const workflowExecute = executeWorkflowFunctionOutput.workflowExecute;
				// Track the child execution while it runs
				this.childExecutions[executionId] = executeWorkflowFunctionOutput;
				const workflow = executeWorkflowFunctionOutput.workflow;
				result = await workflowExecute.processRunExecutionData(workflow) as IRun;
				await externalHooks.run('workflow.postExecute', [result, workflowData]);
				// Report the finished execution exactly once on the success path
				await sendToParentProcess('finishExecution', { executionId, result });
				delete this.childExecutions[executionId];
			} catch (e) {
				// Report the end of the execution (without result) and stop tracking it
				await sendToParentProcess('finishExecution', { executionId });
				delete this.childExecutions[executionId];
				// Throw same error we had
				throw e;
			}

			const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result);
			return returnData!.data!.main;
		};

		if (this.data.executionData !== undefined) {
			// Execution data got supplied so run on top of it
			this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData);
			return this.workflowExecute.processRunExecutionData(this.workflow);
		} else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) {
			// Execute all nodes

			// Can execute without webhook so go on
			this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
			return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode);
		} else {
			// Execute only the nodes between start and destination nodes
			this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
			return this.workflowExecute.runPartialWorkflow(this.workflow, this.data.runData, this.data.startNodes, this.data.destinationNode);
		}
	}


	/**
	 * Sends hook data to the parent process that it executes them.
	 * The message is sent without waiting for it; a failed send gets logged.
	 *
	 * @param {string} hook
	 * @param {any[]} parameters
	 * @memberof WorkflowRunnerProcess
	 */
	sendHookToParentProcess(hook: string, parameters: any[]) { // tslint:disable-line:no-any
		// "sendToParentProcess" reports failures by rejecting its promise, so
		// they have to be handled on the promise and not with try/catch
		sendToParentProcess('processHook', {
			hook,
			parameters,
		}).catch((error: Error) => {
			this.logger.error(`There was a problem sending hook: "${hook}"`, { parameters, error});
		});
	}


	/**
	 * Create a wrapper for hooks which simply forwards the data to
	 * the parent process where they then can be executed with access
	 * to database and to PushService
	 *
	 * @returns
	 */
	getProcessForwardHooks(): WorkflowHooks {
		const hookFunctions: IWorkflowExecuteHooks = {
			nodeExecuteBefore: [
				async (nodeName: string): Promise<void> => {
					this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]);
				},
			],
			nodeExecuteAfter: [
				async (nodeName: string, data: ITaskData): Promise<void> => {
					this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]);
				},
			],
			workflowExecuteBefore: [
				async (): Promise<void> => {
					this.sendHookToParentProcess('workflowExecuteBefore', []);
				},
			],
			workflowExecuteAfter: [
				async (fullRunData: IRun, newStaticData?: IDataObject): Promise<void> => {
					this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]);
				},
			],
		};

		// Append the pre-execute hooks after the forwarding hooks of the same name
		const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
		for (const key of Object.keys(preExecuteFunctions)) {
			if (hookFunctions[key] === undefined) {
				hookFunctions[key] = [];
			}
			hookFunctions[key]!.push(...(preExecuteFunctions[key] || []));
		}

		return new WorkflowHooks(hookFunctions, this.data!.executionMode, this.data!.executionId, this.data!.workflowData, { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string });
	}

}
|
|
|
|
|
|
|
|
/**
 * Forwards a message to the parent process via "process.send" and
 * settles once the send callback got invoked.
 *
 * @param {string} type The type of data to send
 * @param {*} data The data
 * @returns {Promise<void>} Resolves when the callback reports no error, rejects with the reported error otherwise
 */
async function sendToParentProcess(type: string, data: any): Promise<void> { // tslint:disable-line:no-any
	const payload = {
		type,
		data,
	};

	await new Promise<void>((onSent, onFailed) => {
		const handleSendResult = (sendError: Error) => {
			if (!sendError) {
				onSent();
				return;
			}

			onFailed(sendError);
		};

		process.send!(payload, handleSendResult);
	});
}
|
|
|
|
|
|
const workflowRunner = new WorkflowRunnerProcess();


// Listen to messages from the parent process, which sends the data of
// the workflow to process. Handled message types:
//   - "startWorkflow":           run the workflow and report the result
//   - "stopExecution"/"timeout": finalize the running executions and exit
//   - "executionId":             hand the execution id to the waiting callback
process.on('message', async (message: IProcessMessage) => {
	try {
		if (message.type === 'startWorkflow') {
			await sendToParentProcess('start', {});

			const runData = await workflowRunner.runWorkflow(message.data);

			await sendToParentProcess('end', {
				runData,
			});

			// Once the workflow got executed make sure the process gets killed again
			process.exit();
		} else if (message.type === 'stopExecution' || message.type === 'timeout') {
			// The workflow execution should be stopped
			let runData: IRun;

			const isTimeout = message.type === 'timeout';
			// Creates a fresh error per execution; "undefined" if it is a regular stop
			const createTimeoutError = () => isTimeout ? new WorkflowOperationError('Workflow execution timed out!') : undefined;

			if (workflowRunner.workflowExecute !== undefined) {

				// Finalize all the sub-workflow executions which are still running
				const executionIds = Object.keys(workflowRunner.childExecutions);

				for (const executionId of executionIds) {
					const childWorkflowExecute = workflowRunner.childExecutions[executionId];
					runData = childWorkflowExecute.workflowExecute.getFullRunData(workflowRunner.childExecutions[executionId].startedAt);

					// If there is any data send it to parent process, if execution timedout add the error
					await childWorkflowExecute.workflowExecute.processSuccessExecution(workflowRunner.childExecutions[executionId].startedAt, childWorkflowExecute.workflow, createTimeoutError());
				}

				// Workflow started already executing
				runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt);

				// If there is any data send it to parent process, if execution timedout add the error
				await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!, createTimeoutError());
			} else {
				// Workflow did not get started yet
				runData = {
					data: {
						resultData: {
							runData: {},
						},
					},
					finished: !isTimeout,
					mode: workflowRunner.data!.executionMode,
					startedAt: workflowRunner.startedAt,
					stoppedAt: new Date(),
				};

				workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]);
			}

			await sendToParentProcess(isTimeout ? message.type : 'end', {
				runData,
			});

			// Stop process
			process.exit();
		} else if (message.type === 'executionId') {
			workflowRunner.executionIdCallback(message.data.executionId);
		}
	} catch (error) {

		// Catch all uncaught errors and forward them to parent process
		const executionError = {
			...error,
			name: error!.name || 'Error',
			message: error!.message,
			stack: error!.stack,
		} as ExecutionError;

		try {
			await sendToParentProcess('processError', {
				executionError,
			});
		} finally {
			// Exit even if the error could not be delivered to the parent
			// process, so a failed sub-process is never left running
			process.exit();
		}
	}
});
|