mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
⚡ Add mongodb migration
This commit is contained in:
parent
84c4b32261
commit
3ca9647215
|
@ -35,7 +35,6 @@ import {
|
||||||
|
|
||||||
import * as express from 'express';
|
import * as express from 'express';
|
||||||
|
|
||||||
|
|
||||||
export class ActiveWorkflowRunner {
|
export class ActiveWorkflowRunner {
|
||||||
private activeWorkflows: ActiveWorkflows | null = null;
|
private activeWorkflows: ActiveWorkflows | null = null;
|
||||||
|
|
||||||
|
@ -240,6 +239,7 @@ export class ActiveWorkflowRunner {
|
||||||
} as IWebhookDb;
|
} as IWebhookDb;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
await Db.collections.Webhook?.insert(webhook);
|
await Db.collections.Webhook?.insert(webhook);
|
||||||
|
|
||||||
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
|
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
|
||||||
|
@ -257,7 +257,7 @@ export class ActiveWorkflowRunner {
|
||||||
// if it's a workflow from the the insert
|
// if it's a workflow from the the insert
|
||||||
// TODO check if there is standard error code for deplicate key violation that works
|
// TODO check if there is standard error code for deplicate key violation that works
|
||||||
// with all databases
|
// with all databases
|
||||||
if (error.name === 'QueryFailedError') {
|
if (error.name === 'MongoError' || error.name === 'QueryFailedError') {
|
||||||
|
|
||||||
errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`;
|
errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`;
|
||||||
|
|
||||||
|
@ -303,6 +303,11 @@ export class ActiveWorkflowRunner {
|
||||||
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
|
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if it's a mongo objectId convert it to string
|
||||||
|
if (typeof workflowData.id === 'object') {
|
||||||
|
workflowData.id = workflowData.id.toString();
|
||||||
|
}
|
||||||
|
|
||||||
const webhook = {
|
const webhook = {
|
||||||
workflowId: workflowData.id,
|
workflowId: workflowData.id,
|
||||||
} as IWebhookDb;
|
} as IWebhookDb;
|
||||||
|
@ -310,7 +315,6 @@ export class ActiveWorkflowRunner {
|
||||||
await Db.collections.Webhook?.delete(webhook);
|
await Db.collections.Webhook?.delete(webhook);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the given workflow
|
* Runs the given workflow
|
||||||
*
|
*
|
||||||
|
|
|
@ -36,7 +36,8 @@ import {
|
||||||
} from './databases/postgresdb/migrations';
|
} from './databases/postgresdb/migrations';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
InitialMigration1587563438936
|
InitialMigration1587563438936,
|
||||||
|
WebhookModel1592679094242,
|
||||||
} from './databases/mongodb/migrations';
|
} from './databases/mongodb/migrations';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
@ -68,7 +69,7 @@ export async function init(): Promise<IDatabaseCollections> {
|
||||||
entityPrefix,
|
entityPrefix,
|
||||||
url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string,
|
url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string,
|
||||||
useNewUrlParser: true,
|
useNewUrlParser: true,
|
||||||
migrations: [InitialMigration1587563438936],
|
migrations: [InitialMigration1587563438936, WebhookModel1592679094242],
|
||||||
migrationsRun: true,
|
migrationsRun: true,
|
||||||
migrationsTableName: `${entityPrefix}migrations`,
|
migrationsTableName: `${entityPrefix}migrations`,
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,6 +2,8 @@ import {
|
||||||
Column,
|
Column,
|
||||||
Entity,
|
Entity,
|
||||||
Index,
|
Index,
|
||||||
|
ObjectID,
|
||||||
|
ObjectIdColumn,
|
||||||
} from 'typeorm';
|
} from 'typeorm';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
@ -9,9 +11,11 @@ import {
|
||||||
} from '../../Interfaces';
|
} from '../../Interfaces';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@Index(['webhookPath', 'method'], { unique: true })
|
|
||||||
export class WebhookEntity implements IWebhookDb {
|
export class WebhookEntity implements IWebhookDb {
|
||||||
|
|
||||||
|
@ObjectIdColumn()
|
||||||
|
id: ObjectID;
|
||||||
|
|
||||||
@Column()
|
@Column()
|
||||||
workflowId: number;
|
workflowId: number;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
import {
|
||||||
|
MigrationInterface,
|
||||||
|
} from 'typeorm';
|
||||||
|
|
||||||
|
import {
|
||||||
|
IWorkflowDb,
|
||||||
|
NodeTypes,
|
||||||
|
WebhookHelpers,
|
||||||
|
} from '../../..';
|
||||||
|
|
||||||
|
import {
|
||||||
|
Workflow,
|
||||||
|
} from 'n8n-workflow/dist/src/Workflow';
|
||||||
|
|
||||||
|
import {
|
||||||
|
IWebhookDb,
|
||||||
|
} from '../../../Interfaces';
|
||||||
|
|
||||||
|
import * as config from '../../../../config';
|
||||||
|
|
||||||
|
import {
|
||||||
|
MongoQueryRunner,
|
||||||
|
} from 'typeorm/driver/mongodb/MongoQueryRunner';
|
||||||
|
|
||||||
|
export class WebhookModel1592679094242 implements MigrationInterface {
|
||||||
|
name = 'WebhookModel1592679094242';
|
||||||
|
|
||||||
|
async up(queryRunner: MongoQueryRunner): Promise<void> {
|
||||||
|
const tablePrefix = config.get('database.tablePrefix');
|
||||||
|
const workflows = await queryRunner.cursor( `${tablePrefix}workflow_entity`, { active: true }).toArray() as IWorkflowDb[];
|
||||||
|
const data: IWebhookDb[] = [];
|
||||||
|
const nodeTypes = NodeTypes();
|
||||||
|
for (const workflow of workflows) {
|
||||||
|
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
||||||
|
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
||||||
|
for (const webhook of webhooks) {
|
||||||
|
data.push({
|
||||||
|
workflowId: workflowInstance.id as string,
|
||||||
|
webhookPath: webhook.path,
|
||||||
|
method: webhook.httpMethod,
|
||||||
|
node: webhook.node,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data.length !== 0) {
|
||||||
|
await queryRunner.manager.insertMany(`${tablePrefix}webhook_entity`, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false });
|
||||||
|
}
|
||||||
|
|
||||||
|
async down(queryRunner: MongoQueryRunner): Promise<void> {
|
||||||
|
const tablePrefix = config.get('database.tablePrefix');
|
||||||
|
await queryRunner.dropTable(`${tablePrefix}webhook_entity`);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1 +1,2 @@
|
||||||
export * from './1587563438936-InitialMigration';
|
export * from './1587563438936-InitialMigration';
|
||||||
|
export * from './1592679094242-WebhookModel';
|
||||||
|
|
|
@ -36,11 +36,6 @@ export class WebhookModel1589476000887 implements MigrationInterface {
|
||||||
const nodeTypes = NodeTypes();
|
const nodeTypes = NodeTypes();
|
||||||
for (const workflow of workflows) {
|
for (const workflow of workflows) {
|
||||||
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
||||||
// I'm writing something more simple than this. I tried to use the built in method
|
|
||||||
// getWorkflowWebhooks but it needs additionaldata and to get it I need the credentials
|
|
||||||
// and for some reason when I use
|
|
||||||
// const credentials = await WorkflowCredentials(node);
|
|
||||||
// to get the credentials I got an error I think is cuz the database is yet not ready.
|
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
||||||
for (const webhook of webhooks) {
|
for (const webhook of webhooks) {
|
||||||
data.push({
|
data.push({
|
||||||
|
|
Loading…
Reference in a new issue