refactor import export

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-11 15:50:22 +01:00
parent 798b15a852
commit 113f4fa672
No known key found for this signature in database
16 changed files with 190 additions and 155 deletions

View file

@ -181,7 +181,7 @@
"redis": "4.6.12",
"sqlite3": "5.1.7",
"temp": "0.9.4",
"tmp-promise": "3.0.3",
"tmp-promise": "catalog:",
"zod": "catalog:",
"zod-to-json-schema": "3.23.3"
}

View file

@ -171,6 +171,7 @@
"swagger-ui-express": "5.0.1",
"syslog-client": "1.1.1",
"tar-stream": "^3.1.7",
"tmp-promise": "catalog:",
"typedi": "catalog:",
"unzip-stream": "0.3.4",
"uuid": "catalog:",

View file

@ -1,31 +1,63 @@
import { Flags } from '@oclif/core';
import { tmpdir } from 'node:os';
import path from 'node:path';
import colors from 'picocolors';
import Container from 'typedi';
import * as Db from '@/db';
import { BaseCommand } from '../base-command';
export class ExportBackupCommand extends BaseCommand {
static description = 'Backup to a zip file';
static examples = ['$ n8n export:backup', '$ n8n export:backup --output=backup.zip'];
static examples = ['$ n8n export:backup', '$ n8n export:backup --output=/path/to/directory'];
static flags = {
output: Flags.string({
char: 'o',
description: 'Directory to output the archive file in',
default: tmpdir(),
description: 'Output file to export the backup into',
}),
full: Flags.boolean({
char: 'f',
description: 'Whether to export all data, or only the important tables',
}),
};
async init() {
this.logger.warn('Import/Export functionality is currently very experimental.');
this.logger.warn(colors.bold(colors.red('Please do not use this in production')));
await Db.init().catch(
async (error: Error) => await this.exitWithCrash('There was an error initializing DB', error),
);
}
async run() {
const { flags } = await this.parse(ExportBackupCommand);
let output = flags.output;
if (!output) {
const outputDir = process.cwd();
const outputFile = `n8n-backup-${new Date().toISOString().substring(0, 10)}.tar.gz`;
this.logger.warn(
`No output path was provided. Exporting backup as ${colors.bold(outputFile)} in the current directory`,
);
output = path.join(outputDir, outputFile);
}
const { DatabaseExportService } = await import(
'@/databases/import-export/database-export.service'
);
const databaseExportService = Container.get(DatabaseExportService);
databaseExportService.setConfig({
output,
mode: flags.full ? 'full' : 'lightweight',
});
await Container.get(DatabaseExportService).export();
await databaseExportService.export();
}
async catch(error: Error) {
if ('oclif' in error) return;
this.logger.error(error.message);
}
}

View file

@ -1,31 +1,70 @@
import { Flags } from '@oclif/core';
import { tmpdir } from 'node:os';
import { ensureError } from 'n8n-workflow';
import fs from 'node:fs';
import colors from 'picocolors';
import Container from 'typedi';
import * as Db from '@/db';
import { BaseCommand } from '../base-command';
export class ImportBackupCommand extends BaseCommand {
static description = 'Import from a backup zip file';
static examples = ['$ n8n import:backup', '$ n8n import:backup --input=backup.zip'];
static examples = [
'$ n8n import:backup --input=backup.zip',
'$ n8n import:backup --input=backup.zip --delete-existing-data',
];
// TODO: add `clean` flag, or add a prompt to confirm DB truncation
static flags = {
input: Flags.string({
char: 'o',
description: 'Directory to load the archive file from',
default: tmpdir(),
char: 'i',
description: 'Path to the backup archive file',
}),
'delete-existing-data': Flags.boolean({
description: 'Delete all existing data in the database',
}),
};
async init() {
this.logger.warn('Import/Export functionality is currently very experimental.');
this.logger.warn(colors.bold(colors.red('Please do not use this in production')));
await Db.init().catch(
async (error: Error) => await this.exitWithCrash('There was an error initializing DB', error),
);
await Db.migrate().catch(
async (error: Error) =>
await this.exitWithCrash('There was an error running database migrations', error),
);
}
async run() {
const {
flags: { input, 'delete-existing-data': deleteExistingData },
} = await this.parse(ImportBackupCommand);
if (!input || !fs.existsSync(input) || !fs.lstatSync(input).isFile()) {
this.logger.error(colors.red('A valid backup file must be provided via --input'));
this.exit(1);
}
const { DatabaseImportService } = await import(
'@/databases/import-export/database-import.service'
);
await Container.get(DatabaseImportService).import();
const databaseImportService = Container.get(DatabaseImportService);
databaseImportService.setConfig({ input, deleteExistingData });
try {
await databaseImportService.import();
} catch (error) {
this.logger.error('[ImportService] Import failed - changes rolled back');
this.logger.error(colors.red(ensureError(error).message));
this.exit(1);
}
}
async catch(error: Error) {
if ('oclif' in error) return;
this.logger.error(error.message);
}
}

