extract backup related code into a dedicated service class

कारतोफ्फेलस्क्रिप्ट™ 2024-09-23 22:39:13 +02:00
parent c40f26cb54
commit 24f0ce243e
3 changed files with 158 additions and 165 deletions

@@ -1,27 +1,12 @@
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import { Flags } from '@oclif/core';
import archiver from 'archiver';
import * as assert from 'assert/strict';
import fs from 'fs';
import { tmpdir } from 'node:os';
import { PassThrough } from 'node:stream';
import { join } from 'path';
import Container from 'typedi';
import { jsonColumnType } from '@/databases/entities/abstract-entity';
import { BackupService } from '@/services/backup.service';
import { BaseCommand } from '../base-command';
/** These tables are not backed up to reduce the backup size */
const excludeList = [
'execution_annotation_tags',
'execution_annotations',
'execution_data',
'execution_entity',
'execution_metadata',
'annotation_tag_entity',
];
export class ExportBackupCommand extends BaseCommand {
static description = 'Backup to a zip file';
@@ -37,85 +22,13 @@ export class ExportBackupCommand extends BaseCommand {
async run() {
const { flags } = await this.parse(ExportBackupCommand);
const connection = Container.get(DataSource);
const tables = connection.entityMetadatas
.filter((v) => !excludeList.includes(v.tableName))
.map((v) => ({
name: v.tableName,
columns: v.columns,
}));
await fs.promises.mkdir(flags.output, { recursive: true });
// TODO: bail if the file already exists, or prompt to overwrite
const zipPath = join(flags.output, 'n8n-backup.zip');
const archive = archiver('zip', { zlib: { level: 9 } });
archive.pipe(fs.createWriteStream(zipPath));
for (const { name: tableName, columns } of tables) {
const totalRowsCount = await connection
.query(`SELECT COUNT(*) AS count FROM ${tableName}`)
.then((rows: Array<{ count: number }>) => rows[0].count);
if (totalRowsCount === 0) continue;
const fileName = `${tableName}.jsonl`;
const stream = new PassThrough();
archive.append(stream, { name: fileName });
let cursor = 0;
const batchSize = 10;
while (cursor < totalRowsCount) {
const rows = await connection.query(
`SELECT * from ${tableName} LIMIT ${cursor}, ${batchSize}`,
);
for (const row of rows) {
// Our sqlite setup has some quirks. The following code normalizes the exported data so that it can be imported into a new postgres or sqlite database.
if (this.globalConfig.database.type === 'sqlite') {
for (const { type: columnType, propertyName } of columns) {
if (propertyName in row) {
// Our sqlite setup used `simple-json` for JSON columns, which is stored as strings.
// This is because when we wrote this code, sqlite did not support native JSON column types.
if (columnType === jsonColumnType) {
row[propertyName] = JSON.parse(row[propertyName]);
}
// Sqlite does not have a separate Boolean data type, and uses integers 0/1 to mark values as boolean
else if (columnType === Boolean) {
row[propertyName] = Boolean(row[propertyName]);
}
}
}
}
stream.write(JSON.stringify(row));
stream.write('\n');
}
cursor += batchSize;
}
stream.end();
}
const migrationExecutor = new MigrationExecutor(connection);
const executedMigrations = await migrationExecutor.getExecutedMigrations();
const lastExecutedMigration = executedMigrations.at(0);
assert.ok(lastExecutedMigration, 'should have been run by db.ts');
// Add this hidden file to store the last migration.
// This is used during import to ensure that the importing DB schema is up to date
archive.append(Buffer.from(lastExecutedMigration.name, 'utf8'), { name: '.lastMigration' });
await archive.finalize();
const backupService = Container.get(BackupService);
await backupService.createBackup(zipPath);
console.log(`data exported to ${zipPath}`);
// TODO: clean up temp dir
}
async catch(error: Error) {
this.logger.error('Error exporting backup. See log messages for details.');
this.logger.error(error.message);
}
}

@@ -1,25 +1,11 @@
import { Flags } from '@oclif/core';
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import * as assert from 'assert/strict';
import fs from 'fs';
import readline from 'readline';
import { tmpdir } from 'node:os';
import { join } from 'path';
import Container from 'typedi';
import { pipeline } from 'node:stream/promises';
import { Extract } from 'unzip-stream';
import { BackupService } from '@/services/backup.service';
import { BaseCommand } from '../base-command';
import { ApplicationError } from 'n8n-workflow';
const excludeList = [
'execution_annotation_tags',
'execution_annotations',
'execution_data',
'execution_entity',
'execution_metadata',
'annotation_tag_entity',
];
export class ImportBackupCommand extends BaseCommand {
static description = 'Import from a backup zip file';
@@ -35,70 +21,15 @@ export class ImportBackupCommand extends BaseCommand {
}),
};
// TODO: do batching
async run() {
const { flags } = await this.parse(ImportBackupCommand);
// TODO:
// 1. check last migrations
const connection = Container.get(DataSource);
const migrationExecutor = new MigrationExecutor(connection);
const executedMigrations = await migrationExecutor.getExecutedMigrations();
const lastExecutedMigration = executedMigrations.at(0);
assert.ok(lastExecutedMigration, 'should have been run by db.ts');
const zipPath = join(flags.input, 'n8n-backup.zip');
if (!fs.existsSync(zipPath)) {
throw new ApplicationError('Backup zip file not found');
}
// TODO: instead of extracting to the filesystem, stream the files directly
const backupPath = '/tmp/backup';
await pipeline(fs.createReadStream(zipPath), Extract({ path: backupPath }));
const lastMigrationInBackup = (
await fs.promises.readFile(join(backupPath, '.lastMigration'), 'utf8')
).trim();
if (lastMigrationInBackup !== lastExecutedMigration.name) {
throw new ApplicationError('Last migrations differ; make sure to use the same n8n version');
}
// (2. if clean truncate)
// (2. if no clean, check if tables are empty)
// 3. disable foreign keys
// 4. import each jsonl
const tables = connection.entityMetadatas
.filter((v) => !excludeList.includes(v.tableName))
.map((v) => ({ name: v.tableName, target: v.target }));
for (const { name, target } of tables) {
const repo = connection.getRepository(target);
await repo.delete({});
const filePath = join(backupPath, `${name}.jsonl`);
if (!fs.existsSync(filePath)) continue;
const fileStream = fs.createReadStream(filePath);
const lineStream = readline.createInterface({
input: fileStream,
});
for await (const line of lineStream) {
// TODO: insert in batches to reduce DB load
await repo.insert(JSON.parse(line));
}
fileStream.close();
}
// 5. enable foreign keys
const backupService = Container.get(BackupService);
await backupService.importBackup(zipPath);
console.log(`data imported from ${zipPath}`);
}
async catch(error: Error) {
console.log(error.stack);
this.logger.error('Error importing backup. See log messages for details.');
this.logger.error(error.message);
}
}

@@ -0,0 +1,149 @@
import { GlobalConfig } from '@n8n/config';
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import archiver from 'archiver';
import { ApplicationError } from 'n8n-workflow';
import { createReadStream, createWriteStream, existsSync } from 'node:fs';
import { mkdir, readFile } from 'node:fs/promises';
import { dirname, join } from 'node:path';
import { createInterface } from 'node:readline';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { Service } from 'typedi';
import { Extract } from 'unzip-stream';
import { jsonColumnType } from '@/databases/entities/abstract-entity';
/** These tables are not backed up to reduce the backup size */
const excludeList = [
'execution_annotation_tags',
'execution_annotations',
'execution_data',
'execution_entity',
'execution_metadata',
'annotation_tag_entity',
];
@Service()
export class BackupService {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly dataSource: DataSource,
) {}
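/**
 * Write every backed-up table as a JSONL entry into a zip archive at `archivePath`,
 * plus a hidden `.lastMigration` marker used on import to verify the schema version.
 */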
async createBackup(archivePath: string) {
if (existsSync(archivePath)) {
throw new ApplicationError(
'Backup file already exists. Please delete that file and try again.',
);
}
await mkdir(dirname(archivePath), { recursive: true });
const archive = archiver('zip', { zlib: { level: 9 } });
archive.pipe(createWriteStream(archivePath));
for (const { name: tableName, columns } of this.tables) {
const totalRowsCount = await this.dataSource
.query(`SELECT COUNT(*) AS count FROM ${tableName}`)
.then((rows: Array<{ count: number }>) => rows[0].count);
if (totalRowsCount === 0) continue;
const fileName = `${tableName}.jsonl`;
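// Append a PassThrough stream as the archive entry so rows are written as they are read,
// without buffering the whole table in memory.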
const stream = new PassThrough();
archive.append(stream, { name: fileName });
let cursor = 0;
const batchSize = 10;
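// Read the table in pages of `batchSize` rows (LIMIT offset, count) instead of one big SELECT.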
while (cursor < totalRowsCount) {
const rows = await this.dataSource.query(
`SELECT * from ${tableName} LIMIT ${cursor}, ${batchSize}`,
);
for (const row of rows) {
// Our sqlite setup has some quirks. The following code normalizes the exported data so that it can be imported into a new postgres or sqlite database.
if (this.globalConfig.database.type === 'sqlite') {
for (const { type: columnType, propertyName } of columns) {
if (propertyName in row) {
// Our sqlite setup used `simple-json` for JSON columns, which is stored as strings.
// This is because when we wrote this code, sqlite did not support native JSON column types.
if (columnType === jsonColumnType) {
row[propertyName] = JSON.parse(row[propertyName]);
}
// Sqlite does not have a separate Boolean data type, and uses integers 0/1 to mark values as boolean
else if (columnType === Boolean) {
row[propertyName] = Boolean(row[propertyName]);
}
}
}
}
stream.write(JSON.stringify(row));
stream.write('\n');
}
cursor += batchSize;
}
stream.end();
}
// Add this hidden file to store the last migration.
// This is used during import to ensure that the importing DB schema is up to date
archive.append(Buffer.from(await this.getLastMigrationName(), 'utf8'), {
name: '.lastMigration',
});
await archive.finalize();
}
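/**
 * Restore a backup created by `createBackup`: extract the archive, verify that the backup
 * was taken on the same migration as the current database, then re-import each table's JSONL file.
 */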
async importBackup(archivePath: string) {
if (!existsSync(archivePath)) {
throw new ApplicationError('Backup archive not found. Please check the path.');
}
// TODO: instead of extracting to the filesystem, stream the files directly
const backupPath = '/tmp/backup';
await pipeline(createReadStream(archivePath), Extract({ path: backupPath }));
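// The archive stores the name of the last executed migration in a hidden `.lastMigration` file.
// Refuse to import when it does not match the current database, since the schemas would differ.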
const lastMigrationInBackup = await readFile(join(backupPath, '.lastMigration'), 'utf8');
const lastMigrationInDb = await this.getLastMigrationName();
if (lastMigrationInBackup !== lastMigrationInDb) {
throw new ApplicationError('Last migrations differ; make sure to use the same n8n version');
}
// (2. if clean truncate)
// (2. if no clean, check if tables are empty)
// 3. disable foreign keys
// 4. import each jsonl
for (const { name, target } of this.tables) {
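// Clear out any existing rows before restoring this table from the backup.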
const repo = this.dataSource.getRepository(target);
await repo.delete({});
const filePath = join(backupPath, `${name}.jsonl`);
if (!existsSync(filePath)) continue;
const fileStream = createReadStream(filePath);
const lineStream = createInterface({ input: fileStream });
for await (const line of lineStream) {
// TODO: insert in batches to reduce DB load
await repo.insert(JSON.parse(line));
}
fileStream.close();
}
// 5. enable foreign keys
}
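/** Name of the most recently executed migration in the connected database */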
async getLastMigrationName() {
const migrationExecutor = new MigrationExecutor(this.dataSource);
const executedMigrations = await migrationExecutor.getExecutedMigrations();
return executedMigrations.at(0)!.name;
}
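/** Entity metadata (name, columns, target) for every table included in the backup */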
get tables() {
return this.dataSource.entityMetadatas
.filter((v) => !excludeList.includes(v.tableName))
.map(({ tableName, columns, target }) => ({ name: tableName, columns, target }));
}
}
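
For reference, a minimal usage sketch (not part of this commit) of the extracted service. The container lookup mirrors what the two commands above now do; the two wrapper functions and the archive path are illustrative only:

import Container from 'typedi';
import { BackupService } from '@/services/backup.service';

// Export: resolve the service from the typedi container (GlobalConfig and DataSource are injected)
// and write the archive. createBackup throws an ApplicationError rather than overwrite an existing file.
async function backupToZip(zipPath: string) {
  const backupService = Container.get(BackupService);
  await backupService.createBackup(zipPath);
}

// Import: importBackup verifies the `.lastMigration` marker before truncating and re-importing tables.
async function restoreFromZip(zipPath: string) {
  const backupService = Container.get(BackupService);
  await backupService.importBackup(zipPath);
}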