diff --git a/packages/nodes-base/nodes/Postgres/PostgresInterface.ts b/packages/nodes-base/nodes/Postgres/PostgresInterface.ts new file mode 100644 index 0000000000..27ca9ea847 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/PostgresInterface.ts @@ -0,0 +1,6 @@ +export interface IPostgresTrigger { + triggerName: string; + functionName: string; + channelName: string; + target: string; +} diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts new file mode 100644 index 0000000000..67ccfe1d92 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts @@ -0,0 +1,137 @@ +import type { + ITriggerFunctions, + IDataObject, + ILoadOptionsFunctions, + INodeListSearchResult, + INodeListSearchItems, +} from 'n8n-workflow'; +import pgPromise from 'pg-promise'; +import type pg from 'pg-promise/typescript/pg-subset'; + +export async function pgTriggerFunction( + this: ITriggerFunctions, + db: pgPromise.IDatabase<{}, pg.IClient>, +): Promise { + const schema = this.getNodeParameter('schema', 'public', { extractValue: true }) as string; + const tableName = this.getNodeParameter('tableName', undefined, { + extractValue: true, + }) as string; + const target = `${schema}."${tableName}"`; + const firesOn = this.getNodeParameter('firesOn', 0) as string; + const functionReplace = + "CREATE OR REPLACE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$;"; + const dropIfExist = 'DROP TRIGGER IF EXISTS $1:raw ON $2:raw'; + const functionExists = + "CREATE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$"; + const trigger = + 'CREATE TRIGGER $4:raw AFTER $3:raw ON $1:raw FOR EACH ROW EXECUTE FUNCTION $2:raw'; + const whichData = firesOn === 'DELETE' ? 'old' : 'new'; + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const nodeId = this.getNode().id.replace(/-/g, '_'); + let functionName = + (additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; + if (!functionName.includes('()')) { + functionName = functionName.concat('()'); + } + const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; + const channelName = (additionalFields.channelName as string) || `n8n_channel_${nodeId}`; + if (channelName.includes('-')) { + throw new Error('Channel name cannot contain hyphens (-)'); + } + const replaceIfExists = additionalFields.replaceIfExists || false; + try { + if (replaceIfExists || !(additionalFields.triggerName || additionalFields.functionName)) { + await db.any(functionReplace, [functionName, channelName, whichData]); + await db.any(dropIfExist, [triggerName, target, whichData]); + } else { + await db.any(functionExists, [functionName, channelName, whichData]); + } + await db.any(trigger, [target, functionName, firesOn, triggerName]); + } catch (err) { + if (err.message.includes('near "-"')) { + throw new Error('Names cannot contain hyphens (-)'); + } + throw new Error(err as string); + } +} + +export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) { + const credentials = await this.getCredentials('postgres'); + const pgp = pgPromise({ + // prevent spam in console "WARNING: Creating a duplicate database object for the same connection." + noWarnings: true, + }); + const config: IDataObject = { + host: credentials.host as string, + port: credentials.port as number, + database: credentials.database as string, + user: credentials.user as string, + password: credentials.password as string, + }; + + if (credentials.allowUnauthorizedCerts === true) { + config.ssl = { + rejectUnauthorized: false, + }; + } else { + config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); + config.sslmode = (credentials.ssl as string) || 'disable'; + } + return pgp(config); +} + +export async function searchSchema(this: ILoadOptionsFunctions): Promise { + const db = await initDB.call(this); + const schemaList = await db.any('SELECT schema_name FROM information_schema.schemata'); + const results: INodeListSearchItems[] = schemaList.map((s) => ({ + name: s.schema_name as string, + value: s.schema_name as string, + })); + await db.$pool.end(); + return { results }; +} + +export async function searchTables(this: ILoadOptionsFunctions): Promise { + const schema = this.getNodeParameter('schema', 0) as IDataObject; + const db = await initDB.call(this); + let tableList = []; + try { + tableList = await db.any( + 'SELECT table_name FROM information_schema.tables WHERE table_schema = $1', + [schema.value], + ); + } catch (error) { + throw new Error(error as string); + } + const results: INodeListSearchItems[] = tableList.map((s) => ({ + name: s.table_name as string, + value: s.table_name as string, + })); + await db.$pool.end(); + return { results }; +} + +export async function dropTriggerFunction( + this: ITriggerFunctions, + db: pgPromise.IDatabase<{}, pg.IClient>, +): Promise { + const schema = this.getNodeParameter('schema', undefined, { extractValue: true }) as string; + const tableName = this.getNodeParameter('tableName', undefined, { + extractValue: true, + }) as string; + const target = `${schema}."${tableName}"`; + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const nodeId = this.getNode().id.replace(/-/g, '_'); + let functionName = + (additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; + if (!functionName.includes('()')) { + functionName = functionName.concat('()'); + } + const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; + try { + await db.any('DROP FUNCTION IF EXISTS $1:raw CASCADE', [functionName]); + await db.any('DROP TRIGGER IF EXISTS $1:raw ON $2:raw CASCADE', [triggerName, target]); + } catch (error) { + throw new Error(error as string); + } +} diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.json b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.json new file mode 100644 index 0000000000..5f733549cc --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.json @@ -0,0 +1,18 @@ +{ + "node": "n8n-nodes-base.postgresTrigger", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": ["Development"], + "resources": { + "credentialDocumentation": [ + { + "url": "https://docs.n8n.io/credentials/postgres" + } + ], + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.postgres/" + } + ] + } +} diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts new file mode 100644 index 0000000000..38971f6c58 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.node.ts @@ -0,0 +1,250 @@ +import type { + IDataObject, + INodeType, + INodeTypeDescription, + ITriggerFunctions, + ITriggerResponse, +} from 'n8n-workflow'; +import { + dropTriggerFunction, + pgTriggerFunction, + initDB, + searchSchema, + searchTables, +} from './PostgresTrigger.functions'; + +export class PostgresTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'Postgres Trigger', + name: 'postgresTrigger', + icon: 'file:postgres.svg', + group: ['trigger'], + version: 1, + description: 'Listens to Postgres messages', + defaults: { + name: 'Postgres Trigger', + }, + inputs: [], + outputs: ['main'], + credentials: [ + { + name: 'postgres', + required: true, + }, + ], + properties: [ + { + displayName: 'Trigger Mode', + name: 'triggerMode', + type: 'options', + options: [ + { + name: 'Listen and Create Trigger Rule', + value: 'createTrigger', + description: 'Create a trigger rule and listen to it', + }, + { + name: 'Listen to Channel', + value: 'listenTrigger', + description: 'Receive real-time notifications from a channel', + }, + ], + default: 'createTrigger', + }, + { + displayName: 'Schema Name', + name: 'schema', + type: 'resourceLocator', + default: { mode: 'list', value: 'public' }, + required: true, + displayOptions: { + show: { + triggerMode: ['createTrigger'], + }, + }, + modes: [ + { + displayName: 'From List', + name: 'list', + type: 'list', + placeholder: 'Select a schema', + typeOptions: { + searchListMethod: 'searchSchema', + searchFilterRequired: false, + }, + }, + { + displayName: 'Name', + name: 'name', + type: 'string', + placeholder: 'e.g. public', + }, + ], + }, + { + displayName: 'Table Name', + name: 'tableName', + type: 'resourceLocator', + default: { mode: 'list', value: '' }, + required: true, + displayOptions: { + show: { + triggerMode: ['createTrigger'], + }, + }, + modes: [ + { + displayName: 'From List', + name: 'list', + type: 'list', + placeholder: 'Select a table', + typeOptions: { + searchListMethod: 'searchTables', + searchFilterRequired: false, + }, + }, + { + displayName: 'Name', + name: 'name', + type: 'string', + placeholder: 'e.g. table_name', + }, + ], + }, + { + displayName: 'Channel Name', + name: 'channelName', + type: 'string', + default: '', + required: true, + placeholder: 'e.g. n8n_channel', + description: 'Name of the channel to listen to', + displayOptions: { + show: { + triggerMode: ['listenTrigger'], + }, + }, + }, + { + displayName: 'Events to Listen To', + name: 'firesOn', + type: 'options', + displayOptions: { + show: { + triggerMode: ['createTrigger'], + }, + }, + options: [ + { + name: 'Insert', + value: 'INSERT', + }, + { + name: 'Update', + value: 'UPDATE', + }, + { + name: 'Delete', + value: 'DELETE', + }, + ], + default: 'INSERT', + }, + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + displayOptions: { + show: { + triggerMode: ['createTrigger'], + }, + }, + options: [ + { + displayName: 'Channel Name', + name: 'channelName', + type: 'string', + placeholder: 'e.g. n8n_channel', + description: 'Name of the channel to listen to', + default: '', + }, + + { + displayName: 'Function Name', + name: 'functionName', + type: 'string', + description: 'Name of the function to create', + placeholder: 'e.g. n8n_trigger_function()', + default: '', + }, + { + displayName: 'Replace if Exists', + name: 'replaceIfExists', + type: 'boolean', + description: 'Whether to replace an existing function and trigger with the same name', + default: false, + }, + { + displayName: 'Trigger Name', + name: 'triggerName', + type: 'string', + description: 'Name of the trigger to create', + placeholder: 'e.g. n8n_trigger', + default: '', + }, + ], + }, + ], + }; + + methods = { + listSearch: { + searchSchema, + searchTables, + }, + }; + + async trigger(this: ITriggerFunctions): Promise { + const triggerMode = this.getNodeParameter('triggerMode', 0) as string; + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + + const db = await initDB.call(this); + if (triggerMode === 'createTrigger') { + await pgTriggerFunction.call(this, db); + } + const channelName = + triggerMode === 'createTrigger' + ? additionalFields.channelName || `n8n_channel_${this.getNode().id.replace(/-/g, '_')}` + : (this.getNodeParameter('channelName', 0) as string); + + const onNotification = async (data: any) => { + if (data.payload) { + try { + data.payload = JSON.parse(data.payload as string); + } catch (error) {} + } + this.emit([this.helpers.returnJsonArray([data])]); + }; + + const connection = await db.connect({ direct: true }); + connection.client.on('notification', onNotification); + await connection.none(`LISTEN ${channelName}`); + + // The "closeFunction" function gets called by n8n whenever + // the workflow gets deactivated and can so clean up. + const closeFunction = async () => { + connection.client.removeListener('notification', onNotification); + await connection.none(`UNLISTEN ${channelName}`); + if (triggerMode === 'createTrigger') { + await dropTriggerFunction.call(this, db); + } + await db.$pool.end(); + }; + + return { + closeFunction, + }; + } +} diff --git a/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts b/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts index df0331ddc7..bf8548f022 100644 --- a/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts +++ b/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts @@ -343,7 +343,7 @@ export async function pgInsert( * * @param {Function} getNodeParam The getter for the Node's parameters * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance - * @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection + * @param {pgPromise.IDatabase<{}, pg.IClient>} db`` The pgPromise database connection * @param {INodeExecutionData[]} items The items to be inserted */ export async function pgInsertV2( diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 5d8dcfc9ca..e54c094a7e 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -623,6 +623,7 @@ "dist/nodes/Plivo/Plivo.node.js", "dist/nodes/PostBin/PostBin.node.js", "dist/nodes/Postgres/Postgres.node.js", + "dist/nodes/Postgres/PostgresTrigger.node.js", "dist/nodes/PostHog/PostHog.node.js", "dist/nodes/Postmark/PostmarkTrigger.node.js", "dist/nodes/ProfitWell/ProfitWell.node.js",