View file

@ -15,8 +15,6 @@ import { ActiveExecutions } from '@/active-executions';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
// import { DatabaseExportService } from '@/databases/import-export/database-export.service';
// import { DatabaseImportService } from '@/databases/import-export/database-import.service';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { SettingsRepository } from '@/databases/repositories/settings.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
@ -271,13 +269,6 @@ export class Start extends BaseCommand {
async run() {
const { flags } = await this.parse(Start);
// @TEMP
// await Container.get(DatabaseExportService).export();
// Container.get(DatabaseImportService).setConfig({
// importFilePath: '/tmp/backup/n8n-db-export-2024-09-24.tar.gz',
// });
// await Container.get(DatabaseImportService).import();
// Load settings from database and set them to config.
const databaseSettings = await Container.get(SettingsRepository).findBy({
loadOnStartup: true,

View file

@ -1,6 +1,3 @@
/** Base filename for the tarball, to be suffixed with `-{timestamp}.zip`. */
export const ZIP_BASE_FILE_NAME = 'n8n-db-export';
/** Name of the file describing the export. */
export const MANIFEST_FILENAME = 'manifest.json';

View file

@ -5,11 +5,15 @@ import fs from 'node:fs';
import path from 'node:path';
import { createGzip } from 'node:zlib';
import tar from 'tar-stream';
import type { DirectoryResult } from 'tmp-promise';
import { dir } from 'tmp-promise';
import { Service } from 'typedi';
import { UnsupportedSourceError } from '@/errors/unsupported-source.error';
import { FilesystemService } from '@/filesystem/filesystem.service';
import { Logger } from '@/logging/logger.service';
import { BATCH_SIZE, EXCLUDE_LIST, MANIFEST_FILENAME, ZIP_BASE_FILE_NAME } from './constants';
import { BATCH_SIZE, EXCLUDE_LIST, MANIFEST_FILENAME } from './constants';
import type { DatabaseExportConfig, Manifest, Row } from './types';
import { DatabaseSchemaService } from '../database-schema.service';
import type { DatabaseType } from '../types';
@ -18,52 +22,46 @@ import type { DatabaseType } from '../types';
@Service()
export class DatabaseExportService {
private config: DatabaseExportConfig = {
outDir: '/tmp/backup', // @TODO: Update to cwd
mode: 'full',
};
private config = {} as DatabaseExportConfig;
private readonly rowCounts: Manifest['rowCounts'] = {};
private readonly dbType: DatabaseType;
private tmpDir: DirectoryResult;
constructor(
private readonly globalConfig: GlobalConfig,
globalConfig: GlobalConfig,
private readonly fsService: FilesystemService,
private readonly schemaService: DatabaseSchemaService,
private readonly logger: Logger,
) {
this.dbType = globalConfig.database.type;
if (this.dbType !== 'postgresdb') throw new UnsupportedSourceError(this.dbType);
}
setConfig(config: Partial<DatabaseExportConfig>) {
this.config = { ...this.config, ...config };
}
get tarballPath() {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
const tarballFileName = `${ZIP_BASE_FILE_NAME}-${year}-${month}-${day}.tar.gz`;
return path.join(this.config.outDir, tarballFileName);
}
// #region Export
async export() {
this.logger.info('[ExportService] Starting export', { outDir: this.config.outDir });
this.logger.info('[ExportService] Starting export');
const { output } = this.config;
const outputDir = path.dirname(output);
await this.fsService.ensureDir(outputDir);
this.tmpDir = await dir();
this.logger.debug(`Writing temporary files in ${this.tmpDir.path}`);
try {
await fs.promises.access(this.config.outDir);
} catch {
await fs.promises.mkdir(this.config.outDir, { recursive: true });
await this.writeTarball();
} finally {
await fs.promises.rm(this.tmpDir.path, { recursive: true, force: true });
}
await this.writeTarball();
this.logger.info('[ExportService] Completed export', { zipPath: this.tarballPath });
this.logger.info(`[ExportService] Exported backup to ${output}`);
}
// #endregion
@ -73,7 +71,7 @@ export class DatabaseExportService {
private async writeTarball() {
const pack = tar.pack();
pack.pipe(createGzip()).pipe(fs.createWriteStream(this.tarballPath));
pack.pipe(createGzip()).pipe(fs.createWriteStream(this.config.output));
// DB row -> entryStream -> tarStream -> gzipStream -> writeStream
@ -90,7 +88,7 @@ export class DatabaseExportService {
if (totalRowsCount === 0) continue;
const tableFilePath = path.join(this.config.outDir, `${tableName}.jsonl`);
const tableFilePath = path.join(this.tmpDir.path, `${tableName}.jsonl`);
const writeStream = fs.createWriteStream(tableFilePath);
let offset = 0;
@ -124,7 +122,6 @@ export class DatabaseExportService {
writeStream.end();
pack.entry({ name: `${tableName}.jsonl` }, await fs.promises.readFile(tableFilePath));
await fs.promises.rm(tableFilePath);
}
const manifest: Manifest = {
@ -171,9 +168,7 @@ export class DatabaseExportService {
// #region Utils
private isJson(column: ColumnMetadata) {
return this.globalConfig.database.type === 'sqlite'
? column.type === 'simple-json'
: column.type === 'json';
return this.dbType === 'sqlite' ? column.type === 'simple-json' : column.type === 'json';
}
/** Check whether the column is not JSON-type but may contain JSON. */

View file

@ -3,44 +3,48 @@ import { ensureError, jsonParse } from 'n8n-workflow';
import fs from 'node:fs';
import path from 'node:path';
import readline from 'node:readline';
import type { DirectoryResult } from 'tmp-promise';
import { dir } from 'tmp-promise';
import { Service } from 'typedi';
import { MalformedManifestError } from '@/errors/malformed-manifest.error';
import { MigrationsMismatchError } from '@/errors/migrations-mismatch.error';
import { NotObjectLiteralError } from '@/errors/not-object-literal.error';
import { RowCountMismatchError } from '@/errors/row-count-mismatch.error';
import { UnsupportedDestinationError } from '@/errors/unsupported-destination.error';
import { FilesystemService } from '@/filesystem/filesystem.service';
import { Logger } from '@/logging/logger.service';
import { isObjectLiteral } from '@/utils';
import { MANIFEST_FILENAME } from './constants';
import { manifestSchema } from './manifest.schema';
import type { DatabaseImportConfig, Manifest } from './types';
import { MalformedManifestError } from '../../errors/malformed-manifest.error';
import { MigrationsMismatchError } from '../../errors/migrations-mismatch.error';
import { UnsupportedDestinationError } from '../../errors/unsupported-destination.error';
import { FilesystemService } from '../../filesystem/filesystem.service';
import { DatabaseSchemaService } from '../database-schema.service';
import type { DatabaseType } from '../types';
// @TODO: Check minimum version for Postgres?
// @TODO: Make all info logs debug
@Service()
export class DatabaseImportService {
private config: DatabaseImportConfig = {
importFilePath: '/tmp/backup/n8n-db-export-2024-10-11.tar.gz', // @TODO: Pass from command
extractDirPath: '/tmp/backup',
truncateDestination: true, // @TODO: Only for dev, default it to `false` later
};
/** Paths to files extracted from the tarball. */
private extractFilePaths: string[] = [];
private config: DatabaseImportConfig = {} as DatabaseImportConfig;
private manifest: Manifest;
private readonly dbType: DatabaseType;
private tmpDir: DirectoryResult;
constructor(
private readonly globalConfig: GlobalConfig,
globalConfig: GlobalConfig,
private readonly fsService: FilesystemService,
private readonly schemaService: DatabaseSchemaService,
private readonly logger: Logger,
) {}
) {
this.dbType = globalConfig.database.type;
if (this.dbType !== 'postgresdb') throw new UnsupportedDestinationError(this.dbType);
}
setConfig(config: Partial<DatabaseImportConfig>) {
this.config = { ...this.config, ...config };
@ -51,21 +55,40 @@ export class DatabaseImportService {
/** Import DB tables from a tarball of `.jsonl` files in the storage dir. */
async import() {
this.logger.info('[ImportService] Starting import');
const { input } = this.config;
await this.fsService.checkAccessible(input);
await this.preImportChecks();
this.tmpDir = await dir();
this.logger.debug(`Extracting temporary files in ${this.tmpDir.path}`);
try {
await this.fsService.extractTarball(this.config.input, this.tmpDir.path);
this.manifest = await this.getManifest();
const destinationLastMigration = await this.schemaService.getLastMigration();
if (this.manifest.lastExecutedMigration !== destinationLastMigration) {
throw new MigrationsMismatchError(
this.manifest.lastExecutedMigration,
destinationLastMigration,
);
}
if (this.config.deleteExistingData) {
for (const { entityTarget } of this.schemaService.getTables()) {
await this.schemaService.getDataSource().getRepository(entityTarget).delete({});
}
} else {
await this.schemaService.checkAllTablesEmpty();
}
await this.schemaService.disableForeignKeysPostgres();
await this.adjustSequences();
await this.importFiles();
await this.checkImportsAgainstManifest();
} catch (error) {
this.logger.error('[ImportService] Import failed - changes rolled back', {
error: ensureError(error),
});
} finally {
await this.schemaService.enableForeignKeysPostgres();
await this.postImportCleanup();
await fs.promises.rm(this.tmpDir.path, { recursive: true, force: true });
}
this.logger.info('[ImportService] Completed import');
@ -75,42 +98,8 @@ export class DatabaseImportService {
// #region Import steps
private async preImportChecks() {
await this.fsService.checkAccessible(this.config.extractDirPath);
const dbType = this.globalConfig.database.type;
if (dbType !== 'postgresdb') throw new UnsupportedDestinationError(dbType);
this.extractFilePaths = await this.fsService.extractTarball(
this.config.importFilePath,
this.config.extractDirPath,
);
this.manifest = await this.getManifest();
const destinationLastMigration = await this.schemaService.getLastMigration();
if (this.manifest.lastExecutedMigration !== destinationLastMigration) {
throw new MigrationsMismatchError(
this.manifest.lastExecutedMigration,
destinationLastMigration,
);
}
if (this.config.truncateDestination) {
for (const { entityTarget } of this.schemaService.getTables()) {
await this.schemaService.getDataSource().getRepository(entityTarget).delete({});
}
} else {
await this.schemaService.checkAllTablesEmpty();
}
this.logger.info('[ImportService] Pre-import checks passed');
}
private async getManifest() {
const manifestFilePath = path.join(this.config.extractDirPath, MANIFEST_FILENAME);
const manifestFilePath = path.join(this.tmpDir.path, MANIFEST_FILENAME);
const manifestJson = await fs.promises.readFile(manifestFilePath, 'utf8');
@ -125,7 +114,7 @@ export class DatabaseImportService {
private async importFiles() {
await this.schemaService.getDataSource().transaction(async (tx) => {
for (const { tableName, entityTarget } of this.schemaService.getTables()) {
const jsonlFilePath = path.join(this.config.extractDirPath, tableName) + '.jsonl';
const jsonlFilePath = path.join(this.tmpDir.path, tableName) + '.jsonl';
try {
await fs.promises.access(jsonlFilePath);
@ -192,11 +181,5 @@ export class DatabaseImportService {
this.logger.info('[ImportService] Imports match manifest');
}
private async postImportCleanup() {
await this.fsService.removeFiles(this.extractFilePaths);
this.extractFilePaths.length = 0;
}
// #endregion
}

View file

@ -11,23 +11,20 @@ export type Sequence = { [tableName: string]: number };
export type Manifest = z.infer<typeof manifestSchema>;
export type DatabaseExportConfig = {
/** Dir to place the export in. By default, the current working directory. */
outDir: string;
/** Path of the backup file.
* @default generated from the current UTC date. written in the current working directory. */
output: string;
/** Whether to export all data or only a smaller subset of data. */
/** Whether to export all data or only a smaller subset of data.
* @default 'full' */
mode: 'full' | 'lightweight';
};
export type DatabaseImportConfig = {
/** Absolute path to the file to import. */
importFilePath: string;
/** Absolute path to the backup file to import from. */
input: string;
// REMOVE
extractDirPath: string;
/**
* Whether to truncate all tables in the destination DB.
* @default true // @TODO: Only for dev, change to `false` later
*/
truncateDestination: boolean;
/** Whether to delete data in all tables in the destination DB.
* @default false */
deleteExistingData: boolean;
};

View file

@ -0,0 +1,9 @@
import { ApplicationError } from 'n8n-workflow';
export class UnsupportedSourceError extends ApplicationError {
constructor(dbType: string) {
super(`Exporting from ${dbType} is not supported. Please import from a Sqlite database.`, {
level: 'warning',
});
}
}

View file

@ -59,26 +59,18 @@ export class FilesystemService {
* Extract a tarball to a given directory.
* @param tarballPath Path to the tarball file to extract.
* @param dstDir Path to the directory to extract the tarball into.
* @returns Paths to the extracted files.
*/
async extractTarball(tarballPath: string, dstDir: string) {
await this.checkAccessible(tarballPath); // @TODO: Clearer error if tarball missing
const extractedFilePaths: string[] = [];
const extract = tar.extract();
extract.on('entry', async (header, stream, next) => {
const filePath = path.join(dstDir, header.name);
await pipeline(stream, fs.createWriteStream(filePath));
extractedFilePaths.push(filePath);
next();
});
await pipeline(fs.createReadStream(tarballPath), createGunzip(), extract);
this.logger.info('[FilesystemService] Extracted tarball', { tarballPath });
return extractedFilePaths;
this.logger.debug('[FilesystemService] Extracted tarball', { tarballPath });
}
}

View file

@ -1,6 +1,6 @@
import { createHash, randomBytes } from 'crypto';
import { chmodSync, existsSync, mkdirSync, readFileSync, statSync, writeFileSync } from 'fs';
import { ApplicationError, jsonParse, ALPHABET, toResult } from 'n8n-workflow';
import { ApplicationError, jsonParse, ALPHABET, toResult, LoggerProxy } from 'n8n-workflow';
import { customAlphabet } from 'nanoid';
import path from 'path';
import { Service } from 'typedi';
@ -136,7 +136,7 @@ export class InstanceSettings {
errorMessage: `Error parsing n8n-config file "${this.settingsFile}". It does not seem to be valid JSON.`,
});
if (!inTest) console.info(`User settings loaded from: ${this.settingsFile}`);
if (!inTest) LoggerProxy.debug(`User settings loaded from: ${this.settingsFile}`);
const { encryptionKey, tunnelSubdomain } = settings;

View file

@ -46,7 +46,7 @@
"n8n-core": "workspace:*",
"n8n-workflow": "workspace:*",
"replace-in-file": "^6.0.0",
"tmp-promise": "^3.0.3",
"tmp-promise": "catalog:",
"typedi": "catalog:"
}
}

View file

@ -904,7 +904,7 @@
"simple-git": "3.17.0",
"snowflake-sdk": "1.12.0",
"ssh2-sftp-client": "7.2.3",
"tmp-promise": "3.0.3",
"tmp-promise": "catalog:",
"ts-ics": "1.2.2",
"typedi": "catalog:",
"uuid": "catalog:",

View file

@ -45,6 +45,9 @@ catalogs:
nanoid:
specifier: 3.3.6
version: 3.3.6
tmp-promise:
specifier: 3.0.3
version: 3.0.3
typedi:
specifier: 0.10.0
version: 0.10.0
@ -556,7 +559,7 @@ importers:
specifier: 0.9.4
version: 0.9.4
tmp-promise:
specifier: 3.0.3
specifier: 'catalog:'
version: 3.0.3
zod:
specifier: 'catalog:'
@ -979,6 +982,9 @@ importers:
tar-stream:
specifier: ^3.1.7
version: 3.1.7
tmp-promise:
specifier: 'catalog:'
version: 3.0.3
typedi:
specifier: 'catalog:'
version: 0.10.0(patch_hash=sk6omkefrosihg7lmqbzh7vfxe)
@ -1593,7 +1599,7 @@ importers:
specifier: ^6.0.0
version: 6.3.5
tmp-promise:
specifier: ^3.0.3
specifier: 'catalog:'
version: 3.0.3
typedi:
specifier: 'catalog:'
@ -1783,7 +1789,7 @@ importers:
specifier: 7.2.3
version: 7.2.3
tmp-promise:
specifier: 3.0.3
specifier: 'catalog:'
version: 3.0.3
ts-ics:
specifier: 1.2.2
@ -11500,10 +11506,6 @@ packages:
resolution: {integrity: sha512-jRCJlojKnZ3addtTOjdIqoRuPEKBvNXcGYqzO6zWZX8KfKEpnGY5jfggJQ3EjKuu8D4bJRr0y+cYJFmYbImXGw==}
engines: {node: '>=0.6.0'}
tmp@0.2.1:
resolution: {integrity: sha512-76SUhtfqR2Ijn+xllcI5P1oyannHNHByD80W1q447gU3mp9G9PSpGdWmjUOHRDPiHYacIk66W7ubDTuPF3BEtQ==}
engines: {node: '>=8.17.0'}
tmp@0.2.3:
resolution: {integrity: sha512-nZD7m9iCPC5g0pYmcaxogYKggSfLsdxl8of3Q/oIbqCqLLIO9IAF0GWjX1z9NZRHPiXv8Wex4yDCaZsgEw0Y8w==}
engines: {node: '>=14.14'}
@ -24960,16 +24962,12 @@ snapshots:
tmp-promise@3.0.3:
dependencies:
tmp: 0.2.1
tmp: 0.2.3
tmp@0.0.33:
dependencies:
os-tmpdir: 1.0.2
tmp@0.2.1:
dependencies:
rimraf: 3.0.2
tmp@0.2.3: {}
tmpl@1.0.5: {}

View file

@ -18,6 +18,7 @@ catalog:
lodash: 4.17.21
luxon: 3.4.4
nanoid: 3.3.6
tmp-promise: 3.0.3
typedi: 0.10.0
uuid: 10.0.0
xml2js: 0.6.2