feat(core): Remove storeMetadata and getSize from binary data manager interface (no-changelog) (#7195)

Depends on: #7164 | Story:
[PAY-838](https://linear.app/n8n/issue/PAY-838/introduce-object-store-service-for-binary-data)

This PR removes `storeMetadata` and `getSize` from the binary data
manager interface, as these are specific to filesystem mode. Also this
disambiguates identifiers:

```
binaryDataId
filesystem:289b4aac51e-dac6-4167-b793-6d5c415e2b47 {mode}:{fileId}

fileId - FS
289b4aac51e-dac6-4167-b793-6d5c415e2b47 {executionId}{uuid}

fileId - S3
/workflows/{workflowId}/executions/{executionId}/binary_data/b4aac51e-dac6-4167-b793-6d5c415e2b47
```

Note: The object store changes originally in this PR were extracted out
into the final PR.

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Iván Ovejero 2023-09-25 10:07:06 +02:00 committed by GitHub
parent 6d6e2488c6
commit dcc9cc13ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 143 additions and 132 deletions

View file

@ -5,12 +5,13 @@ import { Service } from 'typedi';
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow'; import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
import { FileSystemManager } from './FileSystem.manager'; import { FileSystemManager } from './FileSystem.manager';
import { InvalidBinaryDataManagerError, InvalidBinaryDataModeError, areValidModes } from './utils'; import { UnknownBinaryDataManager, InvalidBinaryDataMode } from './errors';
import { LogCatch } from '../decorators/LogCatch.decorator';
import { areValidModes } from './utils';
import type { Readable } from 'stream'; import type { Readable } from 'stream';
import type { BinaryData } from './types'; import type { BinaryData } from './types';
import type { INodeExecutionData } from 'n8n-workflow'; import type { INodeExecutionData } from 'n8n-workflow';
import { LogCatch } from '../decorators/LogCatch.decorator';
@Service() @Service()
export class BinaryDataService { export class BinaryDataService {
@ -21,7 +22,7 @@ export class BinaryDataService {
private managers: Record<string, BinaryData.Manager> = {}; private managers: Record<string, BinaryData.Manager> = {};
async init(config: BinaryData.Config) { async init(config: BinaryData.Config) {
if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataModeError(); if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataMode();
this.availableModes = config.availableModes; this.availableModes = config.availableModes;
this.mode = config.mode; this.mode = config.mode;
@ -45,47 +46,39 @@ export class BinaryDataService {
return binaryData; return binaryData;
} }
const identifier = await manager.copyByPath(path, executionId); const { fileId, fileSize } = await manager.copyByFilePath(path, executionId, {
binaryData.id = this.createIdentifier(identifier);
binaryData.data = this.mode; // clear binary data from memory
const fileSize = await manager.getSize(identifier);
binaryData.fileSize = prettyBytes(fileSize);
await manager.storeMetadata(identifier, {
fileName: binaryData.fileName, fileName: binaryData.fileName,
mimeType: binaryData.mimeType, mimeType: binaryData.mimeType,
fileSize,
}); });
binaryData.id = this.createBinaryDataId(fileId);
binaryData.fileSize = prettyBytes(fileSize);
binaryData.data = this.mode; // clear binary data from memory
return binaryData; return binaryData;
} }
@LogCatch((error) => Logger.error('Failed to write binary data file', { error })) @LogCatch((error) => Logger.error('Failed to write binary data file', { error }))
async store(binaryData: IBinaryData, input: Buffer | Readable, executionId: string) { async store(binaryData: IBinaryData, bufferOrStream: Buffer | Readable, executionId: string) {
const manager = this.managers[this.mode]; const manager = this.managers[this.mode];
if (!manager) { if (!manager) {
const buffer = await this.binaryToBuffer(input); const buffer = await this.binaryToBuffer(bufferOrStream);
binaryData.data = buffer.toString(BINARY_ENCODING); binaryData.data = buffer.toString(BINARY_ENCODING);
binaryData.fileSize = prettyBytes(buffer.length); binaryData.fileSize = prettyBytes(buffer.length);
return binaryData; return binaryData;
} }
const identifier = await manager.store(input, executionId); const { fileId, fileSize } = await manager.store(bufferOrStream, executionId, {
binaryData.id = this.createIdentifier(identifier);
binaryData.data = this.mode; // clear binary data from memory
const fileSize = await manager.getSize(identifier);
binaryData.fileSize = prettyBytes(fileSize);
await manager.storeMetadata(identifier, {
fileName: binaryData.fileName, fileName: binaryData.fileName,
mimeType: binaryData.mimeType, mimeType: binaryData.mimeType,
fileSize,
}); });
binaryData.id = this.createBinaryDataId(fileId);
binaryData.fileSize = prettyBytes(fileSize);
binaryData.data = this.mode; // clear binary data from memory
return binaryData; return binaryData;
} }
@ -96,34 +89,32 @@ export class BinaryDataService {
}); });
} }
getAsStream(identifier: string, chunkSize?: number) { getAsStream(binaryDataId: string, chunkSize?: number) {
const { mode, id } = this.splitBinaryModeFileId(identifier); const [mode, fileId] = binaryDataId.split(':');
return this.getManager(mode).getStream(id, chunkSize); return this.getManager(mode).getAsStream(fileId, chunkSize);
} }
async getBinaryDataBuffer(binaryData: IBinaryData) { async getAsBuffer(binaryData: IBinaryData) {
if (binaryData.id) return this.retrieveBinaryDataByIdentifier(binaryData.id); if (binaryData.id) {
const [mode, fileId] = binaryData.id.split(':');
return this.getManager(mode).getAsBuffer(fileId);
}
return Buffer.from(binaryData.data, BINARY_ENCODING); return Buffer.from(binaryData.data, BINARY_ENCODING);
} }
async retrieveBinaryDataByIdentifier(identifier: string) { getPath(binaryDataId: string) {
const { mode, id } = this.splitBinaryModeFileId(identifier); const [mode, fileId] = binaryDataId.split(':');
return this.getManager(mode).getBuffer(id); return this.getManager(mode).getPath(fileId);
} }
getPath(identifier: string) { async getMetadata(binaryDataId: string) {
const { mode, id } = this.splitBinaryModeFileId(identifier); const [mode, fileId] = binaryDataId.split(':');
return this.getManager(mode).getPath(id); return this.getManager(mode).getMetadata(fileId);
}
async getMetadata(identifier: string) {
const { mode, id } = this.splitBinaryModeFileId(identifier);
return this.getManager(mode).getMetadata(id);
} }
async deleteManyByExecutionIds(executionIds: string[]) { async deleteManyByExecutionIds(executionIds: string[]) {
@ -167,14 +158,11 @@ export class BinaryDataService {
// private methods // private methods
// ---------------------------------- // ----------------------------------
private createIdentifier(filename: string) { /**
return `${this.mode}:${filename}`; * Create an identifier `${mode}:{fileId}` for `IBinaryData['id']`.
} */
private createBinaryDataId(fileId: string) {
private splitBinaryModeFileId(fileId: string) { return `${this.mode}:${fileId}`;
const [mode, id] = fileId.split(':');
return { mode, id };
} }
private async duplicateBinaryDataInExecData( private async duplicateBinaryDataInExecData(
@ -195,12 +183,12 @@ export class BinaryDataService {
return { key, newId: undefined }; return { key, newId: undefined };
} }
return manager const [_mode, fileId] = binaryDataId.split(':');
?.copyByIdentifier(this.splitBinaryModeFileId(binaryDataId).id, executionId)
.then((filename) => ({ return manager?.copyByFileId(fileId, executionId).then((newFileId) => ({
newId: this.createIdentifier(filename), newId: this.createBinaryDataId(newFileId),
key, key,
})); }));
}); });
return Promise.all(bdPromises).then((b) => { return Promise.all(bdPromises).then((b) => {
@ -222,6 +210,6 @@ export class BinaryDataService {
if (manager) return manager; if (manager) return manager;
throw new InvalidBinaryDataManagerError(mode); throw new UnknownBinaryDataManager(mode);
} }
} }

View file

@ -5,9 +5,9 @@ import { v4 as uuid } from 'uuid';
import { jsonParse } from 'n8n-workflow'; import { jsonParse } from 'n8n-workflow';
import { FileNotFoundError } from '../errors'; import { FileNotFoundError } from '../errors';
import { ensureDirExists } from './utils';
import type { Readable } from 'stream'; import type { Readable } from 'stream';
import type { BinaryMetadata } from 'n8n-workflow';
import type { BinaryData } from './types'; import type { BinaryData } from './types';
const EXECUTION_ID_EXTRACTOR = const EXECUTION_ID_EXTRACTOR =
@ -17,15 +17,15 @@ export class FileSystemManager implements BinaryData.Manager {
constructor(private storagePath: string) {} constructor(private storagePath: string) {}
async init() { async init() {
await this.ensureDirExists(this.storagePath); await ensureDirExists(this.storagePath);
} }
getPath(identifier: string) { getPath(fileId: string) {
return this.resolvePath(identifier); return this.resolvePath(fileId);
} }
async getSize(identifier: string) { async getSize(fileId: string) {
const filePath = this.getPath(identifier); const filePath = this.getPath(fileId);
try { try {
const stats = await fs.stat(filePath); const stats = await fs.stat(filePath);
@ -35,14 +35,14 @@ export class FileSystemManager implements BinaryData.Manager {
} }
} }
getStream(identifier: string, chunkSize?: number) { getAsStream(fileId: string, chunkSize?: number) {
const filePath = this.getPath(identifier); const filePath = this.getPath(fileId);
return createReadStream(filePath, { highWaterMark: chunkSize }); return createReadStream(filePath, { highWaterMark: chunkSize });
} }
async getBuffer(identifier: string) { async getAsBuffer(fileId: string) {
const filePath = this.getPath(identifier); const filePath = this.getPath(fileId);
try { try {
return await fs.readFile(filePath); return await fs.readFile(filePath);
@ -51,29 +51,31 @@ export class FileSystemManager implements BinaryData.Manager {
} }
} }
async storeMetadata(identifier: string, metadata: BinaryMetadata) { async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
const filePath = this.resolvePath(`${identifier}.metadata`); const filePath = this.resolvePath(`${fileId}.metadata`);
await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' });
}
async getMetadata(identifier: string): Promise<BinaryMetadata> {
const filePath = this.resolvePath(`${identifier}.metadata`);
return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' })); return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
} }
async store(binaryData: Buffer | Readable, executionId: string) { async store(
const identifier = this.createIdentifier(executionId); binaryData: Buffer | Readable,
const filePath = this.getPath(identifier); executionId: string,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const fileId = this.createFileId(executionId);
const filePath = this.getPath(fileId);
await fs.writeFile(filePath, binaryData); await fs.writeFile(filePath, binaryData);
return identifier; const fileSize = await this.getSize(fileId);
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
return { fileId, fileSize };
} }
async deleteOne(identifier: string) { async deleteOne(fileId: string) {
const filePath = this.getPath(identifier); const filePath = this.getPath(fileId);
return fs.rm(filePath); return fs.rm(filePath);
} }
@ -98,35 +100,35 @@ export class FileSystemManager implements BinaryData.Manager {
return deletedIds; return deletedIds;
} }
async copyByPath(filePath: string, executionId: string) { async copyByFilePath(
const identifier = this.createIdentifier(executionId); filePath: string,
executionId: string,
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
) {
const newFileId = this.createFileId(executionId);
await fs.cp(filePath, this.getPath(identifier)); await fs.cp(filePath, this.getPath(newFileId));
return identifier; const fileSize = await this.getSize(newFileId);
await this.storeMetadata(newFileId, { mimeType, fileName, fileSize });
return { fileId: newFileId, fileSize };
} }
async copyByIdentifier(identifier: string, executionId: string) { async copyByFileId(fileId: string, executionId: string) {
const newIdentifier = this.createIdentifier(executionId); const newFileId = this.createFileId(executionId);
await fs.copyFile(this.resolvePath(identifier), this.resolvePath(newIdentifier)); await fs.copyFile(this.resolvePath(fileId), this.resolvePath(newFileId));
return newIdentifier; return newFileId;
} }
// ---------------------------------- // ----------------------------------
// private methods // private methods
// ---------------------------------- // ----------------------------------
private async ensureDirExists(dir: string) { private createFileId(executionId: string) {
try {
await fs.access(dir);
} catch {
await fs.mkdir(dir, { recursive: true });
}
}
private createIdentifier(executionId: string) {
return [executionId, uuid()].join(''); return [executionId, uuid()].join('');
} }
@ -139,4 +141,10 @@ export class FileSystemManager implements BinaryData.Manager {
return returnPath; return returnPath;
} }
private async storeMetadata(fileId: string, metadata: BinaryData.Metadata) {
const filePath = this.resolvePath(`${fileId}.metadata`);
await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' });
}
} }

