From 45c9d1ee504e659764ccc1896c78dd82d10e83c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?=
Date: Wed, 25 Sep 2024 18:13:10 +0200
Subject: [PATCH] Consolidate both branches

---
 packages/cli/src/commands/export/backup.ts    |  13 +-
 packages/cli/src/commands/import/backup.ts    |  12 +-
 .../import-export/database-export.service.ts  | 118 +++++++-------
 .../import-export/database-import.service.ts  |  11 +-
 .../cli/src/filesystem/filesystem.service.ts  |  55 -------
 packages/cli/src/services/backup.service.ts   | 149 ------------------
 6 files changed, 75 insertions(+), 283 deletions(-)
 delete mode 100644 packages/cli/src/services/backup.service.ts

diff --git a/packages/cli/src/commands/export/backup.ts b/packages/cli/src/commands/export/backup.ts
index e5f9c784d0..02078c21be 100644
--- a/packages/cli/src/commands/export/backup.ts
+++ b/packages/cli/src/commands/export/backup.ts
@@ -1,10 +1,7 @@
 import { Flags } from '@oclif/core';
 import { tmpdir } from 'node:os';
-import { join } from 'path';
 import Container from 'typedi';
 
-import { BackupService } from '@/services/backup.service';
-
 import { BaseCommand } from '../base-command';
 
 export class ExportBackupCommand extends BaseCommand {
@@ -21,11 +18,11 @@ export class ExportBackupCommand extends BaseCommand {
 	};
 
 	async run() {
-		const { flags } = await this.parse(ExportBackupCommand);
-		const zipPath = join(flags.output, 'n8n-backup.zip');
-		const backupService = Container.get(BackupService);
-		await backupService.createBackup(zipPath);
-		console.log(`data exported to ${zipPath}`);
+		const { DatabaseExportService } = await import(
+			'@/databases/import-export/database-export.service'
+		);
+
+		await Container.get(DatabaseExportService).export();
 	}
 
 	async catch(error: Error) {
diff --git a/packages/cli/src/commands/import/backup.ts b/packages/cli/src/commands/import/backup.ts
index a5a6be4747..dc338cc216 100644
--- a/packages/cli/src/commands/import/backup.ts
+++ b/packages/cli/src/commands/import/backup.ts
@@ -1,10 +1,7 @@
 import { Flags } from '@oclif/core';
 import { tmpdir } from 'node:os';
-import { join } from 'path';
 import Container from 'typedi';
 
-import { BackupService } from '@/services/backup.service';
-
 import { BaseCommand } from '../base-command';
 
 export class ImportBackupCommand extends BaseCommand {
@@ -22,11 +19,10 @@ export class ImportBackupCommand extends BaseCommand {
 	};
 
 	async run() {
-		const { flags } = await this.parse(ImportBackupCommand);
-		const zipPath = join(flags.input, 'n8n-backup.zip');
-		const backupService = Container.get(BackupService);
-		await backupService.importBackup(zipPath);
-		console.log(`data imported from ${zipPath}`);
+		const { DatabaseImportService } = await import(
+			'@/databases/import-export/database-import.service'
+		);
+		await Container.get(DatabaseImportService).import();
 	}
 
 	async catch(error: Error) {
diff --git a/packages/cli/src/databases/import-export/database-export.service.ts b/packages/cli/src/databases/import-export/database-export.service.ts
index 854c349512..eab1a639e9 100644
--- a/packages/cli/src/databases/import-export/database-export.service.ts
+++ b/packages/cli/src/databases/import-export/database-export.service.ts
@@ -1,9 +1,11 @@
 import { GlobalConfig } from '@n8n/config';
 import type { ColumnMetadata } from '@n8n/typeorm/metadata/ColumnMetadata';
+import archiver from 'archiver';
 import { jsonParse } from 'n8n-workflow';
 import { strict } from 'node:assert';
 import fs from 'node:fs';
 import path from 'node:path';
+import { PassThrough } from 'node:stream';
 import { Service } from 'typedi';
 
 import { Logger } from '@/logger';
@@ -13,6 +15,7 @@ import type { Manifest } from './manifest.schema';
 import type { DatabaseExportConfig, Row } from './types';
 import { FilesystemService } from '../../filesystem/filesystem.service';
 import { DatabaseSchemaService } from '../database-schema.service';
+import type { DatabaseType } from '../types';
 
 // @TODO: Check minimum version for each DB type?
 // @TODO: Optional table exclude list
@@ -31,12 +34,27 @@ export class DatabaseExportService {
 	/** Number of rows in tables being exported. */
 	private readonly rowCounts: { [tableName: string]: number } = {};
 
+	private readonly dbType: DatabaseType;
+
+	get tarballPath() {
+		const now = new Date();
+		const year = now.getFullYear();
+		const month = String(now.getMonth() + 1).padStart(2, '0');
+		const day = String(now.getDate()).padStart(2, '0');
+
+		const tarballFileName = `${this.config.tarballBaseFileName}-${year}-${month}-${day}.tar.gz`;
+
+		return path.join(this.config.storageDirPath, tarballFileName);
+	}
+
 	constructor(
 		private readonly globalConfig: GlobalConfig,
 		private readonly fsService: FilesystemService,
 		private readonly schemaService: DatabaseSchemaService,
 		private readonly logger: Logger,
-	) {}
+	) {
+		this.dbType = globalConfig.database.type;
+	}
 
 	setConfig(config: Partial<DatabaseExportConfig>) {
 		this.config = { ...this.config, ...config };
@@ -49,39 +67,35 @@
 		await this.fsService.ensureDir(this.config.storageDirPath);
 
 		this.logger.info('[ExportService] Starting export', {
-			dbType: this.globalConfig.database.type,
+			dbType: this.dbType,
 			storageDirPath: this.config.storageDirPath,
 		});
 
-		await this.writeJsonlFiles();
-
-		if (this.exportFilePaths.length === 0) {
-			this.logger.info('[ExportService] Found no tables to export, aborted export');
-			return;
-		}
-
-		this.logger.info('[ExportService] Exported tables', { exportedTables: this.exportFilePaths });
-
-		await this.writeManifest();
-
-		const tarballPath = path.join(this.config.storageDirPath, this.tarballFileName());
-
-		await this.fsService.createTarball(tarballPath, this.exportFilePaths);
+		await this.writeTarball();
 
 		await this.postExportCleanup();
 
-		this.logger.info('[ExportService] Completed export', { tarballPath });
+		this.logger.info('[ExportService] Completed export', { tarballPath: this.tarballPath });
 	}
 
 	// #endregion
 
 	// #region Export steps
 
-	private async writeJsonlFiles() {
+	private async writeTarball() {
+		const tarballPath = this.tarballPath;
+
+		const archive = archiver('zip', { zlib: { level: 9 } });
+
+		archive.pipe(fs.createWriteStream(tarballPath));
+
+		const writeStream = new PassThrough();
+
 		for (const { tableName, columns } of this.schemaService.getTables()) {
+			archive.append(writeStream, { name: `${tableName}.jsonl` });
+
 			let offset = 0;
 			let totalRows = 0;
-			let writeStream: fs.WriteStream | undefined;
 
 			while (true) {
 				const rows = await this.schemaService
@@ -92,10 +106,6 @@
 
 				if (rows.length === 0) break;
 
-				writeStream ??= fs.createWriteStream(
-					path.join(this.config.storageDirPath, tableName) + '.jsonl',
-				);
-
 				for (const row of rows) {
 					for (const column of columns) {
 						this.normalizeRow(row, { column, tableName });
@@ -110,15 +120,26 @@
 				offset += this.config.batchSize;
 
 				this.logger.info(`[ExportService] Exported ${totalRows} rows from ${tableName}`);
+
+				writeStream.end();
 			}
 
-			if (writeStream) {
-				writeStream.end();
-				const jsonlFilePath = path.join(this.config.storageDirPath, tableName + '.jsonl');
-				this.exportFilePaths.push(jsonlFilePath);
-				this.rowCounts[tableName] = totalRows;
-			}
+			this.rowCounts[tableName] = totalRows;
 		}
+
+		const manifest: Manifest = {
+			lastExecutedMigration: await this.schemaService.getLastMigration(),
+			sourceDbType: this.dbType,
+			exportedAt: new Date().toISOString(),
+			rowCounts: this.rowCounts,
+			sequences: await this.schemaService.getSequences(),
+		};
+
+		const manifestBuffer = Buffer.from(JSON.stringify(manifest, null, 2), 'utf-8');
+
+		archive.append(manifestBuffer, { name: MANIFEST_FILENAME });
+
+		await archive.finalize();
 	}
 
 	/** Make values in SQLite and MySQL rows compatible with Postgres. */
@@ -126,11 +147,9 @@
 		row: Row,
 		{ column, tableName }: { column: ColumnMetadata; tableName: string },
 	) {
-		const dbType = this.globalConfig.database.type;
+		if (this.dbType === 'postgresdb') return;
 
-		if (dbType === 'postgresdb') return;
-
-		if (dbType === 'sqlite' && column.type === Boolean) {
+		if (this.dbType === 'sqlite' && column.type === Boolean) {
 			const value = row[column.propertyName];
 
 			strict(
@@ -141,7 +160,10 @@
 			row[column.propertyName] = value === 1;
 		}
 
-		if (dbType === 'sqlite' && (this.isJson(column) || this.isPossiblyJson(tableName, column))) {
+		if (
+			this.dbType === 'sqlite' &&
+			(this.isJson(column) || this.isPossiblyJson(tableName, column))
+		) {
 			const value = row[column.propertyName];
 
 			if (typeof value === 'string') {
@@ -152,25 +174,6 @@
 		// @TODO: MySQL and MariaDB normalizations
 	}
 
-	/** Write a manifest file describing the export. */
-	private async writeManifest() {
-		const manifestFilePath = path.join(this.config.storageDirPath, MANIFEST_FILENAME);
-
-		const manifest: Manifest = {
-			lastExecutedMigration: await this.schemaService.getLastMigration(),
-			sourceDbType: this.globalConfig.database.type,
-			exportedAt: new Date().toISOString(),
-			rowCounts: this.rowCounts,
-			sequences: await this.schemaService.getSequences(),
-		};
-
-		await fs.promises.writeFile(manifestFilePath, JSON.stringify(manifest, null, 2), 'utf8');
-
-		this.exportFilePaths.push(manifestFilePath);
-
-		this.logger.info('[ExportService] Wrote manifest', { metadata: manifest });
-	}
-
 	/** Clear all `.jsonl` and `.json` files from the storage dir. */
 	async postExportCleanup() {
 		await this.fsService.removeFiles(this.exportFilePaths);
@@ -193,14 +196,5 @@
 		return tableName === 'settings' && column.propertyName === 'value';
 	}
 
-	private tarballFileName() {
-		const now = new Date();
-		const year = now.getFullYear();
-		const month = String(now.getMonth() + 1).padStart(2, '0');
-		const day = String(now.getDate()).padStart(2, '0');
-
-		return `${this.config.tarballBaseFileName}-${year}-${month}-${day}.tar.gz`;
-	}
-
 	// #endregion
 }
diff --git a/packages/cli/src/databases/import-export/database-import.service.ts b/packages/cli/src/databases/import-export/database-import.service.ts
index d889df2ea3..8ce868dc67 100644
--- a/packages/cli/src/databases/import-export/database-import.service.ts
+++ b/packages/cli/src/databases/import-export/database-import.service.ts
@@ -3,7 +3,9 @@ import { ensureError, jsonParse } from 'n8n-workflow';
 import fs from 'node:fs';
 import path from 'node:path';
 import readline from 'node:readline';
+import { pipeline } from 'node:stream/promises';
 import { Service } from 'typedi';
+import { Extract } from 'unzip-stream';
 
 import { NotObjectLiteralError } from '@/errors/not-object-literal.error';
 import { RowCountMismatchError } from '@/errors/row-count-mismatch.error';
@@ -84,7 +86,12 @@ export class DatabaseImportService {
 
 		if (dbType !== 'postgresdb') throw new UnsupportedDestinationError(dbType);
 
-		await this.fsService.extractTarball(this.config.importFilePath, this.config.extractDirPath);
+		// @TODO: Stream instead of extracting to filesystem
+
+		await pipeline(
+			fs.createReadStream(this.config.importFilePath),
+			Extract({ path: this.config.extractDirPath }),
+		);
 
 		this.manifest = await this.getManifest();
 
@@ -139,6 +146,8 @@
 			crlfDelay: Infinity, // treat CR and LF as single char
 		});
 
+		// @TODO: Insert in batches
+
 		const txRepository = tx.getRepository(entityTarget);
 
 		for await (const line of lineStream) {
diff --git a/packages/cli/src/filesystem/filesystem.service.ts b/packages/cli/src/filesystem/filesystem.service.ts
index 8608cd7956..572e34bf49 100644
--- a/packages/cli/src/filesystem/filesystem.service.ts
+++ b/packages/cli/src/filesystem/filesystem.service.ts
@@ -1,18 +1,10 @@
 import { FileNotFoundError } from 'n8n-core';
 import { ensureError } from 'n8n-workflow';
 import fs from 'node:fs';
-import path from 'node:path';
-import { pipeline } from 'node:stream/promises';
-import { createGzip, createGunzip } from 'node:zlib';
-import tar from 'tar-stream';
 import { Service } from 'typedi';
 
-import { Logger } from '@/logger';
-
 @Service()
 export class FilesystemService {
-	constructor(private readonly logger: Logger) {}
-
 	/**
 	 * Ensure a directory exists by checking or creating it.
 	 * @param dirPath Path to the directory to check or create.
@@ -54,51 +46,4 @@
 			}
 		}
 	}
-
-	/**
-	 * Create a tarball from the given file paths.
-	 * @param srcPaths Paths to the files to include in the tarball.
-	 * @param tarballPath Path to the tarball file to create.
-	 */
-	async createTarball(tarballPath: string, srcPaths: string[]) {
-		const pack = tar.pack();
-
-		for (const filePath of srcPaths) {
-			const fileContent = await fs.promises.readFile(filePath); // @TODO: Read stream
-			pack.entry({ name: path.basename(filePath) }, fileContent);
-		}
-
-		pack.finalize();
-
-		await pipeline(pack, createGzip(), fs.createWriteStream(tarballPath));
-
-		this.logger.info('[FilesystemService] Created tarball', { tarballPath });
-	}
-
-	/**
-	 * Extract a tarball to a given directory.
-	 * @param tarballPath Path to the tarball file to extract.
-	 * @param dstDir Path to the directory to extract the tarball into.
-	 * @returns Paths to the extracted files.
-	 */
-	async extractTarball(tarballPath: string, dstDir: string) {
-		await this.checkAccessible(tarballPath); // @TODO: Clearer error if tarball missing
-
-		const extractedFilePaths: string[] = [];
-
-		const extract = tar.extract();
-
-		extract.on('entry', async (header, stream, next) => {
-			const filePath = path.join(dstDir, header.name);
-			await pipeline(stream, fs.createWriteStream(filePath));
-			extractedFilePaths.push(filePath);
-			next();
-		});
-
-		await pipeline(fs.createReadStream(tarballPath), createGunzip(), extract);
-
-		this.logger.info('[FilesystemService] Extracted tarball', { tarballPath });
-
-		return extractedFilePaths;
-	}
 }
diff --git a/packages/cli/src/services/backup.service.ts b/packages/cli/src/services/backup.service.ts
deleted file mode 100644
index 32dc7a1494..0000000000
--- a/packages/cli/src/services/backup.service.ts
+++ /dev/null
@@ -1,149 +0,0 @@
-import { GlobalConfig } from '@n8n/config';
-import { DataSource, MigrationExecutor } from '@n8n/typeorm';
-import archiver from 'archiver';
-import { ApplicationError } from 'n8n-workflow';
-import { createReadStream, createWriteStream, existsSync } from 'node:fs';
-import { mkdir, readFile } from 'node:fs/promises';
-import { dirname, join } from 'node:path';
-import { createInterface } from 'node:readline';
-import { PassThrough } from 'node:stream';
-import { pipeline } from 'node:stream/promises';
-import { Service } from 'typedi';
-import { Extract } from 'unzip-stream';
-
-import { jsonColumnType } from '@/databases/entities/abstract-entity';
-
-/** These tables are not backed up to reduce the backup size */
-const excludeList = [
-	'execution_annotation_tags',
-	'execution_annotations',
-	'execution_data',
-	'execution_entity',
-	'execution_metadata',
-	'annotation_tag_entity',
-];
-
-@Service()
-export class BackupService {
-	constructor(
-		private readonly globalConfig: GlobalConfig,
-		private readonly dataSource: DataSource,
-	) {}
-
-	async createBackup(archivePath: string) {
-		if (existsSync(archivePath)) {
-			throw new ApplicationError(
-				'Backup file already exists. Please delete that file and try again.',
-			);
-		}
-
-		await mkdir(dirname(archivePath), { recursive: true });
-		const archive = archiver('zip', { zlib: { level: 9 } });
-		archive.pipe(createWriteStream(archivePath));
-
-		for (const { name: tableName, columns } of this.tables) {
-			const totalRowsCount = await this.dataSource
-				.query(`SELECT COUNT(*) AS count FROM ${tableName}`)
-				.then((rows: Array<{ count: number }>) => rows[0].count);
-			if (totalRowsCount === 0) continue;
-
-			const fileName = `${tableName}.jsonl`;
-			const stream = new PassThrough();
-			archive.append(stream, { name: fileName });
-
-			let cursor = 0;
-			const batchSize = 10;
-			while (cursor < totalRowsCount) {
-				const rows = await this.dataSource.query(
-					`SELECT * from ${tableName} LIMIT ${cursor}, ${batchSize}`,
-				);
-
-				for (const row of rows) {
-					// Our sqlite setup has some quirks. The following code normalizes the exported data so that it can be imported into a new postgres or sqlite database.
-					if (this.globalConfig.database.type === 'sqlite') {
-						for (const { type: columnType, propertyName } of columns) {
-							if (propertyName in row) {
-								// Our sqlite setup used `simple-json` for JSON columns, which is stored as strings.
-								// This is because when we wrote this code, sqlite did not support native JSON column types.
-								if (columnType === jsonColumnType) {
-									row[propertyName] = JSON.parse(row[propertyName]);
-								}
-								// Sqlite does not have a separate Boolean data type, and uses integers 0/1 to mark values as boolean
-								else if (columnType === Boolean) {
-									row[propertyName] = Boolean(row[propertyName]);
-								}
-							}
-						}
-					}
-
-					stream.write(JSON.stringify(row));
-					stream.write('\n');
-				}
-
-				cursor += batchSize;
-			}
-
-			stream.end();
-		}
-
-		// Add this hidden file to store the last migration.
-		// This is used during import to ensure that the importing DB schema is up to date
-		archive.append(Buffer.from(await this.getLastMigrationName(), 'utf8'), {
-			name: '.lastMigration',
-		});
-
-		await archive.finalize();
-	}
-
-	async importBackup(archivePath: string) {
-		if (!existsSync(archivePath)) {
-			throw new ApplicationError('Backup archive not found. Please check the path.');
-		}
-
-		// TODO: instead of extracting to the filesystem, stream the files directly
-		const backupPath = '/tmp/backup';
-		await pipeline(createReadStream(archivePath), Extract({ path: backupPath }));
-
-		const lastMigrationInBackup = await readFile(join(backupPath, '.lastMigration'), 'utf8');
-		const getLastMigrationInDB = await this.getLastMigrationName();
-		if (lastMigrationInBackup !== getLastMigrationInDB) {
-			throw new ApplicationError('Last Migrations Differ, make sure to use the same n8n version');
-		}
-
-		// (2. if clean truncate)
-		// (2. if no clean, check if tables are empty)
-		// 3. disable foreign keys
-
-		// 4. import each jsonl
-		for (const { name, target } of this.tables) {
-			const repo = this.dataSource.getRepository(target);
-			await repo.delete({});
-
-			const filePath = join(backupPath, `${name}.jsonl`);
-			if (!existsSync(filePath)) continue;
-
-			const fileStream = createReadStream(filePath);
-			const lineStream = createInterface({ input: fileStream });
-			for await (const line of lineStream) {
-				// TODO: insert in batches to reduce DB load
-				await repo.insert(JSON.parse(line));
-			}
-
-			fileStream.close();
-		}
-
-		// 5. enable foreign keys
-	}
-
-	async getLastMigrationName() {
-		const migrationExecutor = new MigrationExecutor(this.dataSource);
-		const executedMigrations = await migrationExecutor.getExecutedMigrations();
-		return executedMigrations.at(0)!.name;
-	}
-
-	get tables() {
-		return this.dataSource.entityMetadatas
-			.filter((v) => !excludeList.includes(v.tableName))
-			.map(({ tableName, columns, target }) => ({ name: tableName, columns, target }));
-	}
-}
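
Note on the export path in this patch: writeTarball() now streams each table's rows into the archive as a JSONL entry through a PassThrough, instead of writing intermediate .jsonl files and tarring them afterwards. Below is a minimal, self-contained sketch of that archiver pattern; it is not part of the patch, and the exportTables helper, the output path, and the table names are hypothetical.

import fs from 'node:fs';
import { PassThrough } from 'node:stream';
import archiver from 'archiver';

// Stream rows into a zip archive as one JSONL entry per table, with no
// intermediate .jsonl files on disk. Rows written to the PassThrough are
// buffered until archiver drains that entry.
async function exportTables(
	archivePath: string,
	tables: Record<string, Array<Record<string, unknown>>>,
) {
	const archive = archiver('zip', { zlib: { level: 9 } });
	archive.pipe(fs.createWriteStream(archivePath));

	for (const [tableName, rows] of Object.entries(tables)) {
		const entry = new PassThrough(); // one stream per archive entry
		archive.append(entry, { name: `${tableName}.jsonl` });

		for (const row of rows) {
			entry.write(JSON.stringify(row) + '\n');
		}

		entry.end(); // close this entry before appending the next one
	}

	await archive.finalize(); // no further entries; let archiver flush its queue
}

// Hypothetical usage:
void exportTables('/tmp/n8n-backup.zip', {
	workflow_entity: [{ id: 1, name: 'My workflow' }],
	credentials_entity: [{ id: 1, name: 'My credential' }],
});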
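The import path is the mirror image: the archive is unpacked with unzip-stream and each .jsonl entry is replayed line by line. A sketch under the same caveats — the importTables helper and the insertRow callback are hypothetical stand-ins for the repository insert logic, not the patch's code.

import fs from 'node:fs';
import path from 'node:path';
import readline from 'node:readline';
import { pipeline } from 'node:stream/promises';
import { Extract } from 'unzip-stream';

// Unpack the archive to a directory, then replay each .jsonl entry row by row.
async function importTables(
	archivePath: string,
	extractDirPath: string,
	insertRow: (tableName: string, row: Record<string, unknown>) => Promise<void>,
) {
	await pipeline(fs.createReadStream(archivePath), Extract({ path: extractDirPath }));

	for (const fileName of await fs.promises.readdir(extractDirPath)) {
		if (!fileName.endsWith('.jsonl')) continue;

		const tableName = fileName.replace(/\.jsonl$/, '');
		const lineStream = readline.createInterface({
			input: fs.createReadStream(path.join(extractDirPath, fileName)),
			crlfDelay: Infinity, // treat CRLF as a single line break
		});

		for await (const line of lineStream) {
			if (line.trim() === '') continue;
			await insertRow(tableName, JSON.parse(line) as Record<string, unknown>);
		}
	}
}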