mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-13 05:47:31 -08:00
feat(core): Integrate object store as binary data manager (#7253)
Depends on: #7225 | Story: [PAY-848](https://linear.app/n8n/issue/PAY-848) This PR integrates the object store service as a new binary data manager for Enterprise.
This commit is contained in:
parent
e5ad1e7e4d
commit
1a661e6d00
|
@ -15,6 +15,7 @@ import Container, { Service } from 'typedi';
|
||||||
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
||||||
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
||||||
import { RedisService } from './services/redis.service';
|
import { RedisService } from './services/redis.service';
|
||||||
|
import { ObjectStoreService } from 'n8n-core';
|
||||||
|
|
||||||
type FeatureReturnType = Partial<
|
type FeatureReturnType = Partial<
|
||||||
{
|
{
|
||||||
|
@ -103,6 +104,18 @@ export class License {
|
||||||
command: 'reloadLicense',
|
command: 'reloadLicense',
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
|
||||||
|
const isS3Available = config.getEnv('binaryDataManager.availableModes').includes('s3');
|
||||||
|
const isS3Licensed = _features['feat:binaryDataS3'];
|
||||||
|
|
||||||
|
if (isS3Selected && isS3Available && !isS3Licensed) {
|
||||||
|
this.logger.debug(
|
||||||
|
'License changed with no support for external storage - blocking writes on object store. To restore writes, please upgrade to a license that supports this feature.',
|
||||||
|
);
|
||||||
|
|
||||||
|
Container.get(ObjectStoreService).setReadonly(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveCertStr(value: TLicenseBlock): Promise<void> {
|
async saveCertStr(value: TLicenseBlock): Promise<void> {
|
||||||
|
|
|
@ -1446,28 +1446,39 @@ export class Server extends AbstractServer {
|
||||||
// Binary data
|
// Binary data
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
|
|
||||||
// Download binary
|
// View or download binary file
|
||||||
this.app.get(
|
this.app.get(
|
||||||
`/${this.restEndpoint}/data/:path`,
|
`/${this.restEndpoint}/data`,
|
||||||
async (req: BinaryDataRequest, res: express.Response): Promise<void> => {
|
async (req: BinaryDataRequest, res: express.Response): Promise<void> => {
|
||||||
// TODO UM: check if this needs permission check for UM
|
const { id: binaryDataId, action } = req.query;
|
||||||
const identifier = req.params.path;
|
let { fileName, mimeType } = req.query;
|
||||||
|
const [mode] = binaryDataId.split(':') as ['filesystem' | 's3', string];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const binaryPath = this.binaryDataService.getPath(identifier);
|
const binaryPath = this.binaryDataService.getPath(binaryDataId);
|
||||||
let { mode, fileName, mimeType } = req.query;
|
|
||||||
if (!fileName || !mimeType) {
|
if (!fileName || !mimeType) {
|
||||||
try {
|
try {
|
||||||
const metadata = await this.binaryDataService.getMetadata(identifier);
|
const metadata = await this.binaryDataService.getMetadata(binaryDataId);
|
||||||
fileName = metadata.fileName;
|
fileName = metadata.fileName;
|
||||||
mimeType = metadata.mimeType;
|
mimeType = metadata.mimeType;
|
||||||
res.setHeader('Content-Length', metadata.fileSize);
|
res.setHeader('Content-Length', metadata.fileSize);
|
||||||
} catch {}
|
} catch {}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mimeType) res.setHeader('Content-Type', mimeType);
|
if (mimeType) res.setHeader('Content-Type', mimeType);
|
||||||
if (mode === 'download') {
|
|
||||||
|
if (action === 'download') {
|
||||||
res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
|
res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mode === 's3') {
|
||||||
|
const readStream = await this.binaryDataService.getAsStream(binaryDataId);
|
||||||
|
readStream.pipe(res);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
res.sendFile(binaryPath);
|
res.sendFile(binaryPath);
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error instanceof FileNotFoundError) res.writeHead(404).end();
|
if (error instanceof FileNotFoundError) res.writeHead(404).end();
|
||||||
else throw error;
|
else throw error;
|
||||||
|
|
|
@ -485,7 +485,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
|
||||||
workflowId: this.workflowData.id,
|
workflowId: this.workflowData.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') === 'filesystem') {
|
if (this.mode === 'webhook' && config.getEnv('binaryDataManager.mode') !== 'default') {
|
||||||
await restoreBinaryDataId(fullRunData, this.executionId);
|
await restoreBinaryDataId(fullRunData, this.executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,13 +3,13 @@ import { ExitError } from '@oclif/errors';
|
||||||
import { Container } from 'typedi';
|
import { Container } from 'typedi';
|
||||||
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
|
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
|
||||||
import type { IUserSettings } from 'n8n-core';
|
import type { IUserSettings } from 'n8n-core';
|
||||||
import { BinaryDataService, UserSettings } from 'n8n-core';
|
import { BinaryDataService, ObjectStoreService, UserSettings } from 'n8n-core';
|
||||||
import type { AbstractServer } from '@/AbstractServer';
|
import type { AbstractServer } from '@/AbstractServer';
|
||||||
import { getLogger } from '@/Logger';
|
import { getLogger } from '@/Logger';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import * as Db from '@/Db';
|
import * as Db from '@/Db';
|
||||||
import * as CrashJournal from '@/CrashJournal';
|
import * as CrashJournal from '@/CrashJournal';
|
||||||
import { inTest } from '@/constants';
|
import { LICENSE_FEATURES, inTest } from '@/constants';
|
||||||
import { CredentialTypes } from '@/CredentialTypes';
|
import { CredentialTypes } from '@/CredentialTypes';
|
||||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||||
import { initErrorHandling } from '@/ErrorReporting';
|
import { initErrorHandling } from '@/ErrorReporting';
|
||||||
|
@ -125,7 +125,119 @@ export abstract class BaseCommand extends Command {
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async initObjectStoreService() {
|
||||||
|
const isSelected = config.getEnv('binaryDataManager.mode') === 's3';
|
||||||
|
const isAvailable = config.getEnv('binaryDataManager.availableModes').includes('s3');
|
||||||
|
|
||||||
|
if (!isSelected && !isAvailable) return;
|
||||||
|
|
||||||
|
if (isSelected && !isAvailable) {
|
||||||
|
throw new Error(
|
||||||
|
'External storage selected but unavailable. Please make external storage available by adding "s3" to `N8N_AVAILABLE_BINARY_DATA_MODES`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const isLicensed = Container.get(License).isFeatureEnabled(LICENSE_FEATURES.BINARY_DATA_S3);
|
||||||
|
|
||||||
|
if (isSelected && isAvailable && isLicensed) {
|
||||||
|
LoggerProxy.debug(
|
||||||
|
'License found for external storage - object store to init in read-write mode',
|
||||||
|
);
|
||||||
|
|
||||||
|
await this._initObjectStoreService();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isSelected && isAvailable && !isLicensed) {
|
||||||
|
LoggerProxy.debug(
|
||||||
|
'No license found for external storage - object store to init with writes blocked. To enable writes, please upgrade to a license that supports this feature.',
|
||||||
|
);
|
||||||
|
|
||||||
|
await this._initObjectStoreService({ isReadOnly: true });
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isSelected && isAvailable) {
|
||||||
|
LoggerProxy.debug(
|
||||||
|
'External storage unselected but available - object store to init with writes unused',
|
||||||
|
);
|
||||||
|
|
||||||
|
await this._initObjectStoreService();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _initObjectStoreService(options = { isReadOnly: false }) {
|
||||||
|
const objectStoreService = Container.get(ObjectStoreService);
|
||||||
|
|
||||||
|
const host = config.getEnv('externalStorage.s3.host');
|
||||||
|
|
||||||
|
if (host === '') {
|
||||||
|
throw new Error(
|
||||||
|
'External storage host not configured. Please set `N8N_EXTERNAL_STORAGE_S3_HOST`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const bucket = {
|
||||||
|
name: config.getEnv('externalStorage.s3.bucket.name'),
|
||||||
|
region: config.getEnv('externalStorage.s3.bucket.region'),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (bucket.name === '') {
|
||||||
|
throw new Error(
|
||||||
|
'External storage bucket name not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bucket.region === '') {
|
||||||
|
throw new Error(
|
||||||
|
'External storage bucket region not configured. Please set `N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const credentials = {
|
||||||
|
accessKey: config.getEnv('externalStorage.s3.credentials.accessKey'),
|
||||||
|
accessSecret: config.getEnv('externalStorage.s3.credentials.accessSecret'),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (credentials.accessKey === '') {
|
||||||
|
throw new Error(
|
||||||
|
'External storage access key not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (credentials.accessSecret === '') {
|
||||||
|
throw new Error(
|
||||||
|
'External storage access secret not configured. Please set `N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET`.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
LoggerProxy.debug('Initializing object store service');
|
||||||
|
|
||||||
|
try {
|
||||||
|
await objectStoreService.init(host, bucket, credentials);
|
||||||
|
objectStoreService.setReadonly(options.isReadOnly);
|
||||||
|
|
||||||
|
LoggerProxy.debug('Object store init completed');
|
||||||
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
|
LoggerProxy.debug('Object store init failed', { error });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async initBinaryDataService() {
|
async initBinaryDataService() {
|
||||||
|
try {
|
||||||
|
await this.initObjectStoreService();
|
||||||
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
LoggerProxy.error(`Failed to init object store: ${error.message}`, { error });
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||||
await Container.get(BinaryDataService).init(binaryDataConfig);
|
await Container.get(BinaryDataService).init(binaryDataConfig);
|
||||||
}
|
}
|
||||||
|
|
|
@ -908,7 +908,7 @@ export const schema = {
|
||||||
doc: 'Available modes of binary data storage, as comma separated strings',
|
doc: 'Available modes of binary data storage, as comma separated strings',
|
||||||
},
|
},
|
||||||
mode: {
|
mode: {
|
||||||
format: ['default', 'filesystem'] as const,
|
format: ['default', 'filesystem', 's3'] as const,
|
||||||
default: 'default',
|
default: 'default',
|
||||||
env: 'N8N_DEFAULT_BINARY_DATA_MODE',
|
env: 'N8N_DEFAULT_BINARY_DATA_MODE',
|
||||||
doc: 'Storage mode for binary data',
|
doc: 'Storage mode for binary data',
|
||||||
|
@ -921,6 +921,45 @@ export const schema = {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
externalStorage: {
|
||||||
|
s3: {
|
||||||
|
host: {
|
||||||
|
format: String,
|
||||||
|
default: '',
|
||||||
|
env: 'N8N_EXTERNAL_STORAGE_S3_HOST',
|
||||||
|
doc: 'Host of the n8n bucket in S3-compatible external storage, e.g. `s3.us-east-1.amazonaws.com`',
|
||||||
|
},
|
||||||
|
bucket: {
|
||||||
|
name: {
|
||||||
|
format: String,
|
||||||
|
default: '',
|
||||||
|
env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_NAME',
|
||||||
|
doc: 'Name of the n8n bucket in S3-compatible external storage',
|
||||||
|
},
|
||||||
|
region: {
|
||||||
|
format: String,
|
||||||
|
default: '',
|
||||||
|
env: 'N8N_EXTERNAL_STORAGE_S3_BUCKET_REGION',
|
||||||
|
doc: 'Region of the n8n bucket in S3-compatible external storage, e.g. `us-east-1`',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
credentials: {
|
||||||
|
accessKey: {
|
||||||
|
format: String,
|
||||||
|
default: '',
|
||||||
|
env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_KEY',
|
||||||
|
doc: 'Access key in S3-compatible external storage',
|
||||||
|
},
|
||||||
|
accessSecret: {
|
||||||
|
format: String,
|
||||||
|
default: '',
|
||||||
|
env: 'N8N_EXTERNAL_STORAGE_S3_ACCESS_SECRET',
|
||||||
|
doc: 'Access secret in S3-compatible external storage',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
deployment: {
|
deployment: {
|
||||||
type: {
|
type: {
|
||||||
format: String,
|
format: String,
|
||||||
|
|
|
@ -81,6 +81,7 @@ export const LICENSE_FEATURES = {
|
||||||
SHOW_NON_PROD_BANNER: 'feat:showNonProdBanner',
|
SHOW_NON_PROD_BANNER: 'feat:showNonProdBanner',
|
||||||
WORKFLOW_HISTORY: 'feat:workflowHistory',
|
WORKFLOW_HISTORY: 'feat:workflowHistory',
|
||||||
DEBUG_IN_EDITOR: 'feat:debugInEditor',
|
DEBUG_IN_EDITOR: 'feat:debugInEditor',
|
||||||
|
BINARY_DATA_S3: 'feat:binaryDataS3',
|
||||||
} as const;
|
} as const;
|
||||||
|
|
||||||
export const LICENSE_QUOTAS = {
|
export const LICENSE_QUOTAS = {
|
||||||
|
|
|
@ -68,6 +68,7 @@ export class E2EController {
|
||||||
[LICENSE_FEATURES.SHOW_NON_PROD_BANNER]: false,
|
[LICENSE_FEATURES.SHOW_NON_PROD_BANNER]: false,
|
||||||
[LICENSE_FEATURES.WORKFLOW_HISTORY]: false,
|
[LICENSE_FEATURES.WORKFLOW_HISTORY]: false,
|
||||||
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
|
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
|
||||||
|
[LICENSE_FEATURES.BINARY_DATA_S3]: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
|
|
|
@ -1,13 +1,14 @@
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import type { IRun } from 'n8n-workflow';
|
import type { IRun } from 'n8n-workflow';
|
||||||
|
import type { BinaryData } from 'n8n-core';
|
||||||
|
|
||||||
export function isMissingExecutionId(binaryDataId: string) {
|
export function isMissingExecutionId(
|
||||||
const UUID_CHAR_LENGTH = 36;
|
fileId: string,
|
||||||
|
mode: BinaryData.NonDefaultMode,
|
||||||
return [UUID_CHAR_LENGTH + 'filesystem:'.length, UUID_CHAR_LENGTH + 's3:'.length].some(
|
uuidV4CharLength = 36,
|
||||||
(incorrectLength) => binaryDataId.length === incorrectLength,
|
) {
|
||||||
);
|
return mode === 'filesystem' ? uuidV4CharLength === fileId.length : fileId.includes('/temp/');
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -19,6 +20,9 @@ export function isMissingExecutionId(binaryDataId: string) {
|
||||||
* ```txt
|
* ```txt
|
||||||
* filesystem:11869055-83c4-4493-876a-9092c4708b9b ->
|
* filesystem:11869055-83c4-4493-876a-9092c4708b9b ->
|
||||||
* filesystem:39011869055-83c4-4493-876a-9092c4708b9b
|
* filesystem:39011869055-83c4-4493-876a-9092c4708b9b
|
||||||
|
*
|
||||||
|
* s3:workflows/123/executions/temp/binary_data/69055-83c4-4493-876a-9092c4708b9b ->
|
||||||
|
* s3:workflows/123/executions/390/binary_data/69055-83c4-4493-876a-9092c4708b9b
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
export async function restoreBinaryDataId(run: IRun, executionId: string) {
|
export async function restoreBinaryDataId(run: IRun, executionId: string) {
|
||||||
|
@ -27,14 +31,19 @@ export async function restoreBinaryDataId(run: IRun, executionId: string) {
|
||||||
const promises = Object.keys(runData).map(async (nodeName) => {
|
const promises = Object.keys(runData).map(async (nodeName) => {
|
||||||
const binaryDataId = runData[nodeName]?.[0]?.data?.main?.[0]?.[0]?.binary?.data.id;
|
const binaryDataId = runData[nodeName]?.[0]?.data?.main?.[0]?.[0]?.binary?.data.id;
|
||||||
|
|
||||||
if (!binaryDataId || !isMissingExecutionId(binaryDataId)) return;
|
if (!binaryDataId) return;
|
||||||
|
|
||||||
|
const [mode, fileId] = binaryDataId.split(':') as [BinaryData.NonDefaultMode, string];
|
||||||
|
|
||||||
|
if (!isMissingExecutionId(fileId, mode)) return;
|
||||||
|
|
||||||
|
const correctFileId =
|
||||||
|
mode === 'filesystem' ? `${executionId}${fileId}` : fileId.replace('temp', executionId);
|
||||||
|
|
||||||
|
await Container.get(BinaryDataService).rename(fileId, correctFileId);
|
||||||
|
|
||||||
const [mode, incorrectFileId] = binaryDataId.split(':');
|
|
||||||
const correctFileId = `${executionId}${incorrectFileId}`;
|
|
||||||
const correctBinaryDataId = `${mode}:${correctFileId}`;
|
const correctBinaryDataId = `${mode}:${correctFileId}`;
|
||||||
|
|
||||||
await Container.get(BinaryDataService).rename(incorrectFileId, correctFileId);
|
|
||||||
|
|
||||||
// @ts-expect-error Validated at the top
|
// @ts-expect-error Validated at the top
|
||||||
run.data.resultData.runData[nodeName][0].data.main[0][0].binary.data.id = correctBinaryDataId;
|
run.data.resultData.runData[nodeName][0].data.main[0][0].binary.data.id = correctBinaryDataId;
|
||||||
});
|
});
|
||||||
|
|
|
@ -492,11 +492,12 @@ export declare namespace LicenseRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
export type BinaryDataRequest = AuthenticatedRequest<
|
export type BinaryDataRequest = AuthenticatedRequest<
|
||||||
{ path: string },
|
{},
|
||||||
{},
|
{},
|
||||||
{},
|
{},
|
||||||
{
|
{
|
||||||
mode: 'view' | 'download';
|
id: string;
|
||||||
|
action: 'view' | 'download';
|
||||||
fileName?: string;
|
fileName?: string;
|
||||||
mimeType?: string;
|
mimeType?: string;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
LoggerProxy.init(getLogger());
|
LoggerProxy.init(getLogger());
|
||||||
config.set('executions.mode', 'queue');
|
config.set('executions.mode', 'queue');
|
||||||
|
config.set('binaryDataManager.availableModes', 'filesystem');
|
||||||
mockInstance(Telemetry);
|
mockInstance(Telemetry);
|
||||||
mockInstance(PostHogClient);
|
mockInstance(PostHogClient);
|
||||||
mockInstance(InternalHooks);
|
mockInstance(InternalHooks);
|
||||||
|
|
|
@ -74,11 +74,13 @@ export async function initNodeTypes() {
|
||||||
/**
|
/**
|
||||||
* Initialize a BinaryDataService for test runs.
|
* Initialize a BinaryDataService for test runs.
|
||||||
*/
|
*/
|
||||||
export async function initBinaryDataService() {
|
export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') {
|
||||||
const binaryDataService = new BinaryDataService();
|
const binaryDataService = new BinaryDataService();
|
||||||
|
await binaryDataService.init({
|
||||||
await binaryDataService.init(config.getEnv('binaryDataManager'));
|
mode,
|
||||||
|
availableModes: [mode],
|
||||||
|
localStoragePath: '',
|
||||||
|
});
|
||||||
Container.set(BinaryDataService, binaryDataService);
|
Container.set(BinaryDataService, binaryDataService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ import { restoreBinaryDataId } from '@/executionLifecycleHooks/restoreBinaryData
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import { mockInstance } from '../integration/shared/utils/mocking';
|
import { mockInstance } from '../integration/shared/utils/mocking';
|
||||||
import type { IRun } from 'n8n-workflow';
|
import type { IRun } from 'n8n-workflow';
|
||||||
|
import config from '@/config';
|
||||||
|
|
||||||
function toIRun(item?: object) {
|
function toIRun(item?: object) {
|
||||||
return {
|
return {
|
||||||
|
@ -27,10 +28,15 @@ function getDataId(run: IRun, kind: 'binary' | 'json') {
|
||||||
return run.data.resultData.runData.myNode[0].data.main[0][0][kind].data.id;
|
return run.data.resultData.runData.myNode[0].data.main[0][0][kind].data.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
describe('restoreBinaryDataId()', () => {
|
const binaryDataService = mockInstance(BinaryDataService);
|
||||||
const binaryDataService = mockInstance(BinaryDataService);
|
|
||||||
|
|
||||||
beforeEach(() => {
|
describe('on filesystem mode', () => {
|
||||||
|
describe('restoreBinaryDataId()', () => {
|
||||||
|
beforeAll(() => {
|
||||||
|
config.set('binaryDataManager.mode', 'filesystem');
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -84,6 +90,80 @@ describe('restoreBinaryDataId()', () => {
|
||||||
expect(binaryDataService.rename).not.toHaveBeenCalled();
|
expect(binaryDataService.rename).not.toHaveBeenCalled();
|
||||||
expect(getDataId(run, 'json')).toBe(dataId);
|
expect(getDataId(run, 'json')).toBe(dataId);
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('on s3 mode', () => {
|
||||||
|
describe('restoreBinaryDataId()', () => {
|
||||||
|
beforeAll(() => {
|
||||||
|
config.set('binaryDataManager.mode', 's3');
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should restore if binary data ID is missing execution ID', async () => {
|
||||||
|
const workflowId = '6HYhhKmJch2cYxGj';
|
||||||
|
const executionId = 'temp';
|
||||||
|
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
|
||||||
|
|
||||||
|
const incorrectFileId = `workflows/${workflowId}/executions/temp/binary_data/${binaryDataFileUuid}`;
|
||||||
|
|
||||||
|
const run = toIRun({
|
||||||
|
binary: {
|
||||||
|
data: { id: `s3:${incorrectFileId}` },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await restoreBinaryDataId(run, executionId);
|
||||||
|
|
||||||
|
const correctFileId = incorrectFileId.replace('temp', executionId);
|
||||||
|
const correctBinaryDataId = `s3:${correctFileId}`;
|
||||||
|
|
||||||
|
expect(binaryDataService.rename).toHaveBeenCalledWith(incorrectFileId, correctFileId);
|
||||||
|
expect(getDataId(run, 'binary')).toBe(correctBinaryDataId);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should do nothing if binary data ID is not missing execution ID', async () => {
|
||||||
|
const workflowId = '6HYhhKmJch2cYxGj';
|
||||||
|
const executionId = '999';
|
||||||
|
const binaryDataFileUuid = 'a5c3f1ed-9d59-4155-bc68-9a370b3c51f6';
|
||||||
|
|
||||||
|
const fileId = `workflows/${workflowId}/executions/${executionId}/binary_data/${binaryDataFileUuid}`;
|
||||||
|
|
||||||
|
const binaryDataId = `s3:${fileId}`;
|
||||||
|
|
||||||
|
const run = toIRun({
|
||||||
|
binary: {
|
||||||
|
data: {
|
||||||
|
id: binaryDataId,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await restoreBinaryDataId(run, executionId);
|
||||||
|
|
||||||
|
expect(binaryDataService.rename).not.toHaveBeenCalled();
|
||||||
|
expect(getDataId(run, 'binary')).toBe(binaryDataId);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should do nothing if no binary data ID', async () => {
|
||||||
|
const executionId = '999';
|
||||||
|
const dataId = '123';
|
||||||
|
|
||||||
|
const run = toIRun({
|
||||||
|
json: {
|
||||||
|
data: { id: dataId },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await restoreBinaryDataId(run, executionId);
|
||||||
|
|
||||||
|
expect(binaryDataService.rename).not.toHaveBeenCalled();
|
||||||
|
expect(getDataId(run, 'json')).toBe(dataId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should do nothing on itemless case', async () => {
|
it('should do nothing on itemless case', async () => {
|
||||||
const executionId = '999';
|
const executionId = '999';
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
/* eslint-disable @typescript-eslint/naming-convention */
|
/* eslint-disable @typescript-eslint/naming-convention */
|
||||||
|
|
||||||
import { readFile, stat } from 'fs/promises';
|
import { readFile, stat } from 'node:fs/promises';
|
||||||
import prettyBytes from 'pretty-bytes';
|
import prettyBytes from 'pretty-bytes';
|
||||||
import { Service } from 'typedi';
|
import Container, { Service } from 'typedi';
|
||||||
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
|
import { BINARY_ENCODING, LoggerProxy as Logger, IBinaryData } from 'n8n-workflow';
|
||||||
|
import { UnknownBinaryDataManagerError, InvalidBinaryDataModeError } from './errors';
|
||||||
import { UnknownBinaryDataManager, InvalidBinaryDataMode } from './errors';
|
|
||||||
import { LogCatch } from '../decorators/LogCatch.decorator';
|
|
||||||
import { areValidModes, toBuffer } from './utils';
|
import { areValidModes, toBuffer } from './utils';
|
||||||
|
import { LogCatch } from '../decorators/LogCatch.decorator';
|
||||||
|
|
||||||
import type { Readable } from 'stream';
|
import type { Readable } from 'stream';
|
||||||
import type { BinaryData } from './types';
|
import type { BinaryData } from './types';
|
||||||
|
@ -20,16 +19,28 @@ export class BinaryDataService {
|
||||||
private managers: Record<string, BinaryData.Manager> = {};
|
private managers: Record<string, BinaryData.Manager> = {};
|
||||||
|
|
||||||
async init(config: BinaryData.Config) {
|
async init(config: BinaryData.Config) {
|
||||||
if (!areValidModes(config.availableModes)) throw new InvalidBinaryDataMode();
|
if (!areValidModes(config.availableModes)) {
|
||||||
|
throw new InvalidBinaryDataModeError();
|
||||||
|
}
|
||||||
|
|
||||||
this.mode = config.mode;
|
this.mode = config.mode;
|
||||||
|
|
||||||
if (config.availableModes.includes('filesystem')) {
|
if (config.availableModes.includes('filesystem')) {
|
||||||
const { FileSystemManager } = await import('./FileSystem.manager');
|
const { FileSystemManager } = await import('./FileSystem.manager');
|
||||||
|
|
||||||
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
|
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
|
||||||
|
|
||||||
await this.managers.filesystem.init();
|
await this.managers.filesystem.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.availableModes.includes('s3')) {
|
||||||
|
const { ObjectStoreManager } = await import('./ObjectStore.manager');
|
||||||
|
const { ObjectStoreService } = await import('../ObjectStore/ObjectStore.service.ee');
|
||||||
|
|
||||||
|
this.managers.s3 = new ObjectStoreManager(Container.get(ObjectStoreService));
|
||||||
|
|
||||||
|
await this.managers.s3.init();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@LogCatch((error) => Logger.error('Failed to copy binary data file', { error }))
|
@LogCatch((error) => Logger.error('Failed to copy binary data file', { error }))
|
||||||
|
@ -242,6 +253,6 @@ export class BinaryDataService {
|
||||||
|
|
||||||
if (manager) return manager;
|
if (manager) return manager;
|
||||||
|
|
||||||
throw new UnknownBinaryDataManager(mode);
|
throw new UnknownBinaryDataManagerError(mode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,10 @@
|
||||||
/**
|
import { createReadStream } from 'node:fs';
|
||||||
* @tech_debt The `workflowId` arguments on write are for compatibility with the
|
import fs from 'node:fs/promises';
|
||||||
* `BinaryData.Manager` interface. Unused in filesystem mode until we refactor
|
import path from 'node:path';
|
||||||
* how we store binary data files in the `/binaryData` dir.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { createReadStream } from 'fs';
|
|
||||||
import fs from 'fs/promises';
|
|
||||||
import path from 'path';
|
|
||||||
import { v4 as uuid } from 'uuid';
|
import { v4 as uuid } from 'uuid';
|
||||||
import { jsonParse } from 'n8n-workflow';
|
import { jsonParse } from 'n8n-workflow';
|
||||||
import { rename } from 'node:fs/promises';
|
|
||||||
|
|
||||||
import { FileNotFoundError } from '../errors';
|
|
||||||
import { ensureDirExists } from './utils';
|
import { ensureDirExists } from './utils';
|
||||||
|
import { FileNotFoundError } from '../errors';
|
||||||
|
|
||||||
import type { Readable } from 'stream';
|
import type { Readable } from 'stream';
|
||||||
import type { BinaryData } from './types';
|
import type { BinaryData } from './types';
|
||||||
|
@ -27,18 +19,36 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
await ensureDirExists(this.storagePath);
|
await ensureDirExists(this.storagePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async store(
|
||||||
|
workflowId: string,
|
||||||
|
executionId: string,
|
||||||
|
bufferOrStream: Buffer | Readable,
|
||||||
|
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
||||||
|
) {
|
||||||
|
const fileId = this.toFileId(workflowId, executionId);
|
||||||
|
const filePath = this.resolvePath(fileId);
|
||||||
|
|
||||||
|
await fs.writeFile(filePath, bufferOrStream);
|
||||||
|
|
||||||
|
const fileSize = await this.getSize(fileId);
|
||||||
|
|
||||||
|
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
|
||||||
|
|
||||||
|
return { fileId, fileSize };
|
||||||
|
}
|
||||||
|
|
||||||
getPath(fileId: string) {
|
getPath(fileId: string) {
|
||||||
return this.resolvePath(fileId);
|
return this.resolvePath(fileId);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getAsStream(fileId: string, chunkSize?: number) {
|
async getAsStream(fileId: string, chunkSize?: number) {
|
||||||
const filePath = this.getPath(fileId);
|
const filePath = this.resolvePath(fileId);
|
||||||
|
|
||||||
return createReadStream(filePath, { highWaterMark: chunkSize });
|
return createReadStream(filePath, { highWaterMark: chunkSize });
|
||||||
}
|
}
|
||||||
|
|
||||||
async getAsBuffer(fileId: string) {
|
async getAsBuffer(fileId: string) {
|
||||||
const filePath = this.getPath(fileId);
|
const filePath = this.resolvePath(fileId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await fs.readFile(filePath);
|
return await fs.readFile(filePath);
|
||||||
|
@ -53,30 +63,6 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
|
return jsonParse(await fs.readFile(filePath, { encoding: 'utf-8' }));
|
||||||
}
|
}
|
||||||
|
|
||||||
async store(
|
|
||||||
_workflowId: string,
|
|
||||||
executionId: string,
|
|
||||||
bufferOrStream: Buffer | Readable,
|
|
||||||
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
|
||||||
) {
|
|
||||||
const fileId = this.toFileId(executionId);
|
|
||||||
const filePath = this.getPath(fileId);
|
|
||||||
|
|
||||||
await fs.writeFile(filePath, bufferOrStream);
|
|
||||||
|
|
||||||
const fileSize = await this.getSize(fileId);
|
|
||||||
|
|
||||||
await this.storeMetadata(fileId, { mimeType, fileName, fileSize });
|
|
||||||
|
|
||||||
return { fileId, fileSize };
|
|
||||||
}
|
|
||||||
|
|
||||||
async deleteOne(fileId: string) {
|
|
||||||
const filePath = this.getPath(fileId);
|
|
||||||
|
|
||||||
return fs.rm(filePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
async deleteMany(ids: BinaryData.IdsForDeletion) {
|
async deleteMany(ids: BinaryData.IdsForDeletion) {
|
||||||
const executionIds = ids.map((o) => o.executionId);
|
const executionIds = ids.map((o) => o.executionId);
|
||||||
|
|
||||||
|
@ -95,24 +81,25 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async copyByFilePath(
|
async copyByFilePath(
|
||||||
_workflowId: string,
|
workflowId: string,
|
||||||
executionId: string,
|
executionId: string,
|
||||||
filePath: string,
|
sourcePath: string,
|
||||||
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
||||||
) {
|
) {
|
||||||
const newFileId = this.toFileId(executionId);
|
const targetFileId = this.toFileId(workflowId, executionId);
|
||||||
|
const targetPath = this.resolvePath(targetFileId);
|
||||||
|
|
||||||
await fs.cp(filePath, this.getPath(newFileId));
|
await fs.cp(sourcePath, targetPath);
|
||||||
|
|
||||||
const fileSize = await this.getSize(newFileId);
|
const fileSize = await this.getSize(targetFileId);
|
||||||
|
|
||||||
await this.storeMetadata(newFileId, { mimeType, fileName, fileSize });
|
await this.storeMetadata(targetFileId, { mimeType, fileName, fileSize });
|
||||||
|
|
||||||
return { fileId: newFileId, fileSize };
|
return { fileId: targetFileId, fileSize };
|
||||||
}
|
}
|
||||||
|
|
||||||
async copyByFileId(_workflowId: string, executionId: string, sourceFileId: string) {
|
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
|
||||||
const targetFileId = this.toFileId(executionId);
|
const targetFileId = this.toFileId(workflowId, executionId);
|
||||||
const sourcePath = this.resolvePath(sourceFileId);
|
const sourcePath = this.resolvePath(sourceFileId);
|
||||||
const targetPath = this.resolvePath(targetFileId);
|
const targetPath = this.resolvePath(targetFileId);
|
||||||
|
|
||||||
|
@ -122,12 +109,12 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async rename(oldFileId: string, newFileId: string) {
|
async rename(oldFileId: string, newFileId: string) {
|
||||||
const oldPath = this.getPath(oldFileId);
|
const oldPath = this.resolvePath(oldFileId);
|
||||||
const newPath = this.getPath(newFileId);
|
const newPath = this.resolvePath(newFileId);
|
||||||
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
rename(oldPath, newPath),
|
fs.rename(oldPath, newPath),
|
||||||
rename(`${oldPath}.metadata`, `${newPath}.metadata`),
|
fs.rename(`${oldPath}.metadata`, `${newPath}.metadata`),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +122,12 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
// private methods
|
// private methods
|
||||||
// ----------------------------------
|
// ----------------------------------
|
||||||
|
|
||||||
private toFileId(executionId: string) {
|
/**
|
||||||
|
* @tech_debt The `workflowId` argument is for compatibility with the
|
||||||
|
* `BinaryData.Manager` interface. Unused here until we refactor
|
||||||
|
* how we store binary data files in the `/binaryData` dir.
|
||||||
|
*/
|
||||||
|
private toFileId(_workflowId: string, executionId: string) {
|
||||||
return [executionId, uuid()].join('');
|
return [executionId, uuid()].join('');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +148,7 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async getSize(fileId: string) {
|
private async getSize(fileId: string) {
|
||||||
const filePath = this.getPath(fileId);
|
const filePath = this.resolvePath(fileId);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const stats = await fs.stat(filePath);
|
const stats = await fs.stat(filePath);
|
||||||
|
|
120
packages/core/src/BinaryData/ObjectStore.manager.ts
Normal file
120
packages/core/src/BinaryData/ObjectStore.manager.ts
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
import fs from 'node:fs/promises';
|
||||||
|
import { Service } from 'typedi';
|
||||||
|
import { v4 as uuid } from 'uuid';
|
||||||
|
import { toBuffer } from './utils';
|
||||||
|
import { ObjectStoreService } from '../ObjectStore/ObjectStore.service.ee';
|
||||||
|
|
||||||
|
import type { Readable } from 'node:stream';
|
||||||
|
import type { BinaryData } from './types';
|
||||||
|
|
||||||
|
@Service()
|
||||||
|
export class ObjectStoreManager implements BinaryData.Manager {
|
||||||
|
constructor(private readonly objectStoreService: ObjectStoreService) {}
|
||||||
|
|
||||||
|
async init() {
|
||||||
|
await this.objectStoreService.checkConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
async store(
|
||||||
|
workflowId: string,
|
||||||
|
executionId: string,
|
||||||
|
bufferOrStream: Buffer | Readable,
|
||||||
|
metadata: BinaryData.PreWriteMetadata,
|
||||||
|
) {
|
||||||
|
const fileId = this.toFileId(workflowId, executionId);
|
||||||
|
const buffer = await this.toBuffer(bufferOrStream);
|
||||||
|
|
||||||
|
await this.objectStoreService.put(fileId, buffer, metadata);
|
||||||
|
|
||||||
|
return { fileId, fileSize: buffer.length };
|
||||||
|
}
|
||||||
|
|
||||||
|
getPath(fileId: string) {
|
||||||
|
return fileId; // already full path, no transform needed
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAsBuffer(fileId: string) {
|
||||||
|
return this.objectStoreService.get(fileId, { mode: 'buffer' });
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAsStream(fileId: string) {
|
||||||
|
return this.objectStoreService.get(fileId, { mode: 'stream' });
|
||||||
|
}
|
||||||
|
|
||||||
|
async getMetadata(fileId: string): Promise<BinaryData.Metadata> {
|
||||||
|
const {
|
||||||
|
'content-length': contentLength,
|
||||||
|
'content-type': contentType,
|
||||||
|
'x-amz-meta-filename': fileName,
|
||||||
|
} = await this.objectStoreService.getMetadata(fileId);
|
||||||
|
|
||||||
|
const metadata: BinaryData.Metadata = { fileSize: Number(contentLength) };
|
||||||
|
|
||||||
|
if (contentType) metadata.mimeType = contentType;
|
||||||
|
if (fileName) metadata.fileName = fileName;
|
||||||
|
|
||||||
|
return metadata;
|
||||||
|
}
|
||||||
|
|
||||||
|
async copyByFileId(workflowId: string, executionId: string, sourceFileId: string) {
|
||||||
|
const targetFileId = this.toFileId(workflowId, executionId);
|
||||||
|
|
||||||
|
const sourceFile = await this.objectStoreService.get(sourceFileId, { mode: 'buffer' });
|
||||||
|
|
||||||
|
await this.objectStoreService.put(targetFileId, sourceFile);
|
||||||
|
|
||||||
|
return targetFileId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copy to object store the temp file written by nodes like Webhook, FTP, and SSH.
|
||||||
|
*/
|
||||||
|
async copyByFilePath(
|
||||||
|
workflowId: string,
|
||||||
|
executionId: string,
|
||||||
|
sourcePath: string,
|
||||||
|
metadata: BinaryData.PreWriteMetadata,
|
||||||
|
) {
|
||||||
|
const targetFileId = this.toFileId(workflowId, executionId);
|
||||||
|
const sourceFile = await fs.readFile(sourcePath);
|
||||||
|
|
||||||
|
await this.objectStoreService.put(targetFileId, sourceFile, metadata);
|
||||||
|
|
||||||
|
return { fileId: targetFileId, fileSize: sourceFile.length };
|
||||||
|
}
|
||||||
|
|
||||||
|
async deleteMany(ids: BinaryData.IdsForDeletion) {
|
||||||
|
const prefixes = ids.map(
|
||||||
|
({ workflowId, executionId }) =>
|
||||||
|
`workflows/${workflowId}/executions/${executionId}/binary_data/`,
|
||||||
|
);
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
prefixes.map(async (prefix) => {
|
||||||
|
await this.objectStoreService.deleteMany(prefix);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async rename(oldFileId: string, newFileId: string) {
|
||||||
|
const oldFile = await this.objectStoreService.get(oldFileId, { mode: 'buffer' });
|
||||||
|
const oldFileMetadata = await this.objectStoreService.getMetadata(oldFileId);
|
||||||
|
|
||||||
|
await this.objectStoreService.put(newFileId, oldFile, oldFileMetadata);
|
||||||
|
await this.objectStoreService.deleteOne(oldFileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----------------------------------
|
||||||
|
// private methods
|
||||||
|
// ----------------------------------
|
||||||
|
|
||||||
|
private toFileId(workflowId: string, executionId: string) {
|
||||||
|
if (!executionId) executionId = 'temp'; // missing only in edge case, see PR #7244
|
||||||
|
|
||||||
|
return `workflows/${workflowId}/executions/${executionId}/binary_data/${uuid()}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async toBuffer(bufferOrStream: Buffer | Readable) {
|
||||||
|
return toBuffer(bufferOrStream);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,12 +1,10 @@
|
||||||
import { BINARY_DATA_MODES } from './utils';
|
import { BINARY_DATA_MODES } from './utils';
|
||||||
|
|
||||||
export class InvalidBinaryDataMode extends Error {
|
export class InvalidBinaryDataModeError extends Error {
|
||||||
constructor() {
|
message = `Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`;
|
||||||
super(`Invalid binary data mode. Valid modes: ${BINARY_DATA_MODES.join(', ')}`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class UnknownBinaryDataManager extends Error {
|
export class UnknownBinaryDataManagerError extends Error {
|
||||||
constructor(mode: string) {
|
constructor(mode: string) {
|
||||||
super(`No binary data manager found for: ${mode}`);
|
super(`No binary data manager found for: ${mode}`);
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,8 +4,10 @@ import type { BINARY_DATA_MODES } from './utils';
|
||||||
export namespace BinaryData {
|
export namespace BinaryData {
|
||||||
export type Mode = (typeof BINARY_DATA_MODES)[number];
|
export type Mode = (typeof BINARY_DATA_MODES)[number];
|
||||||
|
|
||||||
|
export type NonDefaultMode = Exclude<Mode, 'default'>;
|
||||||
|
|
||||||
export type Config = {
|
export type Config = {
|
||||||
mode: 'default' | 'filesystem';
|
mode: Mode;
|
||||||
availableModes: string[];
|
availableModes: string[];
|
||||||
localStoragePath: string;
|
localStoragePath: string;
|
||||||
};
|
};
|
||||||
|
@ -37,17 +39,16 @@ export namespace BinaryData {
|
||||||
getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
|
getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
|
||||||
getMetadata(fileId: string): Promise<Metadata>;
|
getMetadata(fileId: string): Promise<Metadata>;
|
||||||
|
|
||||||
|
deleteMany(ids: IdsForDeletion): Promise<void>;
|
||||||
|
|
||||||
copyByFileId(workflowId: string, executionId: string, sourceFileId: string): Promise<string>;
|
copyByFileId(workflowId: string, executionId: string, sourceFileId: string): Promise<string>;
|
||||||
copyByFilePath(
|
copyByFilePath(
|
||||||
workflowId: string,
|
workflowId: string,
|
||||||
executionId: string,
|
executionId: string,
|
||||||
filePath: string,
|
sourcePath: string,
|
||||||
metadata: PreWriteMetadata,
|
metadata: PreWriteMetadata,
|
||||||
): Promise<WriteResult>;
|
): Promise<WriteResult>;
|
||||||
|
|
||||||
deleteOne(fileId: string): Promise<void>;
|
|
||||||
deleteMany(ids: IdsForDeletion): Promise<void>;
|
|
||||||
|
|
||||||
rename(oldFileId: string, newFileId: string): Promise<void>;
|
rename(oldFileId: string, newFileId: string): Promise<void>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,31 +4,56 @@ import { createHash } from 'node:crypto';
|
||||||
import axios from 'axios';
|
import axios from 'axios';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { sign } from 'aws4';
|
import { sign } from 'aws4';
|
||||||
import { isStream, parseXml } from './utils';
|
import { isStream, parseXml, writeBlockedMessage } from './utils';
|
||||||
import { ExternalStorageRequestFailed } from './errors';
|
import { LoggerProxy as Logger } from 'n8n-workflow';
|
||||||
|
|
||||||
import type { AxiosRequestConfig, Method } from 'axios';
|
import type { AxiosRequestConfig, AxiosResponse, Method } from 'axios';
|
||||||
import type { Request as Aws4Options, Credentials as Aws4Credentials } from 'aws4';
|
import type { Request as Aws4Options, Credentials as Aws4Credentials } from 'aws4';
|
||||||
import type { ListPage, ObjectStore, RawListPage } from './types';
|
import type {
|
||||||
|
Bucket,
|
||||||
|
ConfigSchemaCredentials,
|
||||||
|
ListPage,
|
||||||
|
RawListPage,
|
||||||
|
RequestOptions,
|
||||||
|
} from './types';
|
||||||
import type { Readable } from 'stream';
|
import type { Readable } from 'stream';
|
||||||
import type { BinaryData } from '..';
|
import type { BinaryData } from '..';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ObjectStoreService {
|
export class ObjectStoreService {
|
||||||
private credentials: Aws4Credentials;
|
private host = '';
|
||||||
|
|
||||||
|
private bucket: Bucket = { region: '', name: '' };
|
||||||
|
|
||||||
|
private credentials: Aws4Credentials = { accessKeyId: '', secretAccessKey: '' };
|
||||||
|
|
||||||
|
private isReady = false;
|
||||||
|
|
||||||
|
private isReadOnly = false;
|
||||||
|
|
||||||
|
private logger = Logger;
|
||||||
|
|
||||||
|
async init(host: string, bucket: Bucket, credentials: ConfigSchemaCredentials) {
|
||||||
|
this.host = host;
|
||||||
|
this.bucket.name = bucket.name;
|
||||||
|
this.bucket.region = bucket.region;
|
||||||
|
|
||||||
constructor(
|
|
||||||
private bucket: { region: string; name: string },
|
|
||||||
credentials: { accountId: string; secretKey: string },
|
|
||||||
) {
|
|
||||||
this.credentials = {
|
this.credentials = {
|
||||||
accessKeyId: credentials.accountId,
|
accessKeyId: credentials.accessKey,
|
||||||
secretAccessKey: credentials.secretKey,
|
secretAccessKey: credentials.accessSecret,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
await this.checkConnection();
|
||||||
|
|
||||||
|
this.setReady(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
get host() {
|
setReadonly(newState: boolean) {
|
||||||
return `${this.bucket.name}.s3.${this.bucket.region}.amazonaws.com`;
|
this.isReadOnly = newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
setReady(newState: boolean) {
|
||||||
|
this.isReady = newState;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -37,7 +62,9 @@ export class ObjectStoreService {
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
|
||||||
*/
|
*/
|
||||||
async checkConnection() {
|
async checkConnection() {
|
||||||
return this.request('HEAD', this.host);
|
if (this.isReady) return;
|
||||||
|
|
||||||
|
return this.request('HEAD', this.host, this.bucket.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,6 +73,8 @@ export class ObjectStoreService {
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
|
||||||
*/
|
*/
|
||||||
async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
|
async put(filename: string, buffer: Buffer, metadata: BinaryData.PreWriteMetadata = {}) {
|
||||||
|
if (this.isReadOnly) return this.blockWrite(filename);
|
||||||
|
|
||||||
const headers: Record<string, string | number> = {
|
const headers: Record<string, string | number> = {
|
||||||
'Content-Length': buffer.length,
|
'Content-Length': buffer.length,
|
||||||
'Content-MD5': createHash('md5').update(buffer).digest('base64'),
|
'Content-MD5': createHash('md5').update(buffer).digest('base64'),
|
||||||
|
@ -54,7 +83,9 @@ export class ObjectStoreService {
|
||||||
if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
|
if (metadata.fileName) headers['x-amz-meta-filename'] = metadata.fileName;
|
||||||
if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
|
if (metadata.mimeType) headers['Content-Type'] = metadata.mimeType;
|
||||||
|
|
||||||
return this.request('PUT', this.host, `/${filename}`, { headers, body: buffer });
|
const path = `/${this.bucket.name}/${filename}`;
|
||||||
|
|
||||||
|
return this.request('PUT', this.host, path, { headers, body: buffer });
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -62,9 +93,11 @@ export class ObjectStoreService {
|
||||||
*
|
*
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
||||||
*/
|
*/
|
||||||
async get(path: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
|
async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
|
||||||
async get(path: string, { mode }: { mode: 'stream' }): Promise<Readable>;
|
async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
|
||||||
async get(path: string, { mode }: { mode: 'stream' | 'buffer' }) {
|
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }) {
|
||||||
|
const path = `${this.bucket.name}/${fileId}`;
|
||||||
|
|
||||||
const { data } = await this.request('GET', this.host, path, {
|
const { data } = await this.request('GET', this.host, path, {
|
||||||
responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
|
responseType: mode === 'buffer' ? 'arraybuffer' : 'stream',
|
||||||
});
|
});
|
||||||
|
@ -81,27 +114,31 @@ export class ObjectStoreService {
|
||||||
*
|
*
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
|
||||||
*/
|
*/
|
||||||
async getMetadata(path: string) {
|
async getMetadata(fileId: string) {
|
||||||
type Response = {
|
type Response = {
|
||||||
headers: {
|
headers: {
|
||||||
'content-length': string;
|
'content-length': string;
|
||||||
'content-type'?: string;
|
'content-type'?: string;
|
||||||
'x-amz-meta-filename'?: string;
|
'x-amz-meta-filename'?: string;
|
||||||
} & Record<string, string | number>;
|
} & BinaryData.PreWriteMetadata;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const path = `${this.bucket.name}/${fileId}`;
|
||||||
|
|
||||||
const response: Response = await this.request('HEAD', this.host, path);
|
const response: Response = await this.request('HEAD', this.host, path);
|
||||||
|
|
||||||
return response.headers;
|
return response.headers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete an object in the configured bucket.
|
* Delete a single object in the configured bucket.
|
||||||
*
|
*
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
||||||
*/
|
*/
|
||||||
async deleteOne(path: string) {
|
async deleteOne(fileId: string) {
|
||||||
return this.request('DELETE', this.host, `/${encodeURIComponent(path)}`);
|
const path = `${this.bucket.name}/${fileId}`;
|
||||||
|
|
||||||
|
return this.request('DELETE', this.host, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -122,13 +159,13 @@ export class ObjectStoreService {
|
||||||
'Content-MD5': createHash('md5').update(body).digest('base64'),
|
'Content-MD5': createHash('md5').update(body).digest('base64'),
|
||||||
};
|
};
|
||||||
|
|
||||||
return this.request('POST', this.host, '/?delete', { headers, body });
|
const path = `${this.bucket.name}/?delete`;
|
||||||
|
|
||||||
|
return this.request('POST', this.host, path, { headers, body });
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List objects with a common prefix in the configured bucket.
|
* List objects with a common prefix in the configured bucket.
|
||||||
*
|
|
||||||
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
|
|
||||||
*/
|
*/
|
||||||
async list(prefix: string) {
|
async list(prefix: string) {
|
||||||
const items = [];
|
const items = [];
|
||||||
|
@ -149,16 +186,18 @@ export class ObjectStoreService {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch a page of objects with a common prefix in the configured bucket. Max 1000 per page.
|
* Fetch a page of objects with a common prefix in the configured bucket.
|
||||||
|
*
|
||||||
|
* Max 1000 objects per page - set by AWS.
|
||||||
|
*
|
||||||
|
* @doc https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
|
||||||
*/
|
*/
|
||||||
async getListPage(prefix: string, nextPageToken?: string) {
|
async getListPage(prefix: string, nextPageToken?: string) {
|
||||||
const bucketlessHost = this.host.split('.').slice(1).join('.');
|
|
||||||
|
|
||||||
const qs: Record<string, string | number> = { 'list-type': 2, prefix };
|
const qs: Record<string, string | number> = { 'list-type': 2, prefix };
|
||||||
|
|
||||||
if (nextPageToken) qs['continuation-token'] = nextPageToken;
|
if (nextPageToken) qs['continuation-token'] = nextPageToken;
|
||||||
|
|
||||||
const { data } = await this.request('GET', bucketlessHost, `/${this.bucket.name}`, { qs });
|
const { data } = await this.request('GET', this.host, this.bucket.name, { qs });
|
||||||
|
|
||||||
if (typeof data !== 'string') {
|
if (typeof data !== 'string') {
|
||||||
throw new TypeError(`Expected XML string but received ${typeof data}`);
|
throw new TypeError(`Expected XML string but received ${typeof data}`);
|
||||||
|
@ -193,11 +232,19 @@ export class ObjectStoreService {
|
||||||
return path.concat(`?${qsParams}`);
|
return path.concat(`?${qsParams}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async request<T = unknown>(
|
private async blockWrite(filename: string): Promise<AxiosResponse> {
|
||||||
|
const logMessage = writeBlockedMessage(filename);
|
||||||
|
|
||||||
|
this.logger.warn(logMessage);
|
||||||
|
|
||||||
|
return { status: 403, statusText: 'Forbidden', data: logMessage, headers: {}, config: {} };
|
||||||
|
}
|
||||||
|
|
||||||
|
private async request(
|
||||||
method: Method,
|
method: Method,
|
||||||
host: string,
|
host: string,
|
||||||
rawPath = '',
|
rawPath = '',
|
||||||
{ qs, headers, body, responseType }: ObjectStore.RequestOptions = {},
|
{ qs, headers, body, responseType }: RequestOptions = {},
|
||||||
) {
|
) {
|
||||||
const path = this.toPath(rawPath, qs);
|
const path = this.toPath(rawPath, qs);
|
||||||
|
|
||||||
|
@ -224,9 +271,17 @@ export class ObjectStoreService {
|
||||||
if (responseType) config.responseType = responseType;
|
if (responseType) config.responseType = responseType;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await axios.request<T>(config);
|
this.logger.debug('Sending request to S3', { config });
|
||||||
} catch (error) {
|
|
||||||
throw new ExternalStorageRequestFailed(error, config);
|
return await axios.request<unknown>(config);
|
||||||
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
|
const message = `Request to S3 failed: ${error.message}`;
|
||||||
|
|
||||||
|
this.logger.error(message, { config });
|
||||||
|
|
||||||
|
throw new Error(message, { cause: { error, details: config } });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +0,0 @@
|
||||||
import { AxiosRequestConfig } from 'axios';
|
|
||||||
|
|
||||||
export class ExternalStorageRequestFailed extends Error {
|
|
||||||
constructor(error: unknown, details: AxiosRequestConfig) {
|
|
||||||
super('Request to external object storage failed');
|
|
||||||
this.cause = { error, details };
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -22,11 +22,13 @@ type Item = {
|
||||||
|
|
||||||
export type ListPage = Omit<RawListPage['listBucketResult'], 'contents'> & { contents: Item[] };
|
export type ListPage = Omit<RawListPage['listBucketResult'], 'contents'> & { contents: Item[] };
|
||||||
|
|
||||||
export namespace ObjectStore {
|
export type Bucket = { region: string; name: string };
|
||||||
export type RequestOptions = {
|
|
||||||
|
export type RequestOptions = {
|
||||||
qs?: Record<string, string | number>;
|
qs?: Record<string, string | number>;
|
||||||
headers?: Record<string, string | number>;
|
headers?: Record<string, string | number>;
|
||||||
body?: string | Buffer;
|
body?: string | Buffer;
|
||||||
responseType?: ResponseType;
|
responseType?: ResponseType;
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
export type ConfigSchemaCredentials = { accessKey: string; accessSecret: string };
|
||||||
|
|
|
@ -14,3 +14,7 @@ export async function parseXml<T>(xml: string): Promise<T> {
|
||||||
valueProcessors: [parseNumbers, parseBooleans],
|
valueProcessors: [parseNumbers, parseBooleans],
|
||||||
}) as Promise<T>;
|
}) as Promise<T>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function writeBlockedMessage(filename: string) {
|
||||||
|
return `Request to write file "${filename}" to object storage was blocked because S3 storage is not available with your current license. Please upgrade to a license that supports this feature, or set N8N_DEFAULT_BINARY_DATA_MODE to an option other than "s3".`;
|
||||||
|
}
|
||||||
|
|
|
@ -17,3 +17,4 @@ export * from './WorkflowExecute';
|
||||||
export { NodeExecuteFunctions, UserSettings };
|
export { NodeExecuteFunctions, UserSettings };
|
||||||
export * from './errors';
|
export * from './errors';
|
||||||
export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
|
export { ObjectStoreService } from './ObjectStore/ObjectStore.service.ee';
|
||||||
|
export { BinaryData } from './BinaryData/types';
|
||||||
|
|
147
packages/core/test/ObjectStore.manager.test.ts
Normal file
147
packages/core/test/ObjectStore.manager.test.ts
Normal file
|
@ -0,0 +1,147 @@
|
||||||
|
import fs from 'node:fs/promises';
|
||||||
|
import { ObjectStoreManager } from '@/BinaryData/ObjectStore.manager';
|
||||||
|
import { ObjectStoreService } from '@/ObjectStore/ObjectStore.service.ee';
|
||||||
|
import { isStream } from '@/ObjectStore/utils';
|
||||||
|
import { mockInstance, toStream } from './utils';
|
||||||
|
|
||||||
|
jest.mock('fs/promises');
|
||||||
|
|
||||||
|
const objectStoreService = mockInstance(ObjectStoreService);
|
||||||
|
const objectStoreManager = new ObjectStoreManager(objectStoreService);
|
||||||
|
|
||||||
|
const toFileId = (workflowId: string, executionId: string, fileUuid: string) =>
|
||||||
|
`workflows/${workflowId}/executions/${executionId}/binary_data/${fileUuid}`;
|
||||||
|
|
||||||
|
const workflowId = 'ObogjVbqpNOQpiyV';
|
||||||
|
const executionId = '999';
|
||||||
|
const fileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb32';
|
||||||
|
const fileId = toFileId(workflowId, executionId, fileUuid);
|
||||||
|
const prefix = `workflows/${workflowId}/executions/${executionId}/binary_data/`;
|
||||||
|
|
||||||
|
const otherWorkflowId = 'FHio8ftV6SrCAfPJ';
|
||||||
|
const otherExecutionId = '888';
|
||||||
|
const otherFileUuid = '71f6209b-5d48-41a2-a224-80d529d8bb33';
|
||||||
|
const otherFileId = toFileId(otherWorkflowId, otherExecutionId, otherFileUuid);
|
||||||
|
|
||||||
|
const mockBuffer = Buffer.from('Test data');
|
||||||
|
const mockStream = toStream(mockBuffer);
|
||||||
|
|
||||||
|
beforeAll(() => {
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('store()', () => {
|
||||||
|
it('should store a buffer', async () => {
|
||||||
|
const metadata = { mimeType: 'text/plain' };
|
||||||
|
|
||||||
|
const result = await objectStoreManager.store(workflowId, executionId, mockBuffer, metadata);
|
||||||
|
|
||||||
|
expect(result.fileId.startsWith(prefix)).toBe(true);
|
||||||
|
expect(result.fileSize).toBe(mockBuffer.length);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getPath()', () => {
|
||||||
|
it('should return a path', async () => {
|
||||||
|
const path = objectStoreManager.getPath(fileId);
|
||||||
|
|
||||||
|
expect(path).toBe(fileId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getAsBuffer()', () => {
|
||||||
|
it('should return a buffer', async () => {
|
||||||
|
// @ts-expect-error Overload signature seemingly causing the return type to be misinferred
|
||||||
|
objectStoreService.get.mockResolvedValue(mockBuffer);
|
||||||
|
|
||||||
|
const result = await objectStoreManager.getAsBuffer(fileId);
|
||||||
|
|
||||||
|
expect(Buffer.isBuffer(result)).toBe(true);
|
||||||
|
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getAsStream()', () => {
|
||||||
|
it('should return a stream', async () => {
|
||||||
|
objectStoreService.get.mockResolvedValue(mockStream);
|
||||||
|
|
||||||
|
const stream = await objectStoreManager.getAsStream(fileId);
|
||||||
|
|
||||||
|
expect(isStream(stream)).toBe(true);
|
||||||
|
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getMetadata()', () => {
|
||||||
|
it('should return metadata', async () => {
|
||||||
|
const mimeType = 'text/plain';
|
||||||
|
const fileName = 'file.txt';
|
||||||
|
|
||||||
|
objectStoreService.getMetadata.mockResolvedValue({
|
||||||
|
'content-length': '1',
|
||||||
|
'content-type': mimeType,
|
||||||
|
'x-amz-meta-filename': fileName,
|
||||||
|
});
|
||||||
|
|
||||||
|
const metadata = await objectStoreManager.getMetadata(fileId);
|
||||||
|
|
||||||
|
expect(metadata).toEqual(expect.objectContaining({ fileSize: 1, mimeType, fileName }));
|
||||||
|
expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('copyByFileId()', () => {
|
||||||
|
it('should copy by file ID and return the file ID', async () => {
|
||||||
|
const targetFileId = await objectStoreManager.copyByFileId(workflowId, executionId, fileId);
|
||||||
|
|
||||||
|
expect(targetFileId.startsWith(prefix)).toBe(true);
|
||||||
|
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('copyByFilePath()', () => {
|
||||||
|
test('should copy by file path and return the file ID and size', async () => {
|
||||||
|
const sourceFilePath = 'path/to/file/in/filesystem';
|
||||||
|
const metadata = { mimeType: 'text/plain' };
|
||||||
|
|
||||||
|
fs.readFile = jest.fn().mockResolvedValue(mockBuffer);
|
||||||
|
|
||||||
|
const result = await objectStoreManager.copyByFilePath(
|
||||||
|
workflowId,
|
||||||
|
executionId,
|
||||||
|
sourceFilePath,
|
||||||
|
metadata,
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result.fileId.startsWith(prefix)).toBe(true);
|
||||||
|
expect(fs.readFile).toHaveBeenCalledWith(sourceFilePath);
|
||||||
|
expect(result.fileSize).toBe(mockBuffer.length);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('deleteMany()', () => {
|
||||||
|
it('should delete many files by prefix', async () => {
|
||||||
|
const ids = [
|
||||||
|
{ workflowId, executionId },
|
||||||
|
{ workflowId: otherWorkflowId, executionId: otherExecutionId },
|
||||||
|
];
|
||||||
|
|
||||||
|
const promise = objectStoreManager.deleteMany(ids);
|
||||||
|
|
||||||
|
await expect(promise).resolves.not.toThrow();
|
||||||
|
|
||||||
|
expect(objectStoreService.deleteMany).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('rename()', () => {
|
||||||
|
it('should rename a file', async () => {
|
||||||
|
const promise = objectStoreManager.rename(fileId, otherFileId);
|
||||||
|
|
||||||
|
await expect(promise).resolves.not.toThrow();
|
||||||
|
|
||||||
|
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'buffer' });
|
||||||
|
expect(objectStoreService.getMetadata).toHaveBeenCalledWith(fileId);
|
||||||
|
expect(objectStoreService.deleteOne).toHaveBeenCalledWith(fileId);
|
||||||
|
});
|
||||||
|
});
|
311
packages/core/test/ObjectStore.service.test.ts
Normal file
311
packages/core/test/ObjectStore.service.test.ts
Normal file
|
@ -0,0 +1,311 @@
|
||||||
|
import axios from 'axios';
|
||||||
|
import { ObjectStoreService } from '@/ObjectStore/ObjectStore.service.ee';
|
||||||
|
import { Readable } from 'stream';
|
||||||
|
import { writeBlockedMessage } from '@/ObjectStore/utils';
|
||||||
|
import { initLogger } from './helpers/utils';
|
||||||
|
|
||||||
|
jest.mock('axios');
|
||||||
|
|
||||||
|
const mockAxios = axios as jest.Mocked<typeof axios>;
|
||||||
|
|
||||||
|
const mockBucket = { region: 'us-east-1', name: 'test-bucket' };
|
||||||
|
const mockHost = `s3.${mockBucket.region}.amazonaws.com`;
|
||||||
|
const mockCredentials = { accessKey: 'mock-access-key', accessSecret: 'mock-secret-key' };
|
||||||
|
const mockUrl = `https://${mockHost}/${mockBucket.name}`;
|
||||||
|
const FAILED_REQUEST_ERROR_MESSAGE = 'Request to S3 failed';
|
||||||
|
const mockError = new Error('Something went wrong!');
|
||||||
|
const fileId =
|
||||||
|
'workflows/ObogjVbqpNOQpiyV/executions/999/binary_data/71f6209b-5d48-41a2-a224-80d529d8bb32';
|
||||||
|
const mockBuffer = Buffer.from('Test data');
|
||||||
|
|
||||||
|
const toDeletionXml = (filename: string) => `<Delete>
|
||||||
|
<Object><Key>${filename}</Key></Object>
|
||||||
|
</Delete>`;
|
||||||
|
|
||||||
|
let objectStoreService: ObjectStoreService;
|
||||||
|
initLogger();
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
objectStoreService = new ObjectStoreService();
|
||||||
|
mockAxios.request.mockResolvedValueOnce({ status: 200 }); // for checkConnection
|
||||||
|
await objectStoreService.init(mockHost, mockBucket, mockCredentials);
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('checkConnection()', () => {
|
||||||
|
it('should send a HEAD request to the correct host', async () => {
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||||
|
|
||||||
|
objectStoreService.setReady(false);
|
||||||
|
|
||||||
|
await objectStoreService.checkConnection();
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'HEAD',
|
||||||
|
url: `https://${mockHost}/${mockBucket.name}`,
|
||||||
|
headers: expect.objectContaining({
|
||||||
|
'X-Amz-Content-Sha256': expect.any(String),
|
||||||
|
'X-Amz-Date': expect.any(String),
|
||||||
|
Authorization: expect.any(String),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
objectStoreService.setReady(false);
|
||||||
|
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.checkConnection();
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getMetadata()', () => {
|
||||||
|
it('should send a HEAD request to the correct host and path', async () => {
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||||
|
|
||||||
|
await objectStoreService.getMetadata(fileId);
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'HEAD',
|
||||||
|
url: `${mockUrl}/${fileId}`,
|
||||||
|
headers: expect.objectContaining({
|
||||||
|
Host: mockHost,
|
||||||
|
'X-Amz-Content-Sha256': expect.any(String),
|
||||||
|
'X-Amz-Date': expect.any(String),
|
||||||
|
Authorization: expect.any(String),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.getMetadata(fileId);
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('put()', () => {
|
||||||
|
it('should send a PUT request to upload an object', async () => {
|
||||||
|
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
|
||||||
|
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||||
|
|
||||||
|
await objectStoreService.put(fileId, mockBuffer, metadata);
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'PUT',
|
||||||
|
url: `${mockUrl}/${fileId}`,
|
||||||
|
headers: expect.objectContaining({
|
||||||
|
'Content-Length': mockBuffer.length,
|
||||||
|
'Content-MD5': expect.any(String),
|
||||||
|
'x-amz-meta-filename': metadata.fileName,
|
||||||
|
'Content-Type': metadata.mimeType,
|
||||||
|
}),
|
||||||
|
data: mockBuffer,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should block if read-only', async () => {
|
||||||
|
initLogger();
|
||||||
|
objectStoreService.setReadonly(true);
|
||||||
|
|
||||||
|
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
|
||||||
|
|
||||||
|
const promise = objectStoreService.put(fileId, mockBuffer, metadata);
|
||||||
|
|
||||||
|
await expect(promise).resolves.not.toThrow();
|
||||||
|
|
||||||
|
const result = await promise;
|
||||||
|
|
||||||
|
expect(result.status).toBe(403);
|
||||||
|
expect(result.statusText).toBe('Forbidden');
|
||||||
|
|
||||||
|
expect(result.data).toBe(writeBlockedMessage(fileId));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
const metadata = { fileName: 'file.txt', mimeType: 'text/plain' };
|
||||||
|
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.put(fileId, mockBuffer, metadata);
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('get()', () => {
|
||||||
|
it('should send a GET request to download an object as a buffer', async () => {
|
||||||
|
const fileId = 'file.txt';
|
||||||
|
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') });
|
||||||
|
|
||||||
|
const result = await objectStoreService.get(fileId, { mode: 'buffer' });
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'GET',
|
||||||
|
url: `${mockUrl}/${fileId}`,
|
||||||
|
responseType: 'arraybuffer',
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(Buffer.isBuffer(result)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send a GET request to download an object as a stream', async () => {
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() });
|
||||||
|
|
||||||
|
const result = await objectStoreService.get(fileId, { mode: 'stream' });
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'GET',
|
||||||
|
url: `${mockUrl}/${fileId}`,
|
||||||
|
responseType: 'stream',
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result instanceof Readable).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.get(fileId, { mode: 'buffer' });
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('deleteOne()', () => {
|
||||||
|
it('should send a DELETE request to delete a single object', async () => {
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 204 });
|
||||||
|
|
||||||
|
await objectStoreService.deleteOne(fileId);
|
||||||
|
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'DELETE',
|
||||||
|
url: `${mockUrl}/${fileId}`,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.deleteOne(fileId);
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('deleteMany()', () => {
|
||||||
|
it('should send a POST request to delete multiple objects', async () => {
|
||||||
|
const prefix = 'test-dir/';
|
||||||
|
const fileName = 'file.txt';
|
||||||
|
|
||||||
|
const mockList = [
|
||||||
|
{
|
||||||
|
key: fileName,
|
||||||
|
lastModified: '2023-09-24T12:34:56Z',
|
||||||
|
eTag: 'abc123def456',
|
||||||
|
size: 456789,
|
||||||
|
storageClass: 'STANDARD',
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
objectStoreService.list = jest.fn().mockResolvedValue(mockList);
|
||||||
|
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 204 });
|
||||||
|
|
||||||
|
await objectStoreService.deleteMany(prefix);
|
||||||
|
|
||||||
|
expect(objectStoreService.list).toHaveBeenCalledWith(prefix);
|
||||||
|
expect(mockAxios.request).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
method: 'POST',
|
||||||
|
url: `${mockUrl}/?delete`,
|
||||||
|
headers: expect.objectContaining({
|
||||||
|
'Content-Type': 'application/xml',
|
||||||
|
'Content-Length': expect.any(Number),
|
||||||
|
'Content-MD5': expect.any(String),
|
||||||
|
}),
|
||||||
|
data: toDeletionXml(fileName),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.deleteMany('test-dir/');
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('list()', () => {
|
||||||
|
it('should list objects with a common prefix', async () => {
|
||||||
|
const prefix = 'test-dir/';
|
||||||
|
|
||||||
|
const mockListPage = {
|
||||||
|
contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }],
|
||||||
|
isTruncated: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage);
|
||||||
|
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||||
|
|
||||||
|
const result = await objectStoreService.list(prefix);
|
||||||
|
|
||||||
|
expect(result).toEqual(mockListPage.contents);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should consolidate pages', async () => {
|
||||||
|
const prefix = 'test-dir/';
|
||||||
|
|
||||||
|
const mockFirstListPage = {
|
||||||
|
contents: [{ key: `${prefix}file1.txt` }],
|
||||||
|
isTruncated: true,
|
||||||
|
nextContinuationToken: 'token1',
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockSecondListPage = {
|
||||||
|
contents: [{ key: `${prefix}file2.txt` }],
|
||||||
|
isTruncated: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
objectStoreService.getListPage = jest
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValueOnce(mockFirstListPage)
|
||||||
|
.mockResolvedValueOnce(mockSecondListPage);
|
||||||
|
|
||||||
|
mockAxios.request.mockResolvedValue({ status: 200 });
|
||||||
|
|
||||||
|
const result = await objectStoreService.list(prefix);
|
||||||
|
|
||||||
|
expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error on request failure', async () => {
|
||||||
|
mockAxios.request.mockRejectedValue(mockError);
|
||||||
|
|
||||||
|
const promise = objectStoreService.list('test-dir/');
|
||||||
|
|
||||||
|
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
||||||
|
});
|
||||||
|
});
|
|
@ -1,301 +0,0 @@
|
||||||
import axios from 'axios';
|
|
||||||
import { ObjectStoreService } from '../src/ObjectStore/ObjectStore.service.ee';
|
|
||||||
import { Readable } from 'stream';
|
|
||||||
|
|
||||||
jest.mock('axios');
|
|
||||||
|
|
||||||
const mockAxios = axios as jest.Mocked<typeof axios>;
|
|
||||||
|
|
||||||
const MOCK_BUCKET = { region: 'us-east-1', name: 'test-bucket' };
|
|
||||||
const MOCK_CREDENTIALS = { accountId: 'mock-account-id', secretKey: 'mock-secret-key' };
|
|
||||||
const FAILED_REQUEST_ERROR_MESSAGE = 'Request to external object storage failed';
|
|
||||||
const EXPECTED_HOST = `${MOCK_BUCKET.name}.s3.${MOCK_BUCKET.region}.amazonaws.com`;
|
|
||||||
const MOCK_S3_ERROR = new Error('Something went wrong!');
|
|
||||||
|
|
||||||
const toMultipleDeletionXml = (filename: string) => `<Delete>
|
|
||||||
<Object><Key>${filename}</Key></Object>
|
|
||||||
</Delete>`;
|
|
||||||
|
|
||||||
describe('ObjectStoreService', () => {
|
|
||||||
let objectStoreService: ObjectStoreService;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
objectStoreService = new ObjectStoreService(MOCK_BUCKET, MOCK_CREDENTIALS);
|
|
||||||
jest.restoreAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('checkConnection()', () => {
|
|
||||||
it('should send a HEAD request to the correct host', async () => {
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
|
||||||
|
|
||||||
await objectStoreService.checkConnection();
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'HEAD',
|
|
||||||
url: `https://${EXPECTED_HOST}/`,
|
|
||||||
headers: expect.objectContaining({
|
|
||||||
'X-Amz-Content-Sha256': expect.any(String),
|
|
||||||
'X-Amz-Date': expect.any(String),
|
|
||||||
Authorization: expect.any(String),
|
|
||||||
}),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.checkConnection();
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('getMetadata()', () => {
|
|
||||||
it('should send a HEAD request to the correct host and path', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
|
||||||
|
|
||||||
await objectStoreService.getMetadata(path);
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'HEAD',
|
|
||||||
url: `https://${EXPECTED_HOST}/${path}`,
|
|
||||||
headers: expect.objectContaining({
|
|
||||||
Host: EXPECTED_HOST,
|
|
||||||
'X-Amz-Content-Sha256': expect.any(String),
|
|
||||||
'X-Amz-Date': expect.any(String),
|
|
||||||
Authorization: expect.any(String),
|
|
||||||
}),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.getMetadata(path);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('put()', () => {
|
|
||||||
it('should send a PUT request to upload an object', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
const buffer = Buffer.from('Test content');
|
|
||||||
const metadata = { fileName: path, mimeType: 'text/plain' };
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
|
||||||
|
|
||||||
await objectStoreService.put(path, buffer, metadata);
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'PUT',
|
|
||||||
url: `https://${EXPECTED_HOST}/${path}`,
|
|
||||||
headers: expect.objectContaining({
|
|
||||||
'Content-Length': buffer.length,
|
|
||||||
'Content-MD5': expect.any(String),
|
|
||||||
'x-amz-meta-filename': metadata.fileName,
|
|
||||||
'Content-Type': metadata.mimeType,
|
|
||||||
}),
|
|
||||||
data: buffer,
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
const buffer = Buffer.from('Test content');
|
|
||||||
const metadata = { fileName: path, mimeType: 'text/plain' };
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.put(path, buffer, metadata);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('get()', () => {
|
|
||||||
it('should send a GET request to download an object as a buffer', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200, data: Buffer.from('Test content') });
|
|
||||||
|
|
||||||
const result = await objectStoreService.get(path, { mode: 'buffer' });
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'GET',
|
|
||||||
url: `https://${EXPECTED_HOST}/${path}`,
|
|
||||||
responseType: 'arraybuffer',
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(Buffer.isBuffer(result)).toBe(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should send a GET request to download an object as a stream', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200, data: new Readable() });
|
|
||||||
|
|
||||||
const result = await objectStoreService.get(path, { mode: 'stream' });
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'GET',
|
|
||||||
url: `https://${EXPECTED_HOST}/${path}`,
|
|
||||||
responseType: 'stream',
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(result instanceof Readable).toBe(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.get(path, { mode: 'buffer' });
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('deleteOne()', () => {
|
|
||||||
it('should send a DELETE request to delete an object', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 204 });
|
|
||||||
|
|
||||||
await objectStoreService.deleteOne(path);
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'DELETE',
|
|
||||||
url: `https://${EXPECTED_HOST}/${path}`,
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const path = 'file.txt';
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.deleteOne(path);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('deleteMany()', () => {
|
|
||||||
it('should send a POST request to delete multiple objects', async () => {
|
|
||||||
const prefix = 'test-dir/';
|
|
||||||
const fileName = 'file.txt';
|
|
||||||
|
|
||||||
const mockList = [
|
|
||||||
{
|
|
||||||
key: fileName,
|
|
||||||
lastModified: '2023-09-24T12:34:56Z',
|
|
||||||
eTag: 'abc123def456',
|
|
||||||
size: 456789,
|
|
||||||
storageClass: 'STANDARD',
|
|
||||||
},
|
|
||||||
];
|
|
||||||
|
|
||||||
objectStoreService.list = jest.fn().mockResolvedValue(mockList);
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 204 });
|
|
||||||
|
|
||||||
await objectStoreService.deleteMany(prefix);
|
|
||||||
|
|
||||||
expect(mockAxios.request).toHaveBeenCalledWith(
|
|
||||||
expect.objectContaining({
|
|
||||||
method: 'POST',
|
|
||||||
url: `https://${EXPECTED_HOST}/?delete`,
|
|
||||||
headers: expect.objectContaining({
|
|
||||||
'Content-Type': 'application/xml',
|
|
||||||
'Content-Length': expect.any(Number),
|
|
||||||
'Content-MD5': expect.any(String),
|
|
||||||
}),
|
|
||||||
data: toMultipleDeletionXml(fileName),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const prefix = 'test-dir/';
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.deleteMany(prefix);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('list()', () => {
|
|
||||||
it('should list objects with a common prefix', async () => {
|
|
||||||
const prefix = 'test-dir/';
|
|
||||||
|
|
||||||
const mockListPage = {
|
|
||||||
contents: [{ key: `${prefix}file1.txt` }, { key: `${prefix}file2.txt` }],
|
|
||||||
isTruncated: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
objectStoreService.getListPage = jest.fn().mockResolvedValue(mockListPage);
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
|
||||||
|
|
||||||
const result = await objectStoreService.list(prefix);
|
|
||||||
|
|
||||||
expect(result).toEqual(mockListPage.contents);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should consolidate pages', async () => {
|
|
||||||
const prefix = 'test-dir/';
|
|
||||||
|
|
||||||
const mockFirstListPage = {
|
|
||||||
contents: [{ key: `${prefix}file1.txt` }],
|
|
||||||
isTruncated: true,
|
|
||||||
nextContinuationToken: 'token1',
|
|
||||||
};
|
|
||||||
|
|
||||||
const mockSecondListPage = {
|
|
||||||
contents: [{ key: `${prefix}file2.txt` }],
|
|
||||||
isTruncated: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
objectStoreService.getListPage = jest
|
|
||||||
.fn()
|
|
||||||
.mockResolvedValueOnce(mockFirstListPage)
|
|
||||||
.mockResolvedValueOnce(mockSecondListPage);
|
|
||||||
|
|
||||||
mockAxios.request.mockResolvedValue({ status: 200 });
|
|
||||||
|
|
||||||
const result = await objectStoreService.list(prefix);
|
|
||||||
|
|
||||||
expect(result).toEqual([...mockFirstListPage.contents, ...mockSecondListPage.contents]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should throw an error on request failure', async () => {
|
|
||||||
const prefix = 'test-dir/';
|
|
||||||
|
|
||||||
mockAxios.request.mockRejectedValue(MOCK_S3_ERROR);
|
|
||||||
|
|
||||||
const promise = objectStoreService.list(prefix);
|
|
||||||
|
|
||||||
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
22
packages/core/test/utils.ts
Normal file
22
packages/core/test/utils.ts
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
import { Container } from 'typedi';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import { Duplex } from 'stream';
|
||||||
|
|
||||||
|
import type { DeepPartial } from 'ts-essentials';
|
||||||
|
|
||||||
|
export const mockInstance = <T>(
|
||||||
|
constructor: new (...args: unknown[]) => T,
|
||||||
|
data: DeepPartial<T> | undefined = undefined,
|
||||||
|
) => {
|
||||||
|
const instance = mock<T>(data);
|
||||||
|
Container.set(constructor, instance);
|
||||||
|
return instance;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function toStream(buffer: Buffer) {
|
||||||
|
const duplexStream = new Duplex();
|
||||||
|
duplexStream.push(buffer);
|
||||||
|
duplexStream.push(null);
|
||||||
|
|
||||||
|
return duplexStream;
|
||||||
|
}
|
|
@ -1385,16 +1385,17 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
||||||
},
|
},
|
||||||
// Binary data
|
// Binary data
|
||||||
getBinaryUrl(
|
getBinaryUrl(
|
||||||
dataPath: string,
|
binaryDataId: string,
|
||||||
mode: 'view' | 'download',
|
action: 'view' | 'download',
|
||||||
fileName: string,
|
fileName: string,
|
||||||
mimeType: string,
|
mimeType: string,
|
||||||
): string {
|
): string {
|
||||||
const rootStore = useRootStore();
|
const rootStore = useRootStore();
|
||||||
let restUrl = rootStore.getRestUrl;
|
let restUrl = rootStore.getRestUrl;
|
||||||
if (restUrl.startsWith('/')) restUrl = window.location.origin + restUrl;
|
if (restUrl.startsWith('/')) restUrl = window.location.origin + restUrl;
|
||||||
const url = new URL(`${restUrl}/data/${dataPath}`);
|
const url = new URL(`${restUrl}/data`);
|
||||||
url.searchParams.append('mode', mode);
|
url.searchParams.append('id', binaryDataId);
|
||||||
|
url.searchParams.append('action', action);
|
||||||
if (fileName) url.searchParams.append('fileName', fileName);
|
if (fileName) url.searchParams.append('fileName', fileName);
|
||||||
if (mimeType) url.searchParams.append('mimeType', mimeType);
|
if (mimeType) url.searchParams.append('mimeType', mimeType);
|
||||||
return url.toString();
|
return url.toString();
|
||||||
|
|
|
@ -219,7 +219,11 @@ export function createTemporaryDir(prefix = 'n8n') {
|
||||||
|
|
||||||
export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') {
|
export async function initBinaryDataService(mode: 'default' | 'filesystem' = 'default') {
|
||||||
const binaryDataService = new BinaryDataService();
|
const binaryDataService = new BinaryDataService();
|
||||||
await binaryDataService.init({ mode: 'default', availableModes: [mode] });
|
await binaryDataService.init({
|
||||||
|
mode: 'default',
|
||||||
|
availableModes: [mode],
|
||||||
|
localStoragePath: createTemporaryDir(),
|
||||||
|
});
|
||||||
Container.set(BinaryDataService, binaryDataService);
|
Container.set(BinaryDataService, binaryDataService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue