diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index e34b457810..db13b66918 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -6,7 +6,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ import * as localtunnel from 'localtunnel'; -import { TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core'; +import { BinaryDataManager, IBinaryDataConfig, TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core'; import { Command, flags } from '@oclif/command'; // eslint-disable-next-line import/no-extraneous-dependencies import * as Redis from 'ioredis'; @@ -305,6 +305,9 @@ export class Start extends Command { const { cli } = await GenericHelpers.getVersions(); InternalHooksManager.init(instanceId, cli); + const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; + await BinaryDataManager.init(binaryDataConfig, true); + await Server.start(); // Start to get active workflows and run their triggers diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts index ebf683e6ca..cafd92148e 100644 --- a/packages/cli/commands/webhook.ts +++ b/packages/cli/commands/webhook.ts @@ -3,7 +3,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/unbound-method */ -import { UserSettings } from 'n8n-core'; +import { BinaryDataManager, IBinaryDataConfig, UserSettings } from 'n8n-core'; import { Command, flags } from '@oclif/command'; // eslint-disable-next-line import/no-extraneous-dependencies import * as Redis from 'ioredis'; @@ -152,6 +152,9 @@ export class Webhook extends Command { const { cli } = await GenericHelpers.getVersions(); InternalHooksManager.init(instanceId, cli); + const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; + await BinaryDataManager.init(binaryDataConfig); + if (config.get('executions.mode') === 'queue') { const redisHost = config.get('queue.bull.redis.host'); const redisPassword = config.get('queue.bull.redis.password'); diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index 1290868abf..827e751b1d 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -10,7 +10,7 @@ import * as PCancelable from 'p-cancelable'; import { Command, flags } from '@oclif/command'; -import { UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataManager, IBinaryDataConfig, UserSettings, WorkflowExecute } from 'n8n-core'; import { IExecuteResponsePromiseData, INodeTypes, IRun, Workflow, LoggerProxy } from 'n8n-workflow'; @@ -274,6 +274,9 @@ export class Worker extends Command { const versions = await GenericHelpers.getVersions(); const instanceId = await UserSettings.getInstanceId(); + const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; + await BinaryDataManager.init(binaryDataConfig); + InternalHooksManager.init(instanceId, versions.cli); console.info('\nn8n worker is now ready'); diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index c06c1e200e..50ebbe9668 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -650,6 +650,39 @@ const config = convict({ }, }, + binaryDataManager: { + availableModes: { + format: String, + default: 'filesystem', + env: 'N8N_AVAILABLE_BINARY_DATA_MODES', + doc: 'Available modes of binary data storage, as comma separated strings', + }, + mode: { + format: String, + default: 'default', + env: 'N8N_DEFAULT_BINARY_DATA_MODE', + doc: 'Storage mode for binary data, default | filesystem', + }, + localStoragePath: { + format: String, + default: path.join(core.UserSettings.getUserN8nFolderPath(), 'binaryData'), + env: 'N8N_BINARY_DATA_STORAGE_PATH', + doc: 'Path for binary data storage in "filesystem" mode', + }, + binaryDataTTL: { + format: Number, + default: 60, + env: 'N8N_BINARY_DATA_TTL', + doc: 'TTL for binary data of unsaved executions in minutes', + }, + persistedBinaryDataTTL: { + format: Number, + default: 1440, + env: 'N8N_PERSISTED_BINARY_DATA_TTL', + doc: 'TTL for persisted binary data in minutes (binary data gets deleted if not persisted before TTL expires)', + }, + }, + deployment: { type: { format: String, diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index ae977cbcde..5edcfa163d 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -310,6 +310,7 @@ export interface IDiagnosticInfo { [key: string]: string | number | undefined; }; deploymentType: string; + binaryDataMode: string; } export interface IInternalHooksClass { @@ -322,7 +323,11 @@ export interface IInternalHooksClass { onWorkflowCreated(workflow: IWorkflowBase): Promise; onWorkflowDeleted(workflowId: string): Promise; onWorkflowSaved(workflow: IWorkflowBase): Promise; - onWorkflowPostExecute(workflow: IWorkflowBase, runData?: IRun): Promise; + onWorkflowPostExecute( + executionId: string, + workflow: IWorkflowBase, + runData?: IRun, + ): Promise; } export interface IN8nConfig { diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index a2e1ae54d2..d5a7f266ce 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -1,4 +1,5 @@ /* eslint-disable import/no-cycle */ +import { BinaryDataManager } from 'n8n-core'; import { IDataObject, IRun, TelemetryHelpers } from 'n8n-workflow'; import { IDiagnosticInfo, @@ -28,6 +29,7 @@ export class InternalHooksClass implements IInternalHooksClass { system_info: diagnosticInfo.systemInfo, execution_variables: diagnosticInfo.executionVariables, n8n_deployment_type: diagnosticInfo.deploymentType, + n8n_binary_data_mode: diagnosticInfo.binaryDataMode, }; return Promise.all([ @@ -76,7 +78,11 @@ export class InternalHooksClass implements IInternalHooksClass { }); } - async onWorkflowPostExecute(workflow: IWorkflowBase, runData?: IRun): Promise { + async onWorkflowPostExecute( + executionId: string, + workflow: IWorkflowBase, + runData?: IRun, + ): Promise { const properties: IDataObject = { workflow_id: workflow.id, is_manual: false, @@ -120,7 +126,10 @@ export class InternalHooksClass implements IInternalHooksClass { } } - return this.telemetry.trackWorkflowExecution(properties); + return Promise.all([ + BinaryDataManager.getInstance().persistBinaryDataForExecutionId(executionId), + this.telemetry.trackWorkflowExecution(properties), + ]).then(() => {}); } async onN8nStop(): Promise { diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index dbe60af67f..ffc180bc7b 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -49,7 +49,9 @@ import { compare } from 'bcryptjs'; import * as promClient from 'prom-client'; import { + BinaryDataManager, Credentials, + IBinaryDataConfig, ICredentialTestFunctions, LoadNodeParameterOptions, NodeExecuteFunctions, @@ -2449,12 +2451,27 @@ class App { const filters = { startedAt: LessThanOrEqual(deleteData.deleteBefore), }; + if (deleteData.filters !== undefined) { Object.assign(filters, deleteData.filters); } + const execs = await Db.collections.Execution!.find({ ...filters, select: ['id'] }); + + await Promise.all( + execs.map(async (item) => + BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(item.id.toString()), + ), + ); + await Db.collections.Execution!.delete(filters); } else if (deleteData.ids !== undefined) { + await Promise.all( + deleteData.ids.map(async (id) => + BinaryDataManager.getInstance().deleteBinaryDataByExecutionId(id), + ), + ); + // Deletes all executions with the given ids await Db.collections.Execution!.delete(deleteData.ids); } else { @@ -2650,6 +2667,23 @@ class App { }), ); + // ---------------------------------------- + // Binary data + // ---------------------------------------- + + // Returns binary buffer + this.app.get( + `/${this.restEndpoint}/data/:path`, + ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + const dataPath = req.params.path; + return BinaryDataManager.getInstance() + .retrieveBinaryDataByIdentifier(dataPath) + .then((buffer: Buffer) => { + return buffer.toString('base64'); + }); + }), + ); + // ---------------------------------------- // Settings // ---------------------------------------- @@ -2917,6 +2951,7 @@ export async function start(): Promise { await app.externalHooks.run('n8n.ready', [app]); const cpus = os.cpus(); + const binarDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; const diagnosticInfo: IDiagnosticInfo = { basicAuthActive: config.get('security.basicAuth.active') as boolean, databaseType: (await GenericHelpers.getConfigValue('database.type')) as DatabaseType, @@ -2950,6 +2985,7 @@ export async function start(): Promise { executions_data_prune_timeout: config.get('executions.pruneDataTimeout'), }, deploymentType: config.get('deployment.type'), + binaryDataMode: binarDataConfig.mode, }; void Db.collections diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 203bf20b08..93da2cf8f4 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -16,7 +16,7 @@ import * as express from 'express'; // eslint-disable-next-line import/no-extraneous-dependencies import { get } from 'lodash'; -import { BINARY_ENCODING, NodeExecuteFunctions } from 'n8n-core'; +import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions } from 'n8n-core'; import { createDeferredPromise, @@ -37,6 +37,7 @@ import { Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; + // eslint-disable-next-line import/no-cycle import { GenericHelpers, @@ -447,7 +448,7 @@ export async function executeWebhook( IExecutionDb | undefined >; executePromise - .then((data) => { + .then(async (data) => { if (data === undefined) { if (!didSendResponse) { responseCallback(null, { @@ -611,7 +612,10 @@ export async function executeWebhook( if (!didSendResponse) { // Send the webhook response manually res.setHeader('Content-Type', binaryData.mimeType); - res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); + const binaryDataBuffer = await BinaryDataManager.getInstance().retrieveBinaryData( + binaryData, + ); + res.end(binaryDataBuffer); responseCallback(null, { noWebhookResponse: true, diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 8231debcb0..4a0b88594d 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -14,7 +14,7 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable func-names */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; import { IDataObject, @@ -481,8 +481,11 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) { // Data is always saved, so we remove from database - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion await Db.collections.Execution!.delete(this.executionId); + await BinaryDataManager.getInstance().markDataForDeletionByExecutionId( + this.executionId, + ); + return; } @@ -515,6 +518,10 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { } // Data is always saved, so we remove from database await Db.collections.Execution!.delete(this.executionId); + await BinaryDataManager.getInstance().markDataForDeletionByExecutionId( + this.executionId, + ); + return; } } @@ -836,6 +843,8 @@ export async function executeWorkflow( workflowData, { parentProcessMode: additionalData.hooks!.mode }, ); + additionalDataIntegrated.executionId = executionId; + // Make sure we pass on the original executeWorkflow function we received // This one already contains changes to talk to parent process // and get executionID from `activeExecutions` running on main process @@ -910,7 +919,7 @@ export async function executeWorkflow( } await externalHooks.run('workflow.postExecute', [data, workflowData]); - void InternalHooksManager.getInstance().onWorkflowPostExecute(workflowData, data); + void InternalHooksManager.getInstance().onWorkflowPostExecute(executionId, workflowData, data); if (data.finished === true) { // Workflow did finish successfully diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index fd18ff3d04..5e8f29442a 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -11,7 +11,7 @@ /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ /* eslint-disable import/no-cycle */ /* eslint-disable @typescript-eslint/no-unused-vars */ -import { IProcessMessage, WorkflowExecute } from 'n8n-core'; +import { BinaryDataManager, IProcessMessage, WorkflowExecute } from 'n8n-core'; import { ExecutionError, @@ -174,6 +174,7 @@ export class WorkflowRunner { postExecutePromise .then(async (executionData) => { void InternalHooksManager.getInstance().onWorkflowPostExecute( + executionId!, data.workflowData, executionData, ); @@ -539,6 +540,7 @@ export class WorkflowRunner { (!workflowDidSucceed && saveDataErrorExecution === 'none') ) { await Db.collections.Execution!.delete(executionId); + await BinaryDataManager.getInstance().markDataForDeletionByExecutionId(executionId); } // eslint-disable-next-line id-denylist } catch (err) { diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 862fa4303f..698bc4a3f4 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -5,7 +5,13 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-use-before-define */ /* eslint-disable @typescript-eslint/unbound-method */ -import { IProcessMessage, UserSettings, WorkflowExecute } from 'n8n-core'; +import { + BinaryDataManager, + IBinaryDataConfig, + IProcessMessage, + UserSettings, + WorkflowExecute, +} from 'n8n-core'; import { ExecutionError, @@ -141,6 +147,9 @@ export class WorkflowRunnerProcess { const { cli } = await GenericHelpers.getVersions(); InternalHooksManager.init(instanceId, cli); + const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; + await BinaryDataManager.init(binaryDataConfig); + // Credentials should now be loaded from database. // We check if any node uses credentials. If it does, then // init database. @@ -260,7 +269,11 @@ export class WorkflowRunnerProcess { const { workflow } = executeWorkflowFunctionOutput; result = await workflowExecute.processRunExecutionData(workflow); await externalHooks.run('workflow.postExecute', [result, workflowData]); - void InternalHooksManager.getInstance().onWorkflowPostExecute(workflowData, result); + void InternalHooksManager.getInstance().onWorkflowPostExecute( + executionId, + workflowData, + result, + ); await sendToParentProcess('finishExecution', { executionId, result }); delete this.childExecutions[executionId]; } catch (e) { diff --git a/packages/core/package.json b/packages/core/package.json index 2caed5eefa..d1f46e5fdb 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -47,6 +47,7 @@ "cron": "~1.7.2", "crypto-js": "~4.1.1", "file-type": "^14.6.2", + "flatted": "^3.2.4", "form-data": "^4.0.0", "lodash.get": "^4.4.2", "mime-types": "^2.1.27", @@ -55,7 +56,8 @@ "p-cancelable": "^2.0.0", "qs": "^6.10.1", "request": "^2.88.2", - "request-promise-native": "^1.0.7" + "request-promise-native": "^1.0.7", + "uuid": "^8.3.2" }, "jest": { "transform": { diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts new file mode 100644 index 0000000000..07586cb05d --- /dev/null +++ b/packages/core/src/BinaryDataManager/FileSystem.ts @@ -0,0 +1,214 @@ +import { promises as fs } from 'fs'; +import * as path from 'path'; +import { v4 as uuid } from 'uuid'; + +import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; + +const PREFIX_METAFILE = 'binarymeta'; +const PREFIX_PERSISTED_METAFILE = 'persistedmeta'; + +export class BinaryDataFileSystem implements IBinaryDataManager { + private storagePath: string; + + private binaryDataTTL: number; + + private persistedBinaryDataTTL: number; + + constructor(config: IBinaryDataConfig) { + this.storagePath = config.localStoragePath; + this.binaryDataTTL = config.binaryDataTTL; + this.persistedBinaryDataTTL = config.persistedBinaryDataTTL; + } + + async init(startPurger = false): Promise { + if (startPurger) { + setInterval(async () => { + await this.deleteMarkedFiles(); + }, this.binaryDataTTL * 30000); + + setInterval(async () => { + await this.deleteMarkedPersistedFiles(); + }, this.persistedBinaryDataTTL * 30000); + } + + return fs + .readdir(this.storagePath) + .catch(async () => fs.mkdir(this.storagePath, { recursive: true })) + .then(async () => fs.readdir(this.getBinaryDataMetaPath())) + .catch(async () => fs.mkdir(this.getBinaryDataMetaPath(), { recursive: true })) + .then(async () => fs.readdir(this.getBinaryDataPersistMetaPath())) + .catch(async () => fs.mkdir(this.getBinaryDataPersistMetaPath(), { recursive: true })) + .then(async () => this.deleteMarkedFiles()) + .then(async () => this.deleteMarkedPersistedFiles()) + .then(() => {}); + } + + async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise { + const binaryDataId = this.generateFileName(executionId); + return this.addBinaryIdToPersistMeta(executionId, binaryDataId).then(async () => + this.saveToLocalStorage(binaryBuffer, binaryDataId).then(() => binaryDataId), + ); + } + + async retrieveBinaryDataByIdentifier(identifier: string): Promise { + return this.retrieveFromLocalStorage(identifier); + } + + async markDataForDeletionByExecutionId(executionId: string): Promise { + const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000); + return fs.writeFile( + path.join(this.getBinaryDataMetaPath(), `${PREFIX_METAFILE}_${executionId}_${tt.valueOf()}`), + '', + ); + } + + async deleteMarkedFiles(): Promise { + return this.deleteMarkedFilesByMeta(this.getBinaryDataMetaPath(), PREFIX_METAFILE); + } + + async deleteMarkedPersistedFiles(): Promise { + return this.deleteMarkedFilesByMeta( + this.getBinaryDataPersistMetaPath(), + PREFIX_PERSISTED_METAFILE, + ); + } + + private async addBinaryIdToPersistMeta(executionId: string, identifier: string): Promise { + const currentTime = new Date().getTime(); + const timeAtNextHour = currentTime + 3600000 - (currentTime % 3600000); + const timeoutTime = timeAtNextHour + this.persistedBinaryDataTTL * 60000; + + const filePath = path.join( + this.getBinaryDataPersistMetaPath(), + `${PREFIX_PERSISTED_METAFILE}_${executionId}_${timeoutTime}`, + ); + + return fs + .readFile(filePath) + .catch(async () => fs.writeFile(filePath, identifier)) + .then(() => {}); + } + + private async deleteMarkedFilesByMeta(metaPath: string, filePrefix: string): Promise { + const currentTimeValue = new Date().valueOf(); + const metaFileNames = await fs.readdir(metaPath); + + const execsAdded: { [key: string]: number } = {}; + + const proms = metaFileNames.reduce( + (prev, curr) => { + const [prefix, executionId, ts] = curr.split('_'); + + if (prefix !== filePrefix) { + return prev; + } + + const execTimestamp = parseInt(ts, 10); + + if (execTimestamp < currentTimeValue) { + if (execsAdded[executionId]) { + // do not delete data, only meta file + prev.push(this.deleteMetaFileByPath(path.join(metaPath, curr))); + return prev; + } + + execsAdded[executionId] = 1; + prev.push( + this.deleteBinaryDataByExecutionId(executionId).then(async () => + this.deleteMetaFileByPath(path.join(metaPath, curr)), + ), + ); + } + + return prev; + }, + [Promise.resolve()], + ); + + return Promise.all(proms).then(() => {}); + } + + async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise { + const newBinaryDataId = this.generateFileName(prefix); + + return fs + .copyFile( + path.join(this.storagePath, binaryDataId), + path.join(this.storagePath, newBinaryDataId), + ) + .then(() => newBinaryDataId); + } + + async deleteBinaryDataByExecutionId(executionId: string): Promise { + const regex = new RegExp(`${executionId}_*`); + const filenames = await fs.readdir(path.join(this.storagePath)); + + const proms = filenames.reduce( + (allProms, filename) => { + if (regex.test(filename)) { + allProms.push(fs.rm(path.join(this.storagePath, filename))); + } + + return allProms; + }, + [Promise.resolve()], + ); + + return Promise.all(proms).then(async () => Promise.resolve()); + } + + async deleteBinaryDataByIdentifier(identifier: string): Promise { + return this.deleteFromLocalStorage(identifier); + } + + async persistBinaryDataForExecutionId(executionId: string): Promise { + return fs.readdir(this.getBinaryDataPersistMetaPath()).then(async (metafiles) => { + const proms = metafiles.reduce( + (prev, curr) => { + if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) { + prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr))); + return prev; + } + + return prev; + }, + [Promise.resolve()], + ); + + return Promise.all(proms).then(() => {}); + }); + } + + private generateFileName(prefix: string): string { + return `${prefix}_${uuid()}`; + } + + private getBinaryDataMetaPath() { + return path.join(this.storagePath, 'meta'); + } + + private getBinaryDataPersistMetaPath() { + return path.join(this.storagePath, 'persistMeta'); + } + + private async deleteMetaFileByPath(metafilePath: string): Promise { + return fs.rm(metafilePath); + } + + private async deleteFromLocalStorage(identifier: string) { + return fs.rm(path.join(this.storagePath, identifier)); + } + + private async saveToLocalStorage(data: Buffer, identifier: string) { + await fs.writeFile(path.join(this.storagePath, identifier), data); + } + + private async retrieveFromLocalStorage(identifier: string): Promise { + const filePath = path.join(this.storagePath, identifier); + try { + return await fs.readFile(filePath); + } catch (e) { + throw new Error(`Error finding file: ${filePath}`); + } + } +} diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts new file mode 100644 index 0000000000..b7b7c2a9b2 --- /dev/null +++ b/packages/core/src/BinaryDataManager/index.ts @@ -0,0 +1,187 @@ +import { IBinaryData, INodeExecutionData } from 'n8n-workflow'; +import { BINARY_ENCODING } from '../Constants'; +import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; +import { BinaryDataFileSystem } from './FileSystem'; + +export class BinaryDataManager { + private static instance: BinaryDataManager; + + private managers: { + [key: string]: IBinaryDataManager; + }; + + private binaryDataMode: string; + + private availableModes: string[]; + + constructor(config: IBinaryDataConfig) { + this.binaryDataMode = config.mode; + this.availableModes = config.availableModes.split(','); + this.managers = {}; + } + + static async init(config: IBinaryDataConfig, mainManager = false): Promise { + if (BinaryDataManager.instance) { + throw new Error('Binary Data Manager already initialized'); + } + + BinaryDataManager.instance = new BinaryDataManager(config); + + if (BinaryDataManager.instance.availableModes.includes('filesystem')) { + BinaryDataManager.instance.managers.filesystem = new BinaryDataFileSystem(config); + await BinaryDataManager.instance.managers.filesystem.init(mainManager); + } + + return undefined; + } + + static getInstance(): BinaryDataManager { + if (!BinaryDataManager.instance) { + throw new Error('Binary Data Manager not initialized'); + } + + return BinaryDataManager.instance; + } + + async storeBinaryData( + binaryData: IBinaryData, + binaryBuffer: Buffer, + executionId: string, + ): Promise { + const retBinaryData = binaryData; + + if (this.managers[this.binaryDataMode]) { + return this.managers[this.binaryDataMode] + .storeBinaryData(binaryBuffer, executionId) + .then((filename) => { + retBinaryData.id = this.generateBinaryId(filename); + return retBinaryData; + }); + } + + retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING); + return binaryData; + } + + async retrieveBinaryData(binaryData: IBinaryData): Promise { + if (binaryData.id) { + return this.retrieveBinaryDataByIdentifier(binaryData.id); + } + + return Buffer.from(binaryData.data, BINARY_ENCODING); + } + + async retrieveBinaryDataByIdentifier(identifier: string): Promise { + const { mode, id } = this.splitBinaryModeFileId(identifier); + if (this.managers[mode]) { + return this.managers[mode].retrieveBinaryDataByIdentifier(id); + } + + throw new Error('Storage mode used to store binary data not available'); + } + + async markDataForDeletionByExecutionId(executionId: string): Promise { + if (this.managers[this.binaryDataMode]) { + return this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(executionId); + } + + return Promise.resolve(); + } + + async persistBinaryDataForExecutionId(executionId: string): Promise { + if (this.managers[this.binaryDataMode]) { + return this.managers[this.binaryDataMode].persistBinaryDataForExecutionId(executionId); + } + + return Promise.resolve(); + } + + async deleteBinaryDataByExecutionId(executionId: string): Promise { + if (this.managers[this.binaryDataMode]) { + return this.managers[this.binaryDataMode].deleteBinaryDataByExecutionId(executionId); + } + + return Promise.resolve(); + } + + async duplicateBinaryData( + inputData: Array | unknown, + executionId: string, + ): Promise { + if (inputData && this.managers[this.binaryDataMode]) { + const returnInputData = (inputData as INodeExecutionData[][]).map( + async (executionDataArray) => { + if (executionDataArray) { + return Promise.all( + executionDataArray.map((executionData) => { + if (executionData.binary) { + return this.duplicateBinaryDataInExecData(executionData, executionId); + } + + return executionData; + }), + ); + } + + return executionDataArray; + }, + ); + + return Promise.all(returnInputData); + } + + return Promise.resolve(inputData as INodeExecutionData[][]); + } + + private generateBinaryId(filename: string) { + return `${this.binaryDataMode}:${filename}`; + } + + private splitBinaryModeFileId(fileId: string): { mode: string; id: string } { + const [mode, id] = fileId.split(':'); + return { mode, id }; + } + + private async duplicateBinaryDataInExecData( + executionData: INodeExecutionData, + executionId: string, + ): Promise { + const binaryManager = this.managers[this.binaryDataMode]; + + if (executionData.binary) { + const binaryDataKeys = Object.keys(executionData.binary); + const bdPromises = binaryDataKeys.map(async (key: string) => { + if (!executionData.binary) { + return { key, newId: undefined }; + } + + const binaryDataId = executionData.binary[key].id; + if (!binaryDataId) { + return { key, newId: undefined }; + } + + return binaryManager + ?.duplicateBinaryDataByIdentifier( + this.splitBinaryModeFileId(binaryDataId).id, + executionId, + ) + .then((filename) => ({ + newId: this.generateBinaryId(filename), + key, + })); + }); + + return Promise.all(bdPromises).then((b) => { + return b.reduce((acc, curr) => { + if (acc.binary && curr) { + acc.binary[curr.key].id = curr.newId; + } + + return acc; + }, executionData); + }); + } + + return executionData; + } +} diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index c5f0f41271..b7e9561147 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -234,3 +234,23 @@ export interface IWorkflowData { pollResponses?: IPollResponse[]; triggerResponses?: ITriggerResponse[]; } + +export interface IBinaryDataConfig { + mode: 'default' | 'filesystem'; + availableModes: string; + localStoragePath: string; + binaryDataTTL: number; + persistedBinaryDataTTL: number; +} + +export interface IBinaryDataManager { + init(startPurger: boolean): Promise; + storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise; + retrieveBinaryDataByIdentifier(identifier: string): Promise; + markDataForDeletionByExecutionId(executionId: string): Promise; + deleteMarkedFiles(): Promise; + deleteBinaryDataByIdentifier(identifier: string): Promise; + duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise; + deleteBinaryDataByExecutionId(executionId: string): Promise; + persistBinaryDataForExecutionId(executionId: string): Promise; +} diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index a71ff54cff..7709d63d8c 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -73,9 +73,9 @@ import { lookup } from 'mime-types'; import axios, { AxiosProxyConfig, AxiosRequestConfig, Method } from 'axios'; import { URL, URLSearchParams } from 'url'; +import { BinaryDataManager } from './BinaryDataManager'; // eslint-disable-next-line import/no-cycle import { - BINARY_ENCODING, ICredentialTestFunctions, IHookFunctions, ILoadOptionsFunctions, @@ -682,7 +682,7 @@ export async function getBinaryDataBuffer( inputIndex: number, ): Promise { const binaryData = inputData.main![inputIndex]![itemIndex]!.binary![propertyName]!; - return Buffer.from(binaryData.data, BINARY_ENCODING); + return BinaryDataManager.getInstance().retrieveBinaryData(binaryData); } /** @@ -697,6 +697,7 @@ export async function getBinaryDataBuffer( */ export async function prepareBinaryData( binaryData: Buffer, + executionId: string, filePath?: string, mimeType?: string, ): Promise { @@ -727,10 +728,7 @@ export async function prepareBinaryData( const returnData: IBinaryData = { mimeType, - // TODO: Should program it in a way that it does not have to converted to base64 - // It should only convert to and from base64 when saved in database because - // of for example an error or when there is a wait node. - data: binaryData.toString(BINARY_ENCODING), + data: '', }; if (filePath) { @@ -753,7 +751,7 @@ export async function prepareBinaryData( } } - return returnData; + return BinaryDataManager.getInstance().storeBinaryData(returnData, binaryData, executionId); } /** @@ -1370,7 +1368,19 @@ export function getExecutePollFunctions( }, helpers: { httpRequest, - prepareBinaryData, + async prepareBinaryData( + binaryData: Buffer, + filePath?: string, + mimeType?: string, + ): Promise { + return prepareBinaryData.call( + this, + binaryData, + additionalData.executionId!, + filePath, + mimeType, + ); + }, request: proxyRequestToAxios, async requestOAuth2( this: IAllExecuteFunctions, @@ -1476,8 +1486,19 @@ export function getExecuteTriggerFunctions( }, helpers: { httpRequest, - prepareBinaryData, - + async prepareBinaryData( + binaryData: Buffer, + filePath?: string, + mimeType?: string, + ): Promise { + return prepareBinaryData.call( + this, + binaryData, + additionalData.executionId!, + filePath, + mimeType, + ); + }, request: proxyRequestToAxios, async requestOAuth2( this: IAllExecuteFunctions, @@ -1553,7 +1574,14 @@ export function getExecuteFunctions( workflowInfo: IExecuteWorkflowInfo, inputData?: INodeExecutionData[], ): Promise { - return additionalData.executeWorkflow(workflowInfo, additionalData, inputData); + return additionalData + .executeWorkflow(workflowInfo, additionalData, inputData) + .then(async (result) => + BinaryDataManager.getInstance().duplicateBinaryData( + result, + additionalData.executionId!, + ), + ); }, getContext(type: string): IContextObject { return NodeHelpers.getContext(runExecutionData, type, node); @@ -1672,7 +1700,19 @@ export function getExecuteFunctions( }, helpers: { httpRequest, - prepareBinaryData, + async prepareBinaryData( + binaryData: Buffer, + filePath?: string, + mimeType?: string, + ): Promise { + return prepareBinaryData.call( + this, + binaryData, + additionalData.executionId!, + filePath, + mimeType, + ); + }, async getBinaryDataBuffer( itemIndex: number, propertyName: string, @@ -1853,7 +1893,19 @@ export function getExecuteSingleFunctions( }, helpers: { httpRequest, - prepareBinaryData, + async prepareBinaryData( + binaryData: Buffer, + filePath?: string, + mimeType?: string, + ): Promise { + return prepareBinaryData.call( + this, + binaryData, + additionalData.executionId!, + filePath, + mimeType, + ); + }, request: proxyRequestToAxios, async requestOAuth2( this: IAllExecuteFunctions, @@ -2234,7 +2286,19 @@ export function getExecuteWebhookFunctions( prepareOutputData: NodeHelpers.prepareOutputData, helpers: { httpRequest, - prepareBinaryData, + async prepareBinaryData( + binaryData: Buffer, + filePath?: string, + mimeType?: string, + ): Promise { + return prepareBinaryData.call( + this, + binaryData, + additionalData.executionId!, + filePath, + mimeType, + ); + }, request: proxyRequestToAxios, async requestOAuth2( this: IAllExecuteFunctions, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b0c6167aa9..11d130d021 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -10,6 +10,7 @@ try { export * from './ActiveWorkflows'; export * from './ActiveWebhooks'; +export * from './BinaryDataManager'; export * from './Constants'; export * from './Credentials'; export * from './Interfaces'; diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index a0ab00a93b..35af2e6b3e 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -179,6 +179,7 @@ export interface IRestApi { deleteExecutions(sendData: IExecutionDeleteFilter): Promise; retryExecution(id: string, loadWorkflow?: boolean): Promise; getTimezones(): Promise; + getBinaryBufferString(dataPath: string): Promise; } export interface INodeTranslationHeaders { diff --git a/packages/editor-ui/src/components/BinaryDataDisplay.vue b/packages/editor-ui/src/components/BinaryDataDisplay.vue index 6d1d9de118..ec183c7368 100644 --- a/packages/editor-ui/src/components/BinaryDataDisplay.vue +++ b/packages/editor-ui/src/components/BinaryDataDisplay.vue @@ -13,11 +13,7 @@
{{ $locale.baseText('binaryDataDisplay.noDataFoundToDisplay') }}
- - + @@ -30,15 +26,22 @@ import { IRunExecutionData, } from 'n8n-workflow'; +import BinaryDataDisplayEmbed from '@/components/BinaryDataDisplayEmbed.vue'; + import { nodeHelpers } from '@/components/mixins/nodeHelpers'; import mixins from 'vue-typed-mixins'; +import { restApi } from '@/components/mixins/restApi'; export default mixins( nodeHelpers, + restApi, ) .extend({ name: 'BinaryDataDisplay', + components: { + BinaryDataDisplayEmbed, + }, props: [ 'displayData', // IBinaryDisplayData 'windowVisible', // boolean @@ -54,14 +57,15 @@ export default mixins( if (this.displayData.index >= binaryData.length || binaryData[this.displayData.index][this.displayData.key] === undefined) { return null; } - return binaryData[this.displayData.index][this.displayData.key]; + + const binaryDataItem: IBinaryData = binaryData[this.displayData.index][this.displayData.key]; + + return binaryDataItem; }, embedClass (): string[] { - if (this.binaryData !== null && - this.binaryData.mimeType !== undefined && - (this.binaryData.mimeType as string).startsWith('image') - ) { + // @ts-ignore + if (this.binaryData! !== null && this.binaryData!.mimeType! !== undefined && (this.binaryData!.mimeType! as string).startsWith('image')) { return ['image']; } return ['other']; diff --git a/packages/editor-ui/src/components/BinaryDataDisplayEmbed.vue b/packages/editor-ui/src/components/BinaryDataDisplayEmbed.vue new file mode 100644 index 0000000000..305f1b8b53 --- /dev/null +++ b/packages/editor-ui/src/components/BinaryDataDisplayEmbed.vue @@ -0,0 +1,84 @@ + + + + + diff --git a/packages/editor-ui/src/components/mixins/restApi.ts b/packages/editor-ui/src/components/mixins/restApi.ts index d0df6dc124..cffbb5ce78 100644 --- a/packages/editor-ui/src/components/mixins/restApi.ts +++ b/packages/editor-ui/src/components/mixins/restApi.ts @@ -192,6 +192,11 @@ export const restApi = Vue.extend({ getTimezones: (): Promise => { return self.restApi().makeRestApiRequest('GET', `/options/timezones`); }, + + // Binary data + getBinaryBufferString: (dataPath: string): Promise => { + return self.restApi().makeRestApiRequest('GET', `/data/${dataPath}`); + }, }; }, }, diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 0b0ea184f6..03bc3ad6a7 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -28,6 +28,7 @@ export interface IBinaryData { fileName?: string; directory?: string; fileExtension?: string; + id?: string; } export interface IOAuth2Options {