import { createReadStream } from 'fs'; import fs from 'fs/promises'; import path from 'path'; import { v4 as uuid } from 'uuid'; import type { Readable } from 'stream'; import type { BinaryMetadata } from 'n8n-workflow'; import { jsonParse } from 'n8n-workflow'; import type { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { FileNotFoundError } from '../errors'; const PREFIX_METAFILE = 'binarymeta'; const PREFIX_PERSISTED_METAFILE = 'persistedmeta'; export class BinaryDataFileSystem implements IBinaryDataManager { private storagePath: string; private binaryDataTTL: number; private persistedBinaryDataTTL: number; constructor(config: IBinaryDataConfig) { this.storagePath = config.localStoragePath; this.binaryDataTTL = config.binaryDataTTL; this.persistedBinaryDataTTL = config.persistedBinaryDataTTL; } async init(startPurger = false): Promise { if (startPurger) { setInterval(async () => { await this.deleteMarkedFiles(); }, this.binaryDataTTL * 30000); setInterval(async () => { await this.deleteMarkedPersistedFiles(); }, this.persistedBinaryDataTTL * 30000); } return fs .readdir(this.storagePath) .catch(async () => fs.mkdir(this.storagePath, { recursive: true })) .then(async () => fs.readdir(this.getBinaryDataMetaPath())) .catch(async () => fs.mkdir(this.getBinaryDataMetaPath(), { recursive: true })) .then(async () => fs.readdir(this.getBinaryDataPersistMetaPath())) .catch(async () => fs.mkdir(this.getBinaryDataPersistMetaPath(), { recursive: true })) .then(async () => this.deleteMarkedFiles()) .then(async () => this.deleteMarkedPersistedFiles()) .then(() => {}); } async getFileSize(identifier: string): Promise { const stats = await fs.stat(this.getBinaryPath(identifier)); return stats.size; } async copyBinaryFile(filePath: string, executionId: string): Promise { const binaryDataId = this.generateFileName(executionId); await this.addBinaryIdToPersistMeta(executionId, binaryDataId); await this.copyFileToLocalStorage(filePath, binaryDataId); return binaryDataId; } async storeBinaryMetadata(identifier: string, metadata: BinaryMetadata) { await fs.writeFile(this.getMetadataPath(identifier), JSON.stringify(metadata), { encoding: 'utf-8', }); } async getBinaryMetadata(identifier: string): Promise { return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' })); } async storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise { const binaryDataId = this.generateFileName(executionId); await this.addBinaryIdToPersistMeta(executionId, binaryDataId); await this.saveToLocalStorage(binaryData, binaryDataId); return binaryDataId; } getBinaryStream(identifier: string, chunkSize?: number): Readable { return createReadStream(this.getBinaryPath(identifier), { highWaterMark: chunkSize }); } async retrieveBinaryDataByIdentifier(identifier: string): Promise { return this.retrieveFromLocalStorage(identifier); } getBinaryPath(identifier: string): string { return this.resolveStoragePath(identifier); } getMetadataPath(identifier: string): string { return this.resolveStoragePath(`${identifier}.metadata`); } async markDataForDeletionByExecutionId(executionId: string): Promise { const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000); return fs.writeFile( this.resolveStoragePath('meta', `${PREFIX_METAFILE}_${executionId}_${tt.valueOf()}`), '', ); } async deleteMarkedFiles(): Promise { return this.deleteMarkedFilesByMeta(this.getBinaryDataMetaPath(), PREFIX_METAFILE); } async deleteMarkedPersistedFiles(): Promise { return this.deleteMarkedFilesByMeta( this.getBinaryDataPersistMetaPath(), PREFIX_PERSISTED_METAFILE, ); } private async addBinaryIdToPersistMeta(executionId: string, identifier: string): Promise { const currentTime = new Date().getTime(); const timeAtNextHour = currentTime + 3600000 - (currentTime % 3600000); const timeoutTime = timeAtNextHour + this.persistedBinaryDataTTL * 60000; const filePath = this.resolveStoragePath( 'persistMeta', `${PREFIX_PERSISTED_METAFILE}_${executionId}_${timeoutTime}`, ); return fs .readFile(filePath) .catch(async () => fs.writeFile(filePath, identifier)) .then(() => {}); } private async deleteMarkedFilesByMeta(metaPath: string, filePrefix: string): Promise { const currentTimeValue = new Date().valueOf(); const metaFileNames = await fs.readdir(metaPath); const execsAdded: { [key: string]: number } = {}; const proms = metaFileNames.reduce( (prev, curr) => { const [prefix, executionId, ts] = curr.split('_'); if (prefix !== filePrefix) { return prev; } const execTimestamp = parseInt(ts, 10); if (execTimestamp < currentTimeValue) { if (execsAdded[executionId]) { // do not delete data, only meta file prev.push(this.deleteMetaFileByPath(path.join(metaPath, curr))); return prev; } execsAdded[executionId] = 1; prev.push( this.deleteBinaryDataByExecutionId(executionId).then(async () => this.deleteMetaFileByPath(path.join(metaPath, curr)), ), ); } return prev; }, [Promise.resolve()], ); return Promise.all(proms).then(() => {}); } async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise { const newBinaryDataId = this.generateFileName(prefix); return fs .copyFile(this.resolveStoragePath(binaryDataId), this.resolveStoragePath(newBinaryDataId)) .then(() => newBinaryDataId); } async deleteBinaryDataByExecutionId(executionId: string): Promise { const regex = new RegExp(`${executionId}_*`); const filenames = await fs.readdir(this.storagePath); const proms = filenames.reduce( (allProms, filename) => { if (regex.test(filename)) { allProms.push(fs.rm(this.resolveStoragePath(filename))); } return allProms; }, [Promise.resolve()], ); return Promise.all(proms).then(async () => Promise.resolve()); } async deleteBinaryDataByIdentifier(identifier: string): Promise { return this.deleteFromLocalStorage(identifier); } async persistBinaryDataForExecutionId(executionId: string): Promise { return fs.readdir(this.getBinaryDataPersistMetaPath()).then(async (metafiles) => { const proms = metafiles.reduce( (prev, curr) => { if (curr.startsWith(`${PREFIX_PERSISTED_METAFILE}_${executionId}_`)) { prev.push(fs.rm(path.join(this.getBinaryDataPersistMetaPath(), curr))); return prev; } return prev; }, [Promise.resolve()], ); return Promise.all(proms).then(() => {}); }); } private generateFileName(prefix: string): string { return [prefix, uuid()].join(''); } private getBinaryDataMetaPath() { return path.join(this.storagePath, 'meta'); } private getBinaryDataPersistMetaPath() { return path.join(this.storagePath, 'persistMeta'); } private async deleteMetaFileByPath(metafilePath: string): Promise { return fs.rm(metafilePath); } private async deleteFromLocalStorage(identifier: string) { return fs.rm(this.getBinaryPath(identifier)); } private async copyFileToLocalStorage(source: string, identifier: string): Promise { await fs.cp(source, this.getBinaryPath(identifier)); } private async saveToLocalStorage(binaryData: Buffer | Readable, identifier: string) { await fs.writeFile(this.getBinaryPath(identifier), binaryData); } private async retrieveFromLocalStorage(identifier: string): Promise { const filePath = this.getBinaryPath(identifier); try { return await fs.readFile(filePath); } catch (e) { throw new Error(`Error finding file: ${filePath}`); } } private resolveStoragePath(...args: string[]) { const returnPath = path.join(this.storagePath, ...args); if (path.relative(this.storagePath, returnPath).startsWith('..')) throw new FileNotFoundError('Invalid path detected'); return returnPath; } }