2023-08-01 02:56:54 -07:00
import {
NodeOperationError ,
type IDataObject ,
type INodeType ,
type INodeTypeDescription ,
type ITriggerFunctions ,
type ITriggerResponse ,
2023-06-07 02:01:57 -07:00
} from 'n8n-workflow' ;
import {
pgTriggerFunction ,
initDB ,
searchSchema ,
searchTables ,
2023-08-01 02:56:54 -07:00
prepareNames ,
2023-06-07 02:01:57 -07:00
} from './PostgresTrigger.functions' ;
export class PostgresTrigger implements INodeType {
description : INodeTypeDescription = {
displayName : 'Postgres Trigger' ,
name : 'postgresTrigger' ,
icon : 'file:postgres.svg' ,
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to Postgres messages' ,
2023-06-23 03:29:24 -07:00
eventTriggerDescription : '' ,
2023-06-07 02:01:57 -07:00
defaults : {
name : 'Postgres Trigger' ,
} ,
2023-06-23 03:29:24 -07:00
triggerPanel : {
header : '' ,
executionsHelp : {
inactive :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Postgres event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
active :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Postgres event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
} ,
activationHint :
2023-08-01 02:56:54 -07:00
"Once you've finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just won't see those executions here)." ,
2023-06-23 03:29:24 -07:00
} ,
2023-06-07 02:01:57 -07:00
inputs : [ ] ,
outputs : [ 'main' ] ,
credentials : [
{
name : 'postgres' ,
required : true ,
} ,
] ,
properties : [
{
2023-06-22 08:46:46 -07:00
displayName : 'Listen For' ,
2023-06-07 02:01:57 -07:00
name : 'triggerMode' ,
type : 'options' ,
options : [
{
2023-06-22 08:46:46 -07:00
name : 'Table Row Change Events' ,
2023-06-07 02:01:57 -07:00
value : 'createTrigger' ,
2023-06-22 08:46:46 -07:00
description : 'Insert, update or delete' ,
2023-06-07 02:01:57 -07:00
} ,
{
2023-06-22 08:46:46 -07:00
name : 'Advanced' ,
2023-06-07 02:01:57 -07:00
value : 'listenTrigger' ,
2023-06-22 08:46:46 -07:00
description : 'Listen to existing Postgres channel' ,
2023-06-07 02:01:57 -07:00
} ,
] ,
default : 'createTrigger' ,
} ,
{
displayName : 'Schema Name' ,
name : 'schema' ,
type : 'resourceLocator' ,
default : { mode : 'list' , value : 'public' } ,
required : true ,
displayOptions : {
show : {
triggerMode : [ 'createTrigger' ] ,
} ,
} ,
modes : [
{
displayName : 'From List' ,
name : 'list' ,
type : 'list' ,
placeholder : 'Select a schema' ,
typeOptions : {
searchListMethod : 'searchSchema' ,
searchFilterRequired : false ,
} ,
} ,
{
displayName : 'Name' ,
name : 'name' ,
type : 'string' ,
placeholder : 'e.g. public' ,
} ,
] ,
} ,
{
displayName : 'Table Name' ,
name : 'tableName' ,
type : 'resourceLocator' ,
default : { mode : 'list' , value : '' } ,
required : true ,
displayOptions : {
show : {
triggerMode : [ 'createTrigger' ] ,
} ,
} ,
modes : [
{
displayName : 'From List' ,
name : 'list' ,
type : 'list' ,
placeholder : 'Select a table' ,
typeOptions : {
searchListMethod : 'searchTables' ,
searchFilterRequired : false ,
} ,
} ,
{
displayName : 'Name' ,
name : 'name' ,
type : 'string' ,
placeholder : 'e.g. table_name' ,
} ,
] ,
} ,
{
displayName : 'Channel Name' ,
name : 'channelName' ,
type : 'string' ,
default : '' ,
required : true ,
placeholder : 'e.g. n8n_channel' ,
description : 'Name of the channel to listen to' ,
displayOptions : {
show : {
triggerMode : [ 'listenTrigger' ] ,
} ,
} ,
} ,
{
2023-06-22 08:46:46 -07:00
displayName : 'Event to listen for' ,
2023-06-07 02:01:57 -07:00
name : 'firesOn' ,
type : 'options' ,
displayOptions : {
show : {
triggerMode : [ 'createTrigger' ] ,
} ,
} ,
options : [
{
name : 'Insert' ,
value : 'INSERT' ,
} ,
{
name : 'Update' ,
value : 'UPDATE' ,
} ,
{
name : 'Delete' ,
value : 'DELETE' ,
} ,
] ,
default : 'INSERT' ,
} ,
{
displayName : 'Additional Fields' ,
name : 'additionalFields' ,
type : 'collection' ,
placeholder : 'Add Field' ,
default : { } ,
displayOptions : {
show : {
triggerMode : [ 'createTrigger' ] ,
} ,
} ,
options : [
{
displayName : 'Channel Name' ,
name : 'channelName' ,
type : 'string' ,
placeholder : 'e.g. n8n_channel' ,
description : 'Name of the channel to listen to' ,
default : '' ,
} ,
{
displayName : 'Function Name' ,
name : 'functionName' ,
type : 'string' ,
description : 'Name of the function to create' ,
placeholder : 'e.g. n8n_trigger_function()' ,
default : '' ,
} ,
{
displayName : 'Replace if Exists' ,
name : 'replaceIfExists' ,
type : 'boolean' ,
description : 'Whether to replace an existing function and trigger with the same name' ,
default : false ,
} ,
{
displayName : 'Trigger Name' ,
name : 'triggerName' ,
type : 'string' ,
description : 'Name of the trigger to create' ,
placeholder : 'e.g. n8n_trigger' ,
default : '' ,
} ,
] ,
} ,
] ,
} ;
methods = {
listSearch : {
searchSchema ,
searchTables ,
} ,
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
const triggerMode = this . getNodeParameter ( 'triggerMode' , 0 ) as string ;
const additionalFields = this . getNodeParameter ( 'additionalFields' , 0 ) as IDataObject ;
2023-08-01 02:56:54 -07:00
// initialize and connect to database
const { db , pgp } = await initDB . call ( this ) ;
const connection = await db . connect ( { direct : true } ) ;
2023-06-07 02:01:57 -07:00
2023-08-01 02:56:54 -07:00
// prepare and set up listener
const onNotification = async ( data : IDataObject ) = > {
2023-06-07 02:01:57 -07:00
if ( data . payload ) {
try {
2023-08-01 02:56:54 -07:00
data . payload = JSON . parse ( data . payload as string ) as IDataObject ;
2023-06-07 02:01:57 -07:00
} catch ( error ) { }
}
this . emit ( [ this . helpers . returnJsonArray ( [ data ] ) ] ) ;
} ;
2023-08-01 02:56:54 -07:00
// create trigger, funstion and channel or use existing channel
const pgNames = prepareNames ( this . getNode ( ) . id , this . getMode ( ) , additionalFields ) ;
if ( triggerMode === 'createTrigger' ) {
await pgTriggerFunction . call (
this ,
db ,
additionalFields ,
pgNames . functionName ,
pgNames . triggerName ,
pgNames . channelName ,
) ;
} else {
pgNames . channelName = this . getNodeParameter ( 'channelName' , '' ) as string ;
}
// listen to channel
await connection . none ( ` LISTEN ${ pgNames . channelName } ` ) ;
const cleanUpDb = async ( ) = > {
try {
await connection . none ( 'UNLISTEN $1:name' , [ pgNames . channelName ] ) ;
if ( triggerMode === 'createTrigger' ) {
const functionName = pgNames . functionName . includes ( '(' )
? pgNames . functionName . split ( '(' ) [ 0 ]
: pgNames . functionName ;
await connection . any ( 'DROP FUNCTION IF EXISTS $1:name CASCADE' , [ functionName ] ) ;
const schema = this . getNodeParameter ( 'schema' , undefined , {
extractValue : true ,
} ) as string ;
const table = this . getNodeParameter ( 'tableName' , undefined , {
extractValue : true ,
} ) as string ;
await connection . any ( 'DROP TRIGGER IF EXISTS $1:name ON $2:name.$3:name CASCADE' , [
pgNames . triggerName ,
schema ,
table ,
] ) ;
}
connection . client . removeListener ( 'notification' , onNotification ) ;
} catch ( error ) {
throw new NodeOperationError (
this . getNode ( ) ,
` Postgres Trigger Error: ${ ( error as Error ) . message } ` ,
) ;
} finally {
pgp . end ( ) ;
}
} ;
2023-06-07 02:01:57 -07:00
connection . client . on ( 'notification' , onNotification ) ;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
const closeFunction = async ( ) = > {
2023-08-01 02:56:54 -07:00
await cleanUpDb ( ) ;
} ;
const manualTriggerFunction = async ( ) = > {
await new Promise ( async ( resolve , reject ) = > {
const timeoutHandler = setTimeout ( async ( ) = > {
reject (
new Error (
await ( async ( ) = > {
await cleanUpDb ( ) ;
return 'Aborted, no data received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.' ;
} ) ( ) ,
) ,
) ;
} , 30000 ) ;
connection . client . on ( 'notification' , async ( data : IDataObject ) = > {
if ( data . payload ) {
try {
data . payload = JSON . parse ( data . payload as string ) as IDataObject ;
} catch ( error ) { }
}
this . emit ( [ this . helpers . returnJsonArray ( [ data ] ) ] ) ;
clearTimeout ( timeoutHandler ) ;
resolve ( true ) ;
} ) ;
} ) ;
2023-06-07 02:01:57 -07:00
} ;
return {
closeFunction ,
2023-08-01 02:56:54 -07:00
manualTriggerFunction : this.getMode ( ) === 'manual' ? manualTriggerFunction : undefined ,
2023-06-07 02:01:57 -07:00
} ;
}
}