From 491378de772670b640be23e3b9bc7eadad6d3ebe Mon Sep 17 00:00:00 2001 From: Michael Kret <88898367+michael-radency@users.noreply.github.com> Date: Tue, 1 Aug 2023 12:56:54 +0300 Subject: [PATCH] fix(Postgres Trigger Node): Imposible to cancell execution manually (#6709) --- .../Postgres/PostgresTrigger.functions.ts | 98 +++++++------- .../nodes/Postgres/PostgresTrigger.node.ts | 120 ++++++++++++++---- 2 files changed, 146 insertions(+), 72 deletions(-) diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts index 67ccfe1d92..9d6ea69c78 100644 --- a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts @@ -8,50 +8,79 @@ import type { import pgPromise from 'pg-promise'; import type pg from 'pg-promise/typescript/pg-subset'; +export function prepareNames(id: string, mode: string, additionalFields: IDataObject) { + let suffix = id.replace(/-/g, '_'); + + if (mode === 'manual') { + suffix = `${suffix}_manual`; + } + + let functionName = + (additionalFields.functionName as string) || `n8n_trigger_function_${suffix}()`; + + if (!(functionName.includes('(') && functionName.includes(')'))) { + functionName = `${functionName}()`; + } + + const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${suffix}`; + const channelName = (additionalFields.channelName as string) || `n8n_channel_${suffix}`; + + if (channelName.includes('-')) { + throw new Error('Channel name cannot contain hyphens (-)'); + } + + return { functionName, triggerName, channelName }; +} + export async function pgTriggerFunction( this: ITriggerFunctions, db: pgPromise.IDatabase<{}, pg.IClient>, + additionalFields: IDataObject, + functionName: string, + triggerName: string, + channelName: string, ): Promise { const schema = this.getNodeParameter('schema', 'public', { extractValue: true }) as string; const tableName = this.getNodeParameter('tableName', undefined, { extractValue: true, }) as string; + const target = `${schema}."${tableName}"`; + const firesOn = this.getNodeParameter('firesOn', 0) as string; + const functionReplace = "CREATE OR REPLACE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$;"; + const dropIfExist = 'DROP TRIGGER IF EXISTS $1:raw ON $2:raw'; + const functionExists = "CREATE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$"; + const trigger = 'CREATE TRIGGER $4:raw AFTER $3:raw ON $1:raw FOR EACH ROW EXECUTE FUNCTION $2:raw'; + const whichData = firesOn === 'DELETE' ? 'old' : 'new'; - const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; - const nodeId = this.getNode().id.replace(/-/g, '_'); - let functionName = - (additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; - if (!functionName.includes('()')) { - functionName = functionName.concat('()'); - } - const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; - const channelName = (additionalFields.channelName as string) || `n8n_channel_${nodeId}`; + if (channelName.includes('-')) { throw new Error('Channel name cannot contain hyphens (-)'); } - const replaceIfExists = additionalFields.replaceIfExists || false; + + const replaceIfExists = additionalFields.replaceIfExists ?? false; + try { - if (replaceIfExists || !(additionalFields.triggerName || additionalFields.functionName)) { + if (replaceIfExists || !(additionalFields.triggerName ?? additionalFields.functionName)) { await db.any(functionReplace, [functionName, channelName, whichData]); await db.any(dropIfExist, [triggerName, target, whichData]); } else { await db.any(functionExists, [functionName, channelName, whichData]); } await db.any(trigger, [target, functionName, firesOn, triggerName]); - } catch (err) { - if (err.message.includes('near "-"')) { + } catch (error) { + if ((error as Error).message.includes('near "-"')) { throw new Error('Names cannot contain hyphens (-)'); } - throw new Error(err as string); + throw error; } } @@ -77,23 +106,25 @@ export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) { config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); config.sslmode = (credentials.ssl as string) || 'disable'; } - return pgp(config); + + const db = pgp(config); + return { db, pgp }; } export async function searchSchema(this: ILoadOptionsFunctions): Promise { - const db = await initDB.call(this); + const { db, pgp } = await initDB.call(this); const schemaList = await db.any('SELECT schema_name FROM information_schema.schemata'); - const results: INodeListSearchItems[] = schemaList.map((s) => ({ + const results: INodeListSearchItems[] = (schemaList as IDataObject[]).map((s) => ({ name: s.schema_name as string, value: s.schema_name as string, })); - await db.$pool.end(); + pgp.end(); return { results }; } export async function searchTables(this: ILoadOptionsFunctions): Promise { const schema = this.getNodeParameter('schema', 0) as IDataObject; - const db = await initDB.call(this); + const { db, pgp } = await initDB.call(this); let tableList = []; try { tableList = await db.any( @@ -103,35 +134,10 @@ export async function searchTables(this: ILoadOptionsFunctions): Promise ({ + const results: INodeListSearchItems[] = (tableList as IDataObject[]).map((s) => ({ name: s.table_name as string, value: s.table_name as string, })); - await db.$pool.end(); + pgp.end(); return { results }; } - -export async function dropTriggerFunction( - this: ITriggerFunctions, - db: pgPromise.IDatabase<{}, pg.IClient>, -): Promise { - const schema = this.getNodeParameter('schema', undefined, { extractValue: true }) as string; - const tableName = this.getNodeParameter('tableName', undefined, { - extractValue: true, - }) as string; - const target = `${schema}."${tableName}"`; - const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; - const nodeId = this.getNode().id.replace(/-/g, '_'); - let functionName = - (additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; - if (!functionName.includes('()')) { - functionName = functionName.concat('()'); - } - const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; - try { - await db.any('DROP FUNCTION IF EXISTS $1:raw CASCADE', [functionName]); - await db.any('DROP TRIGGER IF EXISTS $1:raw ON $2:raw CASCADE', [triggerName, target]); - } catch (error) { - throw new Error(error as string); - } -} diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts index fd1e1c7a0b..1938be39ef 100644 --- a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts @@ -1,16 +1,17 @@ -import type { - IDataObject, - INodeType, - INodeTypeDescription, - ITriggerFunctions, - ITriggerResponse, +import { + NodeOperationError, + type IDataObject, + type INodeType, + type INodeTypeDescription, + type ITriggerFunctions, + type ITriggerResponse, } from 'n8n-workflow'; import { - dropTriggerFunction, pgTriggerFunction, initDB, searchSchema, searchTables, + prepareNames, } from './PostgresTrigger.functions'; export class PostgresTrigger implements INodeType { @@ -34,7 +35,7 @@ export class PostgresTrigger implements INodeType { "While building your workflow, click the 'listen' button, then trigger a Postgres event. This will trigger an execution, which will show up in this editor.

Your workflow will also execute automatically, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the executions list, but not in the editor.", }, activationHint: - "Once you’ve finished building your workflow, activate it to have it also listen continuously (you just won’t see those executions here).", + "Once you've finished building your workflow, activate it to have it also listen continuously (you just won't see those executions here).", }, inputs: [], outputs: ['main'], @@ -222,41 +223,108 @@ export class PostgresTrigger implements INodeType { const triggerMode = this.getNodeParameter('triggerMode', 0) as string; const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; - const db = await initDB.call(this); - if (triggerMode === 'createTrigger') { - await pgTriggerFunction.call(this, db); - } - const channelName = - triggerMode === 'createTrigger' - ? additionalFields.channelName || `n8n_channel_${this.getNode().id.replace(/-/g, '_')}` - : (this.getNodeParameter('channelName', 0) as string); + // initialize and connect to database + const { db, pgp } = await initDB.call(this); + const connection = await db.connect({ direct: true }); - const onNotification = async (data: any) => { + // prepare and set up listener + const onNotification = async (data: IDataObject) => { if (data.payload) { try { - data.payload = JSON.parse(data.payload as string); + data.payload = JSON.parse(data.payload as string) as IDataObject; } catch (error) {} } this.emit([this.helpers.returnJsonArray([data])]); }; - const connection = await db.connect({ direct: true }); + // create trigger, funstion and channel or use existing channel + const pgNames = prepareNames(this.getNode().id, this.getMode(), additionalFields); + if (triggerMode === 'createTrigger') { + await pgTriggerFunction.call( + this, + db, + additionalFields, + pgNames.functionName, + pgNames.triggerName, + pgNames.channelName, + ); + } else { + pgNames.channelName = this.getNodeParameter('channelName', '') as string; + } + + // listen to channel + await connection.none(`LISTEN ${pgNames.channelName}`); + + const cleanUpDb = async () => { + try { + await connection.none('UNLISTEN $1:name', [pgNames.channelName]); + if (triggerMode === 'createTrigger') { + const functionName = pgNames.functionName.includes('(') + ? pgNames.functionName.split('(')[0] + : pgNames.functionName; + await connection.any('DROP FUNCTION IF EXISTS $1:name CASCADE', [functionName]); + + const schema = this.getNodeParameter('schema', undefined, { + extractValue: true, + }) as string; + const table = this.getNodeParameter('tableName', undefined, { + extractValue: true, + }) as string; + + await connection.any('DROP TRIGGER IF EXISTS $1:name ON $2:name.$3:name CASCADE', [ + pgNames.triggerName, + schema, + table, + ]); + } + connection.client.removeListener('notification', onNotification); + } catch (error) { + throw new NodeOperationError( + this.getNode(), + `Postgres Trigger Error: ${(error as Error).message}`, + ); + } finally { + pgp.end(); + } + }; + connection.client.on('notification', onNotification); - await connection.none(`LISTEN ${channelName}`); // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. const closeFunction = async () => { - connection.client.removeListener('notification', onNotification); - await connection.none(`UNLISTEN ${channelName}`); - if (triggerMode === 'createTrigger') { - await dropTriggerFunction.call(this, db); - } - await db.$pool.end(); + await cleanUpDb(); + }; + + const manualTriggerFunction = async () => { + await new Promise(async (resolve, reject) => { + const timeoutHandler = setTimeout(async () => { + reject( + new Error( + await (async () => { + await cleanUpDb(); + return 'Aborted, no data received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'; + })(), + ), + ); + }, 30000); + connection.client.on('notification', async (data: IDataObject) => { + if (data.payload) { + try { + data.payload = JSON.parse(data.payload as string) as IDataObject; + } catch (error) {} + } + + this.emit([this.helpers.returnJsonArray([data])]); + clearTimeout(timeoutHandler); + resolve(true); + }); + }); }; return { closeFunction, + manualTriggerFunction: this.getMode() === 'manual' ? manualTriggerFunction : undefined, }; } }