diff --git a/packages/nodes-base/credentials/Redis.credentials.ts b/packages/nodes-base/credentials/Redis.credentials.ts index 8447219442..c48cbcdf53 100644 --- a/packages/nodes-base/credentials/Redis.credentials.ts +++ b/packages/nodes-base/credentials/Redis.credentials.ts @@ -31,7 +31,7 @@ export class Redis implements ICredentialType { default: 6379, }, { - displayName: 'Database', + displayName: 'Database Number', name: 'database', type: 'number', default: 0, diff --git a/packages/nodes-base/nodes/Redis/Redis.node.ts b/packages/nodes-base/nodes/Redis/Redis.node.ts index 2e6e6da52e..5bc1830405 100644 --- a/packages/nodes-base/nodes/Redis/Redis.node.ts +++ b/packages/nodes-base/nodes/Redis/Redis.node.ts @@ -68,6 +68,11 @@ export class Redis implements INodeType { value: 'set', description: 'Set the value of a key in redis.', }, + { + name: 'Publish', + value: 'publish', + description: 'Publish message to redis channel.', + }, ], default: 'info', description: 'The operation to perform.', @@ -370,6 +375,42 @@ export class Redis implements INodeType { default: 60, description: 'Number of seconds before key expiration.', }, + // ---------------------------------- + // publish + // ---------------------------------- + { + displayName: 'Channel', + name: 'channel', + type: 'string', + displayOptions: { + show: { + operation: [ + 'publish', + ], + }, + }, + default: '', + required: true, + description: 'Channel name.', + }, + { + displayName: 'Data', + name: 'messageData', + type: 'string', + displayOptions: { + show: { + operation: [ + 'publish', + ], + }, + }, + typeOptions: { + alwaysOpenEditWindow: true, + }, + default: '', + required: true, + description: 'Data to publish.', + }, ], }; @@ -491,6 +532,7 @@ export class Redis implements INodeType { const redisOptions: redis.ClientOpts = { host: credentials.host as string, port: credentials.port as number, + db: credentials.database as number, }; if (credentials.password) { @@ -516,7 +558,7 @@ export class Redis implements INodeType { resolve(this.prepareOutputData([{ json: convertInfoToObject(result as unknown as string) }])); client.quit(); - } else if (['delete', 'get', 'keys', 'set', 'incr'].includes(operation)) { + } else if (['delete', 'get', 'keys', 'set', 'incr', 'publish'].includes(operation)) { const items = this.getInputData(); const returnItems: INodeExecutionData[] = []; @@ -587,6 +629,12 @@ export class Redis implements INodeType { await clientExpire(keyIncr, ttl); } returnItems.push({json: {[keyIncr]: incrementVal}}); + } else if (operation === 'publish'){ + const channel = this.getNodeParameter('channel', itemIndex) as string; + const messageData = this.getNodeParameter('messageData', itemIndex) as string; + const clientPublish = util.promisify(client.publish).bind(client); + await clientPublish(channel, messageData); + returnItems.push(items[itemIndex]); } } diff --git a/packages/nodes-base/nodes/Redis/RedisTrigger.node.json b/packages/nodes-base/nodes/Redis/RedisTrigger.node.json new file mode 100644 index 0000000000..3ae8339ba8 --- /dev/null +++ b/packages/nodes-base/nodes/Redis/RedisTrigger.node.json @@ -0,0 +1,21 @@ +{ + "node": "n8n-nodes-base.redisTrigger", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": [ + "Communication", + "Development" + ], + "resources": { + "credentialDocumentation": [ + { + "url": "https://docs.n8n.io/credentials/redis" + } + ], + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/nodes/n8n-nodes-base.redisTrigger/" + } + ] + } +} diff --git a/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts b/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts new file mode 100644 index 0000000000..5a7073e75b --- /dev/null +++ b/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts @@ -0,0 +1,143 @@ +import { + ITriggerFunctions, +} from 'n8n-core'; + +import { + IDataObject, + INodeType, + INodeTypeDescription, + ITriggerResponse, + NodeOperationError, +} from 'n8n-workflow'; + +import * as redis from 'redis'; + +export class RedisTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'Redis Trigger', + name: 'redisTrigger', + icon: 'file:redis.svg', + group: ['trigger'], + version: 1, + description: 'Subscribe to redis channel', + defaults: { + name: 'Redis Trigger', + }, + inputs: [], + outputs: ['main'], + credentials: [ + { + name: 'redis', + required: true, + }, + ], + properties: [ + { + displayName: 'Channels', + name: 'channels', + type: 'string', + default: '', + required: true, + description: `Channels to subscribe to, multiple channels be defined with comma. Wildcard character(*) is supported`, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'JSON Parse Body', + name: 'jsonParseBody', + type: 'boolean', + default: false, + description: 'Try to parse the message to an object', + }, + { + displayName: 'Only Message', + name: 'onlyMessage', + type: 'boolean', + default: false, + description: 'Returns only the message property', + }, + ], + }, + ], + }; + + async trigger(this: ITriggerFunctions): Promise { + + const credentials = await this.getCredentials('redis'); + + if (credentials === undefined) { + throw new NodeOperationError(this.getNode(), 'No credentials got returned!'); + } + + const redisOptions: redis.ClientOpts = { + host: credentials.host as string, + port: credentials.port as number, + db: credentials.database as number, + }; + + if (credentials.password) { + redisOptions.password = credentials.password as string; + } + + + const channels = (this.getNodeParameter('channels') as string).split(','); + + const options = this.getNodeParameter('options') as IDataObject; + + if (!channels) { + throw new NodeOperationError(this.getNode(), 'Channels are mandatory!'); + } + + const client = redis.createClient(redisOptions); + + const self = this; + + async function manualTriggerFunction() { + await new Promise((resolve, reject) => { + client.on('connect', () => { + for (const channel of channels) { + client.psubscribe(channel); + } + client.on('pmessage', (pattern: string, channel: string, message: string) => { + if (options.jsonParseBody) { + try { + message = JSON.parse(message); + } catch (error) { } + } + + if (options.onlyMessage) { + self.emit([self.helpers.returnJsonArray({message})]); + resolve(true); + return; + } + + self.emit([self.helpers.returnJsonArray({channel, message})]); + resolve(true); + }); + }); + + client.on('error', (error) => { + reject(error); + }); + }); + } + + if (this.getMode() === 'trigger') { + manualTriggerFunction(); + } + + async function closeFunction() { + client.quit(); + } + + return { + closeFunction, + manualTriggerFunction, + }; + } +} diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index e90250aa1a..973559d9c2 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -584,6 +584,7 @@ "dist/nodes/ReadPdf/ReadPdf.node.js", "dist/nodes/Reddit/Reddit.node.js", "dist/nodes/Redis/Redis.node.js", + "dist/nodes/Redis/RedisTrigger.node.js", "dist/nodes/RenameKeys/RenameKeys.node.js", "dist/nodes/RespondToWebhook/RespondToWebhook.node.js", "dist/nodes/Rocketchat/Rocketchat.node.js",