View file

@ -0,0 +1,13 @@
import { BINARY_DATA_MODES } from './utils';
export class InvalidBinaryDataMode extends Error {
constructor() {
super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`);
}
}
export class UnknownBinaryDataManager extends Error {
constructor(mode: string) {
super(`No binary data manager found for: ${mode}`);
}
}

View file

@ -1,5 +1,4 @@
import type { Readable } from 'stream'; import type { Readable } from 'stream';
import type { BinaryMetadata } from 'n8n-workflow';
import type { BINARY_DATA_MODES } from './utils'; import type { BINARY_DATA_MODES } from './utils';
export namespace BinaryData { export namespace BinaryData {
@ -11,31 +10,39 @@ export namespace BinaryData {
localStoragePath: string; localStoragePath: string;
}; };
export type Metadata = {
fileName?: string;
mimeType?: string;
fileSize: number;
};
export type PreWriteMetadata = Omit<Metadata, 'fileSize'>;
export interface Manager { export interface Manager {
init(): Promise<void>; init(): Promise<void>;
store(binaryData: Buffer | Readable, executionId: string): Promise<string>; store(
getPath(identifier: string): string; binaryData: Buffer | Readable,
executionId: string,
preStoreMetadata: PreWriteMetadata,
): Promise<{ fileId: string; fileSize: number }>;
// @TODO: Refactor to use identifier getPath(fileId: string): string;
getSize(path: string): Promise<number>; getAsBuffer(fileId: string): Promise<Buffer>;
getAsStream(fileId: string, chunkSize?: number): Readable;
getBuffer(identifier: string): Promise<Buffer>; getMetadata(fileId: string): Promise<Metadata>;
getStream(identifier: string, chunkSize?: number): Readable;
// @TODO: Refactor out - not needed for object storage
storeMetadata(identifier: string, metadata: BinaryMetadata): Promise<void>;
// @TODO: Refactor out - not needed for object storage
getMetadata(identifier: string): Promise<BinaryMetadata>;
// @TODO: Refactor to also use `workflowId` to support full path-like identifier: // @TODO: Refactor to also use `workflowId` to support full path-like identifier:
// `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}`
copyByPath(path: string, executionId: string): Promise<string>; copyByFilePath(
path: string,
executionId: string,
metadata: PreWriteMetadata,
): Promise<{ fileId: string; fileSize: number }>;
copyByIdentifier(identifier: string, prefix: string): Promise<string>; copyByFileId(fileId: string, prefix: string): Promise<string>;
deleteOne(identifier: string): Promise<void>; deleteOne(fileId: string): Promise<void>;
// @TODO: Refactor to also receive `workflowId` to support full path-like identifier: // @TODO: Refactor to also receive `workflowId` to support full path-like identifier:
// `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}`

View file

@ -1,3 +1,4 @@
import fs from 'fs/promises';
import type { BinaryData } from './types'; import type { BinaryData } from './types';
/** /**
@ -12,14 +13,10 @@ export function areValidModes(modes: string[]): modes is BinaryData.Mode[] {
return modes.every((m) => BINARY_DATA_MODES.includes(m as BinaryData.Mode)); return modes.every((m) => BINARY_DATA_MODES.includes(m as BinaryData.Mode));
} }
export class InvalidBinaryDataModeError extends Error { export async function ensureDirExists(dir: string) {
constructor() { try {
super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`); await fs.access(dir);
} } catch {
} await fs.mkdir(dir, { recursive: true });
export class InvalidBinaryDataManagerError extends Error {
constructor(mode: string) {
super('No binary data manager found for mode: ' + mode);
} }
} }

