refactor(core): Include workflow ID in binary data writes (no-changelog) (#7220)

Depends on: https://github.com/n8n-io/n8n/pull/7195 | Story:
[PAY-837](https://linear.app/n8n/issue/PAY-837/implement-object-store-manager-for-binary-data)

This PR includes `workflowId` in binary data writes so that the S3
manager can support the filepath structure
`/workflows/{workflowId}/executions/{executionId}/binaryData/{binaryFilename}`,
which makes it easy to delete all binary data for a workflow. It also
makes the argument order consistent across all binary data service and
manager methods that take `workflowId` and `executionId`.

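To illustrate the intent, here is a minimal sketch (not the actual S3 manager, which lands with the object store work this PR depends on) of how a manager could derive object keys from the IDs now passed on every write. `buildObjectKey` and `workflowPrefix` are hypothetical helpers:

```ts
// Hypothetical helpers, for illustration only.
type BinaryDataIds = { workflowId: string; executionId: string; fileId: string };

// Mirrors the layout described above: workflow first, then execution, then file.
const buildObjectKey = ({ workflowId, executionId, fileId }: BinaryDataIds) =>
	`workflows/${workflowId}/executions/${executionId}/binaryData/${fileId}`;

// Everything stored for one workflow shares this prefix, so a prefix-scoped
// listing and delete removes all of its binary data in one pass.
const workflowPrefix = (workflowId: string) => `workflows/${workflowId}/`;
```
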
Note: Filesystem mode accepts `workflowId` for compatibility with the
common manager interface, but leaves it unused until we decide to
restructure how this mode stores data.
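
As a caller-side sketch of the consistent ordering (IDs first, then payload, then the binary-data descriptor): the `StoreFn` type below is a simplified local stand-in for the post-PR shape of `BinaryDataService.store`, not the real `IBinaryData`, and the values are made up.

```ts
import type { Readable } from 'stream';

// Simplified stand-in for the post-PR write signature.
type StoreFn = (
	workflowId: string,
	executionId: string,
	bufferOrStream: Buffer | Readable,
	binaryData: { fileName?: string; mimeType: string },
) => Promise<void>;

// Every write now reads the same way: workflowId, executionId, payload, descriptor.
async function storeExample(store: StoreFn, workflowId: string, executionId: string) {
	await store(workflowId, executionId, Buffer.from('hello'), {
		fileName: 'greeting.txt',
		mimeType: 'text/plain',
	});
}
```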

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
commit 77d6e3fc07 (parent 75541e91f2)
Iván Ovejero, 2023-09-25 18:04:52 +02:00, committed by GitHub
9 changed files with 121 additions and 70 deletions


@@ -157,7 +157,7 @@ export class ActiveWebhooks {
 	 *
 	 */
 	async removeWorkflow(workflow: Workflow): Promise<boolean> {
-		const workflowId = workflow.id!.toString();
+		const workflowId = workflow.id;
 
 		if (this.workflowWebhooks[workflowId] === undefined) {
 			// If it did not exist then there is nothing to remove


@@ -417,7 +417,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
 		}
 
 		try {
-			await this.removeWorkflowWebhooks(workflow.id as string);
+			await this.removeWorkflowWebhooks(workflow.id);
 		} catch (error1) {
 			ErrorReporter.error(error1);
 			Logger.error(


@@ -492,7 +492,7 @@ export class WorkflowsService {
 			// Workflow is saved so update in database
 			try {
 				// eslint-disable-next-line @typescript-eslint/no-use-before-define
-				await WorkflowsService.saveStaticDataById(workflow.id!, workflow.staticData);
+				await WorkflowsService.saveStaticDataById(workflow.id, workflow.staticData);
 				workflow.staticData.__dataChanged = false;
 			} catch (error) {
 				ErrorReporter.error(error);


@@ -100,7 +100,7 @@ jest.mock('@/Db', () => {
 			find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)),
 			findOne: jest.fn(async (searchParams) => {
 				return databaseActiveWorkflowsList.find(
-					(workflow) => workflow.id.toString() === searchParams.where.id.toString(),
+					(workflow) => workflow.id === searchParams.where.id.toString(),
 				);
 			}),
 			update: jest.fn(),


@@ -35,21 +35,33 @@ export class BinaryDataService {
 	}
 
 	@LogCatch((error) => Logger.error('Failed to copy binary data file', { error }))
-	async copyBinaryFile(binaryData: IBinaryData, path: string, executionId: string) {
+	async copyBinaryFile(
+		workflowId: string,
+		executionId: string,
+		binaryData: IBinaryData,
+		filePath: string,
+	) {
 		const manager = this.managers[this.mode];
 
 		if (!manager) {
-			const { size } = await stat(path);
+			const { size } = await stat(filePath);
 			binaryData.fileSize = prettyBytes(size);
-			binaryData.data = await readFile(path, { encoding: BINARY_ENCODING });
+			binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING });
 
 			return binaryData;
 		}
 
-		const { fileId, fileSize } = await manager.copyByFilePath(path, executionId, {
+		const metadata = {
 			fileName: binaryData.fileName,
 			mimeType: binaryData.mimeType,
-		});
+		};
+
+		const { fileId, fileSize } = await manager.copyByFilePath(
+			workflowId,
+			executionId,
+			filePath,
+			metadata,
+		);
 
 		binaryData.id = this.createBinaryDataId(fileId);
 		binaryData.fileSize = prettyBytes(fileSize);
@@ -59,7 +71,12 @@
 	}
 
 	@LogCatch((error) => Logger.error('Failed to write binary data file', { error }))
-	async store(binaryData: IBinaryData, bufferOrStream: Buffer | Readable, executionId: string) {
+	async store(
+		workflowId: string,
+		executionId: string,
+		bufferOrStream: Buffer | Readable,
+		binaryData: IBinaryData,
+	) {
 		const manager = this.managers[this.mode];
 
 		if (!manager) {
@@ -70,10 +87,17 @@
 			return binaryData;
 		}
 
-		const { fileId, fileSize } = await manager.store(bufferOrStream, executionId, {
+		const metadata = {
 			fileName: binaryData.fileName,
 			mimeType: binaryData.mimeType,
-		});
+		};
+
+		const { fileId, fileSize } = await manager.store(
+			workflowId,
+			executionId,
+			bufferOrStream,
+			metadata,
+		);
 
 		binaryData.id = this.createBinaryDataId(fileId);
 		binaryData.fileSize = prettyBytes(fileSize);
@@ -128,7 +152,11 @@
 	@LogCatch((error) =>
 		Logger.error('Failed to copy all binary data files for execution', { error }),
 	)
-	async duplicateBinaryData(inputData: Array<INodeExecutionData[] | null>, executionId: string) {
+	async duplicateBinaryData(
+		workflowId: string,
+		executionId: string,
+		inputData: Array<INodeExecutionData[] | null>,
+	) {
 		if (inputData && this.managers[this.mode]) {
 			const returnInputData = (inputData as INodeExecutionData[][]).map(
 				async (executionDataArray) => {
@@ -136,7 +164,7 @@
 					return Promise.all(
 						executionDataArray.map(async (executionData) => {
 							if (executionData.binary) {
-								return this.duplicateBinaryDataInExecData(executionData, executionId);
+								return this.duplicateBinaryDataInExecData(workflowId, executionId, executionData);
 							}
 
 							return executionData;
@@ -174,8 +202,9 @@
 	}
 
 	private async duplicateBinaryDataInExecData(
-		executionData: INodeExecutionData,
+		workflowId: string,
 		executionId: string,
+		executionData: INodeExecutionData,
 	) {
 		const manager = this.managers[this.mode];
@@ -193,7 +222,7 @@
 		const [_mode, fileId] = binaryDataId.split(':');
 
-		return manager?.copyByFileId(fileId, executionId).then((newFileId) => ({
+		return manager?.copyByFileId(workflowId, executionId, fileId).then((newFileId) => ({
 			newId: this.createBinaryDataId(newFileId),
 			key,
 		}));


@@ -1,3 +1,9 @@
+/**
+ * @tech_debt The `workflowId` arguments on write are for compatibility with the
+ * `BinaryData.Manager` interface. Unused in filesystem mode until we refactor
+ * how we store binary data files in the `/binaryData` dir.
+ */
+
 import { createReadStream } from 'fs';
 import fs from 'fs/promises';
 import path from 'path';
@@ -25,17 +31,6 @@ export class FileSystemManager implements BinaryData.Manager {
 		return this.resolvePath(fileId);
 	}
 
-	async getSize(fileId: string) {
-		const filePath = this.getPath(fileId);
-
-		try {
-			const stats = await fs.stat(filePath);
-			return stats.size;
-		} catch (error) {
-			throw new Error('Failed to find binary data file in filesystem', { cause: error });
-		}
-	}
-
 	async getAsStream(fileId: string, chunkSize?: number) {
 		const filePath = this.getPath(fileId);
@@ -59,14 +54,15 @@
 	}
 
 	async store(
-		binaryData: Buffer | Readable,
+		_workflowId: string,
 		executionId: string,
+		bufferOrStream: Buffer | Readable,
 		{ mimeType, fileName }: BinaryData.PreWriteMetadata,
 	) {
 		const fileId = this.createFileId(executionId);
 		const filePath = this.getPath(fileId);
 
-		await fs.writeFile(filePath, binaryData);
+		await fs.writeFile(filePath, bufferOrStream);
 
 		const fileSize = await this.getSize(fileId);
@@ -102,8 +98,9 @@
 	}
 
 	async copyByFilePath(
-		filePath: string,
+		_workflowId: string,
 		executionId: string,
+		filePath: string,
 		{ mimeType, fileName }: BinaryData.PreWriteMetadata,
 	) {
 		const newFileId = this.createFileId(executionId);
@@ -117,7 +114,7 @@
 		return { fileId: newFileId, fileSize };
 	}
 
-	async copyByFileId(fileId: string, executionId: string) {
+	async copyByFileId(_workflowId: string, executionId: string, fileId: string) {
 		const newFileId = this.createFileId(executionId);
 
 		await fs.copyFile(this.resolvePath(fileId), this.resolvePath(newFileId));
@@ -158,4 +155,15 @@
 		await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' });
 	}
+
+	private async getSize(fileId: string) {
+		const filePath = this.getPath(fileId);
+
+		try {
+			const stats = await fs.stat(filePath);
+			return stats.size;
+		} catch (error) {
+			throw new Error('Failed to find binary data file in filesystem', { cause: error });
+		}
+	}
 }


@@ -16,36 +16,34 @@ export namespace BinaryData {
 		fileSize: number;
 	};
 
+	export type WriteResult = { fileId: string; fileSize: number };
+
 	export type PreWriteMetadata = Omit<Metadata, 'fileSize'>;
 
 	export interface Manager {
 		init(): Promise<void>;
 
 		store(
-			binaryData: Buffer | Readable,
+			workflowId: string,
 			executionId: string,
-			preStoreMetadata: PreWriteMetadata,
-		): Promise<{ fileId: string; fileSize: number }>;
+			bufferOrStream: Buffer | Readable,
+			metadata: PreWriteMetadata,
+		): Promise<WriteResult>;
 
 		getPath(fileId: string): string;
 		getAsBuffer(fileId: string): Promise<Buffer>;
 		getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
 		getMetadata(fileId: string): Promise<Metadata>;
 
+		// @TODO: Refactor to also use `workflowId` to support full path-like identifier:
+		// `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}`
+		copyByFileId(workflowId: string, executionId: string, fileId: string): Promise<string>;
+
 		copyByFilePath(
-			path: string,
+			workflowId: string,
 			executionId: string,
+			filePath: string,
 			metadata: PreWriteMetadata,
-		): Promise<{ fileId: string; fileSize: number }>;
-		copyByFileId(fileId: string, prefix: string): Promise<string>;
+		): Promise<WriteResult>;
 
 		deleteOne(fileId: string): Promise<void>;
 
+		// @TODO: Refactor to also receive `workflowId` to support full path-like identifier:
+		// `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}`
 		deleteManyByExecutionIds(executionIds: string[]): Promise<string[]>;
 
 		rename(oldFileId: string, newFileId: string): Promise<void>;


@@ -999,19 +999,26 @@ export async function getBinaryDataBuffer(
  * Store an incoming IBinaryData & related buffer using the configured binary data manager.
  *
  * @export
- * @param {IBinaryData} data
- * @param {Buffer | Readable} binaryData
+ * @param {IBinaryData} binaryData
+ * @param {Buffer | Readable} bufferOrStream
  * @returns {Promise<IBinaryData>}
  */
 export async function setBinaryDataBuffer(
-	data: IBinaryData,
-	binaryData: Buffer | Readable,
+	binaryData: IBinaryData,
+	bufferOrStream: Buffer | Readable,
+	workflowId: string,
 	executionId: string,
 ): Promise<IBinaryData> {
-	return Container.get(BinaryDataService).store(data, binaryData, executionId);
+	return Container.get(BinaryDataService).store(
+		workflowId,
+		executionId,
+		bufferOrStream,
+		binaryData,
+	);
 }
 
 export async function copyBinaryFile(
+	workflowId: string,
 	executionId: string,
 	filePath: string,
 	fileName: string,
@@ -1061,7 +1068,12 @@ export async function copyBinaryFile(
 		returnData.fileName = path.parse(filePath).base;
 	}
 
-	return Container.get(BinaryDataService).copyBinaryFile(returnData, filePath, executionId);
+	return Container.get(BinaryDataService).copyBinaryFile(
+		workflowId,
+		executionId,
+		returnData,
+		filePath,
+	);
 }
 
 /**
@@ -1071,6 +1083,7 @@ export async function copyBinaryFile(
 async function prepareBinaryData(
 	binaryData: Buffer | Readable,
 	executionId: string,
+	workflowId: string,
 	filePath?: string,
 	mimeType?: string,
 ): Promise<IBinaryData> {
@@ -1152,7 +1165,7 @@ async function prepareBinaryData(
 		}
 	}
 
-	return setBinaryDataBuffer(returnData, binaryData, executionId);
+	return setBinaryDataBuffer(returnData, binaryData, workflowId, executionId);
 }
 
 /**
@@ -2324,7 +2337,7 @@ export function getNodeWebhookUrl(
 		undefined,
 		false,
 	) as boolean;
-	return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath);
+	return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id, node, path.toString(), isFullPath);
 }
 
 /**
@@ -2560,25 +2573,27 @@ const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions =>
 	},
 });
 
-const getNodeHelperFunctions = ({
-	executionId,
-}: IWorkflowExecuteAdditionalData): NodeHelperFunctions => ({
+const getNodeHelperFunctions = (
+	{ executionId }: IWorkflowExecuteAdditionalData,
+	workflowId: string,
+): NodeHelperFunctions => ({
 	copyBinaryFile: async (filePath, fileName, mimeType) =>
-		copyBinaryFile(executionId!, filePath, fileName, mimeType),
+		copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType),
 });
 
-const getBinaryHelperFunctions = ({
-	executionId,
-}: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({
+const getBinaryHelperFunctions = (
+	{ executionId }: IWorkflowExecuteAdditionalData,
+	workflowId: string,
+): BinaryHelperFunctions => ({
 	getBinaryPath,
 	getBinaryStream,
 	getBinaryMetadata,
 	binaryToBuffer: async (body: Buffer | Readable) =>
 		Container.get(BinaryDataService).binaryToBuffer(body),
 	prepareBinaryData: async (binaryData, filePath, mimeType) =>
-		prepareBinaryData(binaryData, executionId!, filePath, mimeType),
+		prepareBinaryData(binaryData, executionId!, workflowId, filePath, mimeType),
 	setBinaryDataBuffer: async (data, binaryData) =>
-		setBinaryDataBuffer(data, binaryData, executionId!),
+		setBinaryDataBuffer(data, binaryData, workflowId, executionId!),
 	copyBinaryFile: async () => {
 		throw new Error('copyBinaryFile has been removed. Please upgrade this node');
 	},
@@ -2638,7 +2653,7 @@ export function getExecutePollFunctions(
 			helpers: {
 				createDeferredPromise,
 				...getRequestHelperFunctions(workflow, node, additionalData),
-				...getBinaryHelperFunctions(additionalData),
+				...getBinaryHelperFunctions(additionalData, workflow.id),
 				returnJsonArray,
 			},
 		};
@@ -2697,7 +2712,7 @@ export function getExecuteTriggerFunctions(
 			helpers: {
 				createDeferredPromise,
 				...getRequestHelperFunctions(workflow, node, additionalData),
-				...getBinaryHelperFunctions(additionalData),
+				...getBinaryHelperFunctions(additionalData, workflow.id),
 				returnJsonArray,
 			},
 		};
@@ -2763,8 +2778,9 @@ export function getExecuteFunctions(
 				})
 				.then(async (result) =>
 					Container.get(BinaryDataService).duplicateBinaryData(
-						result,
+						workflow.id,
 						additionalData.executionId!,
+						result,
 					),
 				);
 			},
@@ -2872,7 +2888,7 @@
 				createDeferredPromise,
 				...getRequestHelperFunctions(workflow, node, additionalData),
 				...getFileSystemHelperFunctions(node),
-				...getBinaryHelperFunctions(additionalData),
+				...getBinaryHelperFunctions(additionalData, workflow.id),
 				assertBinaryData: (itemIndex, propertyName) =>
 					assertBinaryData(inputData, node, itemIndex, propertyName, 0),
 				getBinaryDataBuffer: async (itemIndex, propertyName) =>
@@ -2882,7 +2898,7 @@
 				normalizeItems,
 				constructExecutionMetaData,
 			},
-			nodeHelpers: getNodeHelperFunctions(additionalData),
+			nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id),
 		};
 	})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
 }
@@ -3014,7 +3030,7 @@ export function getExecuteSingleFunctions(
 			helpers: {
 				createDeferredPromise,
 				...getRequestHelperFunctions(workflow, node, additionalData),
-				...getBinaryHelperFunctions(additionalData),
+				...getBinaryHelperFunctions(additionalData, workflow.id),
 				assertBinaryData: (propertyName, inputIndex = 0) =>
 					assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),
@@ -3271,10 +3287,10 @@ export function getExecuteWebhookFunctions(
 			helpers: {
 				createDeferredPromise,
 				...getRequestHelperFunctions(workflow, node, additionalData),
-				...getBinaryHelperFunctions(additionalData),
+				...getBinaryHelperFunctions(additionalData, workflow.id),
 				returnJsonArray,
 			},
-			nodeHelpers: getNodeHelperFunctions(additionalData),
+			nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id),
 		};
 	})(workflow, node);
 }


@@ -56,7 +56,7 @@ function dedupe<T>(arr: T[]): T[] {
 }
 
 export class Workflow {
-	id: string | undefined;
+	id: string;
 
 	name: string | undefined;
@@ -92,7 +92,7 @@ export class Workflow {
 		settings?: IWorkflowSettings;
 		pinData?: IPinData;
 	}) {
-		this.id = parameters.id;
+		this.id = parameters.id as string;
 		this.name = parameters.name;
 		this.nodeTypes = parameters.nodeTypes;
 		this.pinData = parameters.pinData;