Danny Martini 2024-09-19 15:27:44 -04:00 committed by GitHub
commit 1ff2174607
3 changed files with 217 additions and 0 deletions

@@ -0,0 +1,84 @@
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import * as assert from 'assert/strict';
import fs from 'fs';
import { join } from 'path';
import Container from 'typedi';

import { jsonColumnType } from '@/databases/entities/abstract-entity';
import { BaseCommand } from '../base-command';

export class ExportAllCommand extends BaseCommand {
	static description = 'Export Everything';

	static examples = ['$ n8n export:all'];

	// TODO: add `exportPath` flag
	static flags = {};

	async run() {
		const connection = Container.get(DataSource);

		// execution-related data is intentionally left out of the backup
		const excludeList = [
			'execution_annotation_tags',
			'execution_annotations',
			'execution_data',
			'execution_entity',
			'execution_metadata',
			'annotation_tag_entity',
		];

		const tables = connection.entityMetadatas
			// filter on `tableName`, not the entity name, so the entries in
			// `excludeList` actually match
			.filter((v) => !excludeList.includes(v.tableName))
			.map((v) => ({
				name: v.tableName,
				columns: v.columns,
			}));

		const backupPath = '/tmp/backup';
		await fs.promises.mkdir(backupPath, { recursive: true });

		for (const { name, columns } of tables) {
			// TODO: implement batching
			const rows = await connection.query(`SELECT * FROM ${name}`);
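			// batching sketch (assumption, not implemented here): page through
			// large tables instead of loading them whole, e.g.
			//   SELECT * FROM ${name} LIMIT <batchSize> OFFSET <offset>
			// looping until a page comes back with fewer than <batchSize> rows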
			const stream = fs.createWriteStream(join(backupPath, `${name}.jsonl`));

			for (const row of rows) {
				// TODO: fix the types
				// TODO: only do this for sqlite (e.g. when
				// `connection.options.type === 'sqlite'`): sqlite hands JSON
				// columns back as strings, so parse them before serializing
				// the row to JSONL
				for (const column of columns) {
					const value = row[column.databaseName];
					if (column.type === jsonColumnType && typeof value === 'string') {
						row[column.databaseName] = JSON.parse(value);
					}
				}

				stream.write(JSON.stringify(row));
				stream.write('\n');
			}

			stream.end();
		}

		// record the schema version of the backup, so the import can refuse
		// to run against a different n8n version
		const migrationExecutor = new MigrationExecutor(connection);
		const executedMigrations = await migrationExecutor.getExecutedMigrations();
		const lastExecutedMigration = executedMigrations.at(0);

		assert.ok(lastExecutedMigration, 'should have been run by db.ts');

		await fs.promises.writeFile(
			join(backupPath, 'lastMigration'),
			lastExecutedMigration.name,
			'utf8',
		);
	}

	async catch(error: Error) {
		this.logger.error('Error exporting data. See log messages for details.');
		this.logger.error(error.message);
	}
}

@@ -0,0 +1,94 @@
import { DataSource, MigrationExecutor } from '@n8n/typeorm';
import * as assert from 'assert/strict';
import fs from 'fs';
import { ApplicationError } from 'n8n-workflow';
import { join } from 'path';
import Container from 'typedi';

import { BaseCommand } from '../base-command';

// TODO: stream the import instead of reading each file into memory at once.
// Rough sketch with `readline` (would also need `import readline from 'readline';`):
//
//   const lineStream = readline.createInterface({
//     input: fs.createReadStream(join(backupPath, `${name}.jsonl`)),
//     crlfDelay: Infinity,
//   });
//
//   for await (const line of lineStream) {
//     await repo.insert(JSON.parse(line));
//   }

export class ImportAllCommand extends BaseCommand {
	static description = 'Import Everything';

	static examples = ['$ n8n import:all'];

	// TODO: add `importPath` flag
	// TODO: add `clean` flag
	static flags = {};

	// TODO: do batching
	async run() {
		// 1. refuse to import a backup made by a different schema version:
		//    compare the last executed migration against the name recorded in
		//    the backup's `lastMigration` file
		const connection = Container.get(DataSource);
		const migrationExecutor = new MigrationExecutor(connection);
		const executedMigrations = await migrationExecutor.getExecutedMigrations();
		const lastExecutedMigration = executedMigrations.at(0);

		assert.ok(lastExecutedMigration, 'should have been run by db.ts');

		const backupPath = '/tmp/backup';
		const lastMigrationInBackup = (
			await fs.promises.readFile(join(backupPath, 'lastMigration'), 'utf8')
		).trim();

		if (lastMigrationInBackup !== lastExecutedMigration.name) {
			throw new ApplicationError('Last migrations differ, make sure to use the same n8n version');
		}

		// 2. if `clean` is set, truncate all tables; otherwise check that they are empty
		// 3. disable foreign keys
		// 4. import each jsonl
		const excludeList = [
			'execution_annotation_tags',
			'execution_annotations',
			'execution_data',
			'execution_entity',
			'execution_metadata',
			'annotation_tag_entity',
		];

		const tables = connection.entityMetadatas
			.filter((v) => !excludeList.includes(v.tableName))
			.map((v) => ({ name: v.tableName, target: v.target }));

		for (const { name, target } of tables) {
			const repo = connection.getRepository(target);
			// an empty criteria object deletes every row in the table
			await repo.delete({});

			const rows = (await fs.promises.readFile(`${join(backupPath, name)}.jsonl`, 'utf8'))
				.split('\n')
				.filter((row) => row !== '');

			for (const row of rows) {
				await repo.insert(JSON.parse(row));
			}
		}

		// 5. enable foreign keys
	}
	async catch(error: Error) {
		this.logger.error('Error importing data. See log messages for details.');
		this.logger.error(error.message);
		if (error.stack) this.logger.error(error.stack);
	}
}

tasks.md Normal file

@@ -0,0 +1,39 @@
# Goals

* export all data (except execution-related data) into a file
* import from said file
* support flag `clean` (default: `false`); intended usage sketched below
  * `false`: commence only if the DB is empty, otherwise print an error
  * `true`: truncate all tables, then commence
* foreign keys are retained
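
The intended invocations, as a sketch (the `clean` flag does not exist yet, so its exact spelling is an assumption):

```sh
# export everything except execution data (currently written to /tmp/backup)
n8n export:all

# planned: wipe the existing tables, then import from the backup
n8n import:all --clean
```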

# Future Goals

* make the export atomic
  * for now, users have to shut down all n8n instances while exporting and importing
* keep foreign keys enabled while importing
  * for now, we disable them before the import and re-enable them afterwards (see the sketch below)
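
A sketch of what the per-driver toggling could look like. The helper name and the driver detection are assumptions based on common SQLite/Postgres practice, not code from this commit (the Postgres variant also needs sufficient privileges):

```ts
import { DataSource } from '@n8n/typeorm';

// hypothetical helper: toggle FK enforcement around the import, per driver
async function setForeignKeyChecks(connection: DataSource, enabled: boolean) {
	if (connection.options.type === 'sqlite') {
		// sqlite toggles enforcement per connection
		await connection.query(`PRAGMA foreign_keys = ${enabled ? 'ON' : 'OFF'}`);
	} else if (connection.options.type === 'postgres') {
		// 'replica' skips FK triggers for this session, 'origin' restores them
		await connection.query(`SET session_replication_role = '${enabled ? 'origin' : 'replica'}'`);
	}
}
```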

# File Format

* 1 file per table (see the example layout below)
  * 1 line per row
  * format: JSONL
* metadata file
  * contains the name of the last migration that was run
* tarred/zipped in the end
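
For illustration, assuming the hard-coded `/tmp/backup` path from the current code, a backup would look roughly like this (the table names follow the actual schema):

```
/tmp/backup/
├── lastMigration            # name of the last executed migration
├── user.jsonl               # one JSON object per row
├── workflow_entity.jsonl
└── ...
```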

# Tasks

* serialize exactly the entity's columns (and all of them)
  * i.e. ignore virtual properties, but include normally hidden columns such as passwords
* make sure sequence counters are updated after the import (see the sketch below)
* only allow importing into sqlite and pg
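
A sketch of the Postgres side of the sequence fix; `pg_get_serial_sequence`, the helper name, and the `id` column are assumptions, not code from this commit:

```ts
import { DataSource } from '@n8n/typeorm';

// hypothetical helper: bump a table's id sequence past the highest imported
// id so that rows inserted after the restore don't collide
async function resetSequence(connection: DataSource, tableName: string) {
	await connection.query(
		`SELECT setval(
			pg_get_serial_sequence('${tableName}', 'id'),
			(SELECT COALESCE(MAX(id), 1) FROM "${tableName}")
		)`,
	);
}
```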

# Questions

* what if one record fails to import?
  * skip it, or abort the whole run?