View file

@ -39,7 +39,6 @@ import pick from 'lodash/pick';
import { extension, lookup } from 'mime-types'; import { extension, lookup } from 'mime-types';
import type { import type {
BinaryHelperFunctions, BinaryHelperFunctions,
BinaryMetadata,
FieldType, FieldType,
FileSystemHelperFunctions, FileSystemHelperFunctions,
FunctionsBase, FunctionsBase,
@ -140,6 +139,7 @@ import {
import { getSecretsProxy } from './Secrets'; import { getSecretsProxy } from './Secrets';
import { getUserN8nFolderPath } from './UserSettings'; import { getUserN8nFolderPath } from './UserSettings';
import Container from 'typedi'; import Container from 'typedi';
import type { BinaryData } from './BinaryData/types';
axios.defaults.timeout = 300000; axios.defaults.timeout = 300000;
// Prevent axios from adding x-form-www-urlencoded headers by default // Prevent axios from adding x-form-www-urlencoded headers by default
@ -947,7 +947,7 @@ export function getBinaryPath(binaryDataId: string): string {
/** /**
* Returns binary file metadata * Returns binary file metadata
*/ */
export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata> { export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryData.Metadata> {
return Container.get(BinaryDataService).getMetadata(binaryDataId); return Container.get(BinaryDataService).getMetadata(binaryDataId);
} }
@ -992,7 +992,7 @@ export async function getBinaryDataBuffer(
inputIndex: number, inputIndex: number,
): Promise<Buffer> { ): Promise<Buffer> {
const binaryData = inputData.main[inputIndex]![itemIndex]!.binary![propertyName]!; const binaryData = inputData.main[inputIndex]![itemIndex]!.binary![propertyName]!;
return Container.get(BinaryDataService).getBinaryDataBuffer(binaryData); return Container.get(BinaryDataService).getAsBuffer(binaryData);
} }
/** /**

View file

@ -48,12 +48,6 @@ export interface IBinaryData {
id?: string; id?: string;
} }
export interface BinaryMetadata {
fileName?: string;
mimeType?: string;
fileSize: number;
}
// All properties in this interface except for // All properties in this interface except for
// "includeCredentialsOnRefreshOnBody" will get // "includeCredentialsOnRefreshOnBody" will get
// removed once we add the OAuth2 hooks to the // removed once we add the OAuth2 hooks to the
@ -694,7 +688,11 @@ export interface BinaryHelperFunctions {
binaryToBuffer(body: Buffer | Readable): Promise<Buffer>; binaryToBuffer(body: Buffer | Readable): Promise<Buffer>;
getBinaryPath(binaryDataId: string): string; getBinaryPath(binaryDataId: string): string;
getBinaryStream(binaryDataId: string, chunkSize?: number): Readable; getBinaryStream(binaryDataId: string, chunkSize?: number): Readable;
getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata>; getBinaryMetadata(binaryDataId: string): Promise<{
fileName?: string;
mimeType?: string;
fileSize: number;
}>;
} }
export interface NodeHelperFunctions { export interface NodeHelperFunctions {