n8n/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts
Ricardo Espinoza a169b74062
fix(Redis Trigger Node): Activating a workflow with a Redis trigger fails (#8129)
## Summary
> Describe what the PR does and how to test. Photos and videos are
recommended.

We were awaiting for the promise to resolve before returning. Because
the trigger method does not return until the first message is received
or the connection errors, the requests that actives the workflows did
not respond making the activation button irresponsive.

Without the change:

https://www.loom.com/share/769b26d5d4ee407e999344fab3905eae

With the change:

https://www.loom.com/share/d1691ee1941248bc97f2ed97b0c129a3

## Related tickets and issues

https://linear.app/n8n/issue/ADO-895/activating-a-workflow-with-a-redis-trigger-fails


## Review / Merge checklist
- [x] PR title and summary are descriptive. **Remember, the title
automatically goes into the changelog. Use `(no-changelog)` otherwise.**
([conventions](https://github.com/n8n-io/n8n/blob/master/.github/pull_request_title_conventions.md))
- [x] [Docs updated](https://github.com/n8n-io/n8n-docs) or follow-up
ticket created.
2023-12-21 12:29:26 -05:00

134 lines
3 KiB
TypeScript

import type {
ITriggerFunctions,
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import redis from 'redis';
export class RedisTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'Redis Trigger',
name: 'redisTrigger',
icon: 'file:redis.svg',
group: ['trigger'],
version: 1,
description: 'Subscribe to redis channel',
defaults: {
name: 'Redis Trigger',
},
inputs: [],
outputs: ['main'],
credentials: [
{
name: 'redis',
required: true,
},
],
properties: [
{
displayName: 'Channels',
name: 'channels',
type: 'string',
default: '',
required: true,
description:
'Channels to subscribe to, multiple channels be defined with comma. Wildcard character(*) is supported.',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Whether to try to parse the message to an object',
},
{
displayName: 'Only Message',
name: 'onlyMessage',
type: 'boolean',
default: false,
description: 'Whether to return only the message property',
},
],
},
],
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const credentials = await this.getCredentials('redis');
const redisOptions: redis.ClientOpts = {
host: credentials.host as string,
port: credentials.port as number,
db: credentials.database as number,
};
if (credentials.password) {
redisOptions.password = credentials.password as string;
}
const channels = (this.getNodeParameter('channels') as string).split(',');
const options = this.getNodeParameter('options') as IDataObject;
if (!channels) {
throw new NodeOperationError(this.getNode(), 'Channels are mandatory!');
}
const client = redis.createClient(redisOptions);
const manualTriggerFunction = async () => {
await new Promise((resolve, reject) => {
client.on('connect', () => {
for (const channel of channels) {
client.psubscribe(channel);
}
client.on('pmessage', (pattern: string, channel: string, message: string) => {
if (options.jsonParseBody) {
try {
message = JSON.parse(message);
} catch (error) {}
}
if (options.onlyMessage) {
this.emit([this.helpers.returnJsonArray({ message })]);
resolve(true);
return;
}
this.emit([this.helpers.returnJsonArray({ channel, message })]);
resolve(true);
});
});
client.on('error', (error) => {
reject(error);
});
});
};
if (this.getMode() === 'trigger') {
void manualTriggerFunction();
}
async function closeFunction() {
client.quit();
}
return {
closeFunction,
manualTriggerFunction,
};
}
}