Consolidate both branches

Iván Ovejero 2024-09-25 18:13:10 +02:00
parent 2fc418ec25
commit 45c9d1ee50
No known key found for this signature in database
6 changed files with 75 additions and 283 deletions


@@ -1,10 +1,7 @@
import { Flags } from '@oclif/core';
import { tmpdir } from 'node:os';
import { join } from 'path';
import Container from 'typedi';
import { BackupService } from '@/services/backup.service';
import { BaseCommand } from '../base-command';
export class ExportBackupCommand extends BaseCommand {
@@ -21,11 +18,11 @@ export class ExportBackupCommand extends BaseCommand {
};
async run() {
const { flags } = await this.parse(ExportBackupCommand);
const zipPath = join(flags.output, 'n8n-backup.zip');
const backupService = Container.get(BackupService);
await backupService.createBackup(zipPath);
console.log(`data exported to ${zipPath}`);
const { DatabaseExportService } = await import(
'@/databases/import-export/database-export.service'
);
await Container.get(DatabaseExportService).export();
}
async catch(error: Error) {


@@ -1,10 +1,7 @@
import { Flags } from '@oclif/core';
import { tmpdir } from 'node:os';
import { join } from 'path';
import Container from 'typedi';
import { BackupService } from '@/services/backup.service';
import { BaseCommand } from '../base-command';
export class ImportBackupCommand extends BaseCommand {
@@ -22,11 +19,10 @@ export class ImportBackupCommand extends BaseCommand {
};
async run() {
const { flags } = await this.parse(ImportBackupCommand);
const zipPath = join(flags.input, 'n8n-backup.zip');
const backupService = Container.get(BackupService);
await backupService.importBackup(zipPath);
console.log(`data imported from ${zipPath}`);
const { DatabaseImportService } = await import(
'@/databases/import-export/database-import.service'
);
await Container.get(DatabaseImportService).import();
}
async catch(error: Error) {


@@ -1,9 +1,11 @@
import { GlobalConfig } from '@n8n/config';
import type { ColumnMetadata } from '@n8n/typeorm/metadata/ColumnMetadata';
import archiver from 'archiver';
import { jsonParse } from 'n8n-workflow';
import { strict } from 'node:assert';
import fs from 'node:fs';
import path from 'node:path';
import { PassThrough } from 'node:stream';
import { Service } from 'typedi';
import { Logger } from '@/logger';
@@ -13,6 +15,7 @@ import type { Manifest } from './manifest.schema';
import type { DatabaseExportConfig, Row } from './types';
import { FilesystemService } from '../../filesystem/filesystem.service';
import { DatabaseSchemaService } from '../database-schema.service';
import type { DatabaseType } from '../types';
// @TODO: Check minimum version for each DB type?
// @TODO: Optional table exclude list
@@ -31,12 +34,27 @@ export class DatabaseExportService {
/** Number of rows in tables being exported. */
private readonly rowCounts: { [tableName: string]: number } = {};
private readonly dbType: DatabaseType;
get tarballPath() {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
const tarballFileName = `${this.config.tarballBaseFileName}-${year}-${month}-${day}.tar.gz`;
return path.join(this.config.storageDirPath, tarballFileName);
}
constructor(
private readonly globalConfig: GlobalConfig,
private readonly fsService: FilesystemService,
private readonly schemaService: DatabaseSchemaService,
private readonly logger: Logger,
) {}
) {
this.dbType = globalConfig.database.type;
}
setConfig(config: Partial<DatabaseExportConfig>) {
this.config = { ...this.config, ...config };
@@ -49,39 +67,35 @@ export class DatabaseExportService {
await this.fsService.ensureDir(this.config.storageDirPath);
this.logger.info('[ExportService] Starting export', {
dbType: this.globalConfig.database.type,
dbType: this.dbType,
storageDirPath: this.config.storageDirPath,
});
await this.writeJsonlFiles();
if (this.exportFilePaths.length === 0) {
this.logger.info('[ExportService] Found no tables to export, aborted export');
return;
}
this.logger.info('[ExportService] Exported tables', { exportedTables: this.exportFilePaths });
await this.writeManifest();
const tarballPath = path.join(this.config.storageDirPath, this.tarballFileName());
await this.fsService.createTarball(tarballPath, this.exportFilePaths);
await this.writeTarball();
await this.postExportCleanup();
this.logger.info('[ExportService] Completed export', { tarballPath });
this.logger.info('[ExportService] Completed export', { tarballPath: this.tarballPath });
}
// #endregion
// #region Export steps
private async writeJsonlFiles() {
private async writeTarball() {
const tarballPath = this.tarballPath;
const archive = archiver('zip', { zlib: { level: 9 } });
archive.pipe(fs.createWriteStream(tarballPath));
const writeStream = new PassThrough();
for (const { tableName, columns } of this.schemaService.getTables()) {
archive.append(writeStream, { name: `${tableName}.jsonl` });
let offset = 0;
let totalRows = 0;
let writeStream: fs.WriteStream | undefined;
while (true) {
const rows = await this.schemaService
@@ -92,10 +106,6 @@ export class DatabaseExportService {
if (rows.length === 0) break;
writeStream ??= fs.createWriteStream(
path.join(this.config.storageDirPath, tableName) + '.jsonl',
);
for (const row of rows) {
for (const column of columns) {
this.normalizeRow(row, { column, tableName });
@@ -110,15 +120,26 @@ export class DatabaseExportService {
offset += this.config.batchSize;
this.logger.info(`[ExportService] Exported ${totalRows} rows from ${tableName}`);
writeStream.end();
}
if (writeStream) {
writeStream.end();
const jsonlFilePath = path.join(this.config.storageDirPath, tableName + '.jsonl');
this.exportFilePaths.push(jsonlFilePath);
this.rowCounts[tableName] = totalRows;
}
this.rowCounts[tableName] = totalRows;
}
const manifest: Manifest = {
lastExecutedMigration: await this.schemaService.getLastMigration(),
sourceDbType: this.dbType,
exportedAt: new Date().toISOString(),
rowCounts: this.rowCounts,
sequences: await this.schemaService.getSequences(),
};
const manifestBuffer = Buffer.from(JSON.stringify(manifest, null, 2), 'utf-8');
archive.append(manifestBuffer, { name: MANIFEST_FILENAME });
await archive.finalize();
}
/** Make values in SQLite and MySQL rows compatible with Postgres. */
@@ -126,11 +147,9 @@ export class DatabaseExportService {
row: Row,
{ column, tableName }: { column: ColumnMetadata; tableName: string },
) {
const dbType = this.globalConfig.database.type;
if (this.dbType === 'postgresdb') return;
if (dbType === 'postgresdb') return;
if (dbType === 'sqlite' && column.type === Boolean) {
if (this.dbType === 'sqlite' && column.type === Boolean) {
const value = row[column.propertyName];
strict(
@@ -141,7 +160,10 @@ export class DatabaseExportService {
row[column.propertyName] = value === 1;
}
if (dbType === 'sqlite' && (this.isJson(column) || this.isPossiblyJson(tableName, column))) {
if (
this.dbType === 'sqlite' &&
(this.isJson(column) || this.isPossiblyJson(tableName, column))
) {
const value = row[column.propertyName];
if (typeof value === 'string') {
@@ -152,25 +174,6 @@ export class DatabaseExportService {
// @TODO: MySQL and MariaDB normalizations
}
/** Write a manifest file describing the export. */
private async writeManifest() {
const manifestFilePath = path.join(this.config.storageDirPath, MANIFEST_FILENAME);
const manifest: Manifest = {
lastExecutedMigration: await this.schemaService.getLastMigration(),
sourceDbType: this.globalConfig.database.type,
exportedAt: new Date().toISOString(),
rowCounts: this.rowCounts,
sequences: await this.schemaService.getSequences(),
};
await fs.promises.writeFile(manifestFilePath, JSON.stringify(manifest, null, 2), 'utf8');
this.exportFilePaths.push(manifestFilePath);
this.logger.info('[ExportService] Wrote manifest', { metadata: manifest });
}
/** Clear all `.jsonl` and `.json` files from the storage dir. */
async postExportCleanup() {
await this.fsService.removeFiles(this.exportFilePaths);
@@ -193,14 +196,5 @@ export class DatabaseExportService {
return tableName === 'settings' && column.propertyName === 'value';
}
private tarballFileName() {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${this.config.tarballBaseFileName}-${year}-${month}-${day}.tar.gz`;
}
// #endregion
}
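
The new writeTarball() relies on archiver's streaming API: a table's rows are written batch by batch into a PassThrough entry, and the small manifest is appended last as an in-memory buffer. A minimal standalone sketch of that pattern, reduced to one hypothetical table (the function, table name, and manifest shape below are illustrative only):

import archiver from 'archiver';
import fs from 'node:fs';
import { PassThrough } from 'node:stream';
import { finished } from 'node:stream/promises';

async function writeArchive(outPath: string, rows: AsyncIterable<Record<string, unknown>>) {
	const archive = archiver('zip', { zlib: { level: 9 } });
	const out = fs.createWriteStream(outPath);
	archive.pipe(out);

	// Register a streamed entry, then write JSONL rows into it as they are produced.
	const entry = new PassThrough();
	archive.append(entry, { name: 'example_table.jsonl' });

	let rowCount = 0;
	for await (const row of rows) {
		entry.write(JSON.stringify(row) + '\n');
		rowCount++;
	}
	entry.end();

	// The manifest is small, so it is appended as an in-memory buffer.
	const manifest = { exportedAt: new Date().toISOString(), rowCounts: { example_table: rowCount } };
	archive.append(Buffer.from(JSON.stringify(manifest, null, 2), 'utf-8'), { name: 'manifest.json' });

	await archive.finalize();
	await finished(out); // ensure the archive has been fully flushed to disk
}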


@@ -3,7 +3,9 @@ import { ensureError, jsonParse } from 'n8n-workflow';
import fs from 'node:fs';
import path from 'node:path';
import readline from 'node:readline';
import { pipeline } from 'node:stream/promises';
import { Service } from 'typedi';
import { Extract } from 'unzip-stream';
import { NotObjectLiteralError } from '@/errors/not-object-literal.error';
import { RowCountMismatchError } from '@/errors/row-count-mismatch.error';
@@ -84,7 +86,12 @@ export class DatabaseImportService {
if (dbType !== 'postgresdb') throw new UnsupportedDestinationError(dbType);
await this.fsService.extractTarball(this.config.importFilePath, this.config.extractDirPath);
// @TODO: Stream instead of extracting to filesystem
await pipeline(
fs.createReadStream(this.config.importFilePath),
Extract({ path: this.config.extractDirPath }),
);
this.manifest = await this.getManifest();
@@ -139,6 +146,8 @@ export class DatabaseImportService {
crlfDelay: Infinity, // treat CR and LF as single char
});
// @TODO: Insert in batches
const txRepository = tx.getRepository(entityTarget);
for await (const line of lineStream) {
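
The import loop above issues one INSERT per JSONL line and carries an `@TODO: Insert in batches` note. A hedged sketch of what that batching could look like with the TypeORM repository API; the helper name and default batch size are made up:

import { jsonParse } from 'n8n-workflow';
import type { ObjectLiteral, Repository } from '@n8n/typeorm';
import type readline from 'node:readline';

// Hypothetical helper: accumulate parsed JSONL rows and flush them in chunks
// instead of inserting one row at a time.
async function insertInBatches(
	lineStream: readline.Interface,
	repository: Repository<ObjectLiteral>,
	batchSize = 500, // hypothetical default
) {
	let batch: ObjectLiteral[] = [];
	for await (const line of lineStream) {
		batch.push(jsonParse<ObjectLiteral>(line));
		if (batch.length >= batchSize) {
			await repository.insert(batch); // one multi-row INSERT per chunk
			batch = [];
		}
	}
	if (batch.length > 0) await repository.insert(batch);
}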


@@ -1,18 +1,10 @@
import { FileNotFoundError } from 'n8n-core';
import { ensureError } from 'n8n-workflow';
import fs from 'node:fs';
import path from 'node:path';
import { pipeline } from 'node:stream/promises';
import { createGzip, createGunzip } from 'node:zlib';
import tar from 'tar-stream';
import { Service } from 'typedi';
import { Logger } from '@/logger';
@Service()
export class FilesystemService {
constructor(private readonly logger: Logger) {}
/**
* Ensure a directory exists by checking or creating it.
* @param dirPath Path to the directory to check or create.
@@ -54,51 +46,4 @@ export class FilesystemService {
}
}
}
/**
* Create a tarball from the given file paths.
* @param srcPaths Paths to the files to include in the tarball.
* @param tarballPath Path to the tarball file to create.
*/
async createTarball(tarballPath: string, srcPaths: string[]) {
const pack = tar.pack();
for (const filePath of srcPaths) {
const fileContent = await fs.promises.readFile(filePath); // @TODO: Read stream
pack.entry({ name: path.basename(filePath) }, fileContent);
}
pack.finalize();
await pipeline(pack, createGzip(), fs.createWriteStream(tarballPath));
this.logger.info('[FilesystemService] Created tarball', { tarballPath });
}
/**
* Extract a tarball to a given directory.
* @param tarballPath Path to the tarball file to extract.
* @param dstDir Path to the directory to extract the tarball into.
* @returns Paths to the extracted files.
*/
async extractTarball(tarballPath: string, dstDir: string) {
await this.checkAccessible(tarballPath); // @TODO: Clearer error if tarball missing
const extractedFilePaths: string[] = [];
const extract = tar.extract();
extract.on('entry', async (header, stream, next) => {
const filePath = path.join(dstDir, header.name);
await pipeline(stream, fs.createWriteStream(filePath));
extractedFilePaths.push(filePath);
next();
});
await pipeline(fs.createReadStream(tarballPath), createGunzip(), extract);
this.logger.info('[FilesystemService] Extracted tarball', { tarballPath });
return extractedFilePaths;
}
}
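
The removed createTarball() read each file fully into memory before adding it (see its `@TODO: Read stream` note). For reference, a hedged sketch of how tar-stream can stream file contents into entries instead, assuming entry sizes are known via stat(); the names are hypothetical and this is not the code the commit adopts, which switches to archiver and unzip-stream:

import fs from 'node:fs';
import path from 'node:path';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';
import tar from 'tar-stream';

// Hypothetical streaming variant of the removed createTarball().
// tar entries need a known size up front, hence the stat() call per file.
async function createTarballStreaming(tarballPath: string, srcPaths: string[]) {
	const pack = tar.pack();
	const done = pipeline(pack, createGzip(), fs.createWriteStream(tarballPath));

	for (const filePath of srcPaths) {
		const { size } = await fs.promises.stat(filePath);
		const entry = pack.entry({ name: path.basename(filePath), size });
		await pipeline(fs.createReadStream(filePath), entry); // ends the entry when the file is done
	}

	pack.finalize();
	await done;
}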


@@ -1,149 +0,0 @@
import { GlobalConfig } from '@n8n/config';
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import archiver from 'archiver';
import { ApplicationError } from 'n8n-workflow';
import { createReadStream, createWriteStream, existsSync } from 'node:fs';
import { mkdir, readFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { createInterface } from 'node:readline';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { Service } from 'typedi';
import { Extract } from 'unzip-stream';
import { jsonColumnType } from '@/databases/entities/abstract-entity';
/** These tables are not backed up to reduce the backup size */
const excludeList = [
'execution_annotation_tags',
'execution_annotations',
'execution_data',
'execution_entity',
'execution_metadata',
'annotation_tag_entity',
];
@Service()
export class BackupService {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly dataSource: DataSource,
) {}
async createBackup(archivePath: string) {
if (existsSync(archivePath)) {
throw new ApplicationError(
'Backup file already exists. Please delete that file and try again.',
);
}
await mkdir(dirname(archivePath), { recursive: true });
const archive = archiver('zip', { zlib: { level: 9 } });
archive.pipe(createWriteStream(archivePath));
for (const { name: tableName, columns } of this.tables) {
const totalRowsCount = await this.dataSource
.query(`SELECT COUNT(*) AS count FROM ${tableName}`)
.then((rows: Array<{ count: number }>) => rows[0].count);
if (totalRowsCount === 0) continue;
const fileName = `${tableName}.jsonl`;
const stream = new PassThrough();
archive.append(stream, { name: fileName });
let cursor = 0;
const batchSize = 10;
while (cursor < totalRowsCount) {
const rows = await this.dataSource.query(
`SELECT * from ${tableName} LIMIT ${cursor}, ${batchSize}`,
);
for (const row of rows) {
// Our sqlite setup has some quirks. The following code normalizes the exported data so that it can be imported into a new postgres or sqlite database.
if (this.globalConfig.database.type === 'sqlite') {
for (const { type: columnType, propertyName } of columns) {
if (propertyName in row) {
// Our sqlite setup used `simple-json` for JSON columns, which is stored as strings.
// This is because when we wrote this code, sqlite did not support native JSON column types.
if (columnType === jsonColumnType) {
row[propertyName] = JSON.parse(row[propertyName]);
}
// Sqlite does not have a separate Boolean data type, and uses integers 0/1 to mark values as boolean
else if (columnType === Boolean) {
row[propertyName] = Boolean(row[propertyName]);
}
}
}
}
stream.write(JSON.stringify(row));
stream.write('\n');
}
cursor += batchSize;
}
stream.end();
}
// Add this hidden file to store the last migration.
// This is used during import to ensure that the importing DB schema is up to date
archive.append(Buffer.from(await this.getLastMigrationName(), 'utf8'), {
name: '.lastMigration',
});
await archive.finalize();
}
async importBackup(archivePath: string) {
if (!existsSync(archivePath)) {
throw new ApplicationError('Backup archive not found. Please check the path.');
}
// TODO: instead of extracting to the filesystem, stream the files directly
const backupPath = '/tmp/backup';
await pipeline(createReadStream(archivePath), Extract({ path: backupPath }));
const lastMigrationInBackup = await readFile(join(backupPath, '.lastMigration'), 'utf8');
const getLastMigrationInDB = await this.getLastMigrationName();
if (lastMigrationInBackup !== getLastMigrationInDB) {
throw new ApplicationError('Last Migrations Differ, make sure to use the same n8n version');
}
// (2. if clean truncate)
// (2. if no clean, check if tables are empty)
// 3. disable foreign keys
// 4. import each jsonl
for (const { name, target } of this.tables) {
const repo = this.dataSource.getRepository(target);
await repo.delete({});
const filePath = join(backupPath, `${name}.jsonl`);
if (!existsSync(filePath)) continue;
const fileStream = createReadStream(filePath);
const lineStream = createInterface({ input: fileStream });
for await (const line of lineStream) {
// TODO: insert in batches to reduce DB load
await repo.insert(JSON.parse(line));
}
fileStream.close();
}
// 5. enable foreign keys
}
async getLastMigrationName() {
const migrationExecutor = new MigrationExecutor(this.dataSource);
const executedMigrations = await migrationExecutor.getExecutedMigrations();
return executedMigrations.at(0)!.name;
}
get tables() {
return this.dataSource.entityMetadatas
.filter((v) => !excludeList.includes(v.tableName))
.map(({ tableName, columns, target }) => ({ name: tableName, columns, target }));
}
}
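
Both the deleted BackupService above and the DatabaseExportService it is consolidated into apply the same SQLite normalization before rows leave the database: integer 0/1 columns become booleans, and `simple-json` columns (stored as strings) are parsed back into objects. A minimal standalone sketch of that rule, with hypothetical names and a string literal standing in for the `jsonColumnType` constant:

// Hypothetical helper mirroring the normalization both branches apply to SQLite rows.
function normalizeSqliteValue(value: unknown, columnType: unknown): unknown {
	// SQLite has no boolean type; booleans are stored as integers 0/1.
	if (columnType === Boolean) return value === 1;
	// simple-json columns are stored as strings and must be parsed back into objects.
	if (columnType === 'simple-json' && typeof value === 'string') return JSON.parse(value);
	return value;
}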