mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
🐛 Fix bug that workflows did not get activated correctly
This commit is contained in:
parent
10cfe70d33
commit
22c40d026b
|
@ -52,6 +52,9 @@ export class ActiveWorkflowRunner {
|
||||||
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
|
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
|
||||||
const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];
|
const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];
|
||||||
|
|
||||||
|
// Clear up active workflow table
|
||||||
|
await Db.collections.Webhook?.clear();
|
||||||
|
|
||||||
this.activeWorkflows = new ActiveWorkflows();
|
this.activeWorkflows = new ActiveWorkflows();
|
||||||
|
|
||||||
if (workflowsData.length !== 0) {
|
if (workflowsData.length !== 0) {
|
||||||
|
@ -59,22 +62,14 @@ export class ActiveWorkflowRunner {
|
||||||
console.log(' Start Active Workflows:');
|
console.log(' Start Active Workflows:');
|
||||||
console.log(' ================================');
|
console.log(' ================================');
|
||||||
|
|
||||||
const nodeTypes = NodeTypes();
|
|
||||||
|
|
||||||
for (const workflowData of workflowsData) {
|
for (const workflowData of workflowsData) {
|
||||||
|
console.log(` - ${workflowData.name}`);
|
||||||
const workflow = new Workflow({ id: workflowData.id.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
|
try {
|
||||||
|
await this.add(workflowData.id.toString(), workflowData);
|
||||||
if (workflow.getTriggerNodes().length !== 0
|
console.log(` => Started`);
|
||||||
|| workflow.getPollNodes().length !== 0) {
|
} catch (error) {
|
||||||
console.log(` - ${workflowData.name}`);
|
console.log(` => ERROR: Workflow could not be activated:`);
|
||||||
try {
|
console.log(` ${error.message}`);
|
||||||
await this.add(workflowData.id.toString(), workflowData);
|
|
||||||
console.log(` => Started`);
|
|
||||||
} catch (error) {
|
|
||||||
console.log(` => ERROR: Workflow could not be activated:`);
|
|
||||||
console.log(` ${error.message}`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,14 +82,18 @@ export class ActiveWorkflowRunner {
|
||||||
* @memberof ActiveWorkflowRunner
|
* @memberof ActiveWorkflowRunner
|
||||||
*/
|
*/
|
||||||
async removeAll(): Promise<void> {
|
async removeAll(): Promise<void> {
|
||||||
if (this.activeWorkflows === null) {
|
const activeWorkflowId: string[] = [];
|
||||||
return;
|
|
||||||
|
if (this.activeWorkflows !== null) {
|
||||||
|
// TODO: This should be renamed!
|
||||||
|
activeWorkflowId.push.apply(activeWorkflowId, this.activeWorkflows.allActiveWorkflows());
|
||||||
}
|
}
|
||||||
|
|
||||||
const activeWorkflows = this.activeWorkflows.allActiveWorkflows();
|
const activeWorkflows = await this.getActiveWorkflows();
|
||||||
|
activeWorkflowId.push.apply(activeWorkflowId, activeWorkflows.map(workflow => workflow.id));
|
||||||
|
|
||||||
const removePromises = [];
|
const removePromises = [];
|
||||||
for (const workflowId of activeWorkflows) {
|
for (const workflowId of activeWorkflowId) {
|
||||||
removePromises.push(this.remove(workflowId));
|
removePromises.push(this.remove(workflowId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,7 +182,7 @@ export class ActiveWorkflowRunner {
|
||||||
* @memberof ActiveWorkflowRunner
|
* @memberof ActiveWorkflowRunner
|
||||||
*/
|
*/
|
||||||
getActiveWorkflows(): Promise<IWorkflowDb[]> {
|
getActiveWorkflows(): Promise<IWorkflowDb[]> {
|
||||||
return Db.collections.Workflow?.find({ select: ['id'] }) as Promise<IWorkflowDb[]>;
|
return Db.collections.Workflow?.find({ where: { active: true }, select: ['id'] }) as Promise<IWorkflowDb[]>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,20 +2,6 @@ import {
|
||||||
MigrationInterface,
|
MigrationInterface,
|
||||||
} from 'typeorm';
|
} from 'typeorm';
|
||||||
|
|
||||||
import {
|
|
||||||
IWorkflowDb,
|
|
||||||
NodeTypes,
|
|
||||||
WebhookHelpers,
|
|
||||||
} from '../../..';
|
|
||||||
|
|
||||||
import {
|
|
||||||
Workflow,
|
|
||||||
} from 'n8n-workflow/dist/src/Workflow';
|
|
||||||
|
|
||||||
import {
|
|
||||||
IWebhookDb,
|
|
||||||
} from '../../../Interfaces';
|
|
||||||
|
|
||||||
import * as config from '../../../../config';
|
import * as config from '../../../../config';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
@ -27,26 +13,6 @@ export class WebhookModel1592679094242 implements MigrationInterface {
|
||||||
|
|
||||||
async up(queryRunner: MongoQueryRunner): Promise<void> {
|
async up(queryRunner: MongoQueryRunner): Promise<void> {
|
||||||
const tablePrefix = config.get('database.tablePrefix');
|
const tablePrefix = config.get('database.tablePrefix');
|
||||||
const workflows = await queryRunner.cursor( `${tablePrefix}workflow_entity`, { active: true }).toArray() as IWorkflowDb[];
|
|
||||||
const data: IWebhookDb[] = [];
|
|
||||||
const nodeTypes = NodeTypes();
|
|
||||||
for (const workflow of workflows) {
|
|
||||||
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
|
||||||
for (const webhook of webhooks) {
|
|
||||||
data.push({
|
|
||||||
workflowId: workflowInstance.id as string,
|
|
||||||
webhookPath: webhook.path,
|
|
||||||
method: webhook.httpMethod,
|
|
||||||
node: webhook.node,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length !== 0) {
|
|
||||||
await queryRunner.manager.insertMany(`${tablePrefix}webhook_entity`, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false });
|
await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,20 +5,6 @@ import {
|
||||||
|
|
||||||
import * as config from '../../../../config';
|
import * as config from '../../../../config';
|
||||||
|
|
||||||
import {
|
|
||||||
IWorkflowDb,
|
|
||||||
NodeTypes,
|
|
||||||
WebhookHelpers,
|
|
||||||
} from '../../..';
|
|
||||||
|
|
||||||
import {
|
|
||||||
Workflow,
|
|
||||||
} from 'n8n-workflow';
|
|
||||||
|
|
||||||
import {
|
|
||||||
IWebhookDb,
|
|
||||||
} from '../../../Interfaces';
|
|
||||||
|
|
||||||
export class WebhookModel1592447867632 implements MigrationInterface {
|
export class WebhookModel1592447867632 implements MigrationInterface {
|
||||||
name = 'WebhookModel1592447867632';
|
name = 'WebhookModel1592447867632';
|
||||||
|
|
||||||
|
@ -26,30 +12,6 @@ export class WebhookModel1592447867632 implements MigrationInterface {
|
||||||
const tablePrefix = config.get('database.tablePrefix');
|
const tablePrefix = config.get('database.tablePrefix');
|
||||||
|
|
||||||
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity (workflowId int NOT NULL, webhookPath varchar(255) NOT NULL, method varchar(255) NOT NULL, node varchar(255) NOT NULL, PRIMARY KEY (webhookPath, method)) ENGINE=InnoDB`);
|
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity (workflowId int NOT NULL, webhookPath varchar(255) NOT NULL, method varchar(255) NOT NULL, node varchar(255) NOT NULL, PRIMARY KEY (webhookPath, method)) ENGINE=InnoDB`);
|
||||||
|
|
||||||
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
|
|
||||||
const data: IWebhookDb[] = [];
|
|
||||||
const nodeTypes = NodeTypes();
|
|
||||||
for (const workflow of workflows) {
|
|
||||||
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
|
||||||
for (const webhook of webhooks) {
|
|
||||||
data.push({
|
|
||||||
workflowId: workflowInstance.id as string,
|
|
||||||
webhookPath: webhook.path,
|
|
||||||
method: webhook.httpMethod,
|
|
||||||
node: webhook.node,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length !== 0) {
|
|
||||||
await queryRunner.manager.createQueryBuilder()
|
|
||||||
.insert()
|
|
||||||
.into(`${tablePrefix}webhook_entity`)
|
|
||||||
.values(data)
|
|
||||||
.execute();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async down(queryRunner: QueryRunner): Promise<void> {
|
async down(queryRunner: QueryRunner): Promise<void> {
|
||||||
|
|
|
@ -3,20 +3,6 @@ import {
|
||||||
QueryRunner,
|
QueryRunner,
|
||||||
} from 'typeorm';
|
} from 'typeorm';
|
||||||
|
|
||||||
import {
|
|
||||||
IWorkflowDb,
|
|
||||||
NodeTypes,
|
|
||||||
WebhookHelpers,
|
|
||||||
} from '../../..';
|
|
||||||
|
|
||||||
import {
|
|
||||||
Workflow,
|
|
||||||
} from 'n8n-workflow';
|
|
||||||
|
|
||||||
import {
|
|
||||||
IWebhookDb,
|
|
||||||
} from '../../../Interfaces';
|
|
||||||
|
|
||||||
import * as config from '../../../../config';
|
import * as config from '../../../../config';
|
||||||
|
|
||||||
export class WebhookModel1589476000887 implements MigrationInterface {
|
export class WebhookModel1589476000887 implements MigrationInterface {
|
||||||
|
@ -31,30 +17,6 @@ export class WebhookModel1589476000887 implements MigrationInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
|
await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
|
||||||
|
|
||||||
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
|
|
||||||
const data: IWebhookDb[] = [];
|
|
||||||
const nodeTypes = NodeTypes();
|
|
||||||
for (const workflow of workflows) {
|
|
||||||
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
|
||||||
for (const webhook of webhooks) {
|
|
||||||
data.push({
|
|
||||||
workflowId: workflowInstance.id as string,
|
|
||||||
webhookPath: webhook.path,
|
|
||||||
method: webhook.httpMethod,
|
|
||||||
node: webhook.node,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length !== 0) {
|
|
||||||
await queryRunner.manager.createQueryBuilder()
|
|
||||||
.insert()
|
|
||||||
.into(`${tablePrefix}webhook_entity`)
|
|
||||||
.values(data)
|
|
||||||
.execute();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async down(queryRunner: QueryRunner): Promise<void> {
|
async down(queryRunner: QueryRunner): Promise<void> {
|
||||||
|
|
|
@ -5,20 +5,6 @@ import {
|
||||||
|
|
||||||
import * as config from '../../../../config';
|
import * as config from '../../../../config';
|
||||||
|
|
||||||
import {
|
|
||||||
IWorkflowDb,
|
|
||||||
NodeTypes,
|
|
||||||
WebhookHelpers,
|
|
||||||
} from '../../..';
|
|
||||||
|
|
||||||
import {
|
|
||||||
Workflow,
|
|
||||||
} from 'n8n-workflow';
|
|
||||||
|
|
||||||
import {
|
|
||||||
IWebhookDb,
|
|
||||||
} from '../../../Interfaces';
|
|
||||||
|
|
||||||
export class WebhookModel1592445003908 implements MigrationInterface {
|
export class WebhookModel1592445003908 implements MigrationInterface {
|
||||||
name = 'WebhookModel1592445003908';
|
name = 'WebhookModel1592445003908';
|
||||||
|
|
||||||
|
@ -26,34 +12,6 @@ export class WebhookModel1592445003908 implements MigrationInterface {
|
||||||
const tablePrefix = config.get('database.tablePrefix');
|
const tablePrefix = config.get('database.tablePrefix');
|
||||||
|
|
||||||
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`);
|
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`);
|
||||||
|
|
||||||
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
|
|
||||||
const data: IWebhookDb[] = [];
|
|
||||||
const nodeTypes = NodeTypes();
|
|
||||||
for (const workflow of workflows) {
|
|
||||||
workflow.nodes = JSON.parse(workflow.nodes as unknown as string);
|
|
||||||
workflow.connections = JSON.parse(workflow.connections as unknown as string);
|
|
||||||
workflow.staticData = JSON.parse(workflow.staticData as unknown as string);
|
|
||||||
workflow.settings = JSON.parse(workflow.settings as unknown as string);
|
|
||||||
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
|
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
|
|
||||||
for (const webhook of webhooks) {
|
|
||||||
data.push({
|
|
||||||
workflowId: workflowInstance.id as string,
|
|
||||||
webhookPath: webhook.path,
|
|
||||||
method: webhook.httpMethod,
|
|
||||||
node: webhook.node,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length !== 0) {
|
|
||||||
await queryRunner.manager.createQueryBuilder()
|
|
||||||
.insert()
|
|
||||||
.into(`${tablePrefix}webhook_entity`)
|
|
||||||
.values(data)
|
|
||||||
.execute();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async down(queryRunner: QueryRunner): Promise<void> {
|
async down(queryRunner: QueryRunner): Promise<void> {
|
||||||
|
|
Loading…
Reference in a new issue