mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
a169b74062
## Summary > Describe what the PR does and how to test. Photos and videos are recommended. We were awaiting the promise before returning. Because the trigger method therefore did not return until the first message was received or the connection errored, the request that activates the workflow did not respond, making the activation button unresponsive. Without the change: https://www.loom.com/share/769b26d5d4ee407e999344fab3905eae With the change: https://www.loom.com/share/d1691ee1941248bc97f2ed97b0c129a3 ## Related tickets and issues https://linear.app/n8n/issue/ADO-895/activating-a-workflow-with-a-redis-trigger-fails ## Review / Merge checklist - [x] PR title and summary are descriptive. **Remember, the title automatically goes into the changelog. Use `(no-changelog)` otherwise.** ([conventions](https://github.com/n8n-io/n8n/blob/master/.github/pull_request_title_conventions.md)) - [x] [Docs updated](https://github.com/n8n-io/n8n-docs) or follow-up ticket created.
134 lines
3 KiB
TypeScript
134 lines
3 KiB
TypeScript
import type {
|
|
ITriggerFunctions,
|
|
IDataObject,
|
|
INodeType,
|
|
INodeTypeDescription,
|
|
ITriggerResponse,
|
|
} from 'n8n-workflow';
|
|
import { NodeOperationError } from 'n8n-workflow';
|
|
|
|
import redis from 'redis';
|
|
|
|
/**
 * Trigger node that pattern-subscribes to one or more Redis pub/sub channels
 * and emits one item per received message.
 */
export class RedisTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'Redis Trigger',
		name: 'redisTrigger',
		icon: 'file:redis.svg',
		group: ['trigger'],
		version: 1,
		description: 'Subscribe to redis channel',
		defaults: {
			name: 'Redis Trigger',
		},
		inputs: [],
		outputs: ['main'],
		credentials: [
			{
				name: 'redis',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Channels',
				name: 'channels',
				type: 'string',
				default: '',
				required: true,
				description:
					'Channels to subscribe to, multiple channels be defined with comma. Wildcard character(*) is supported.',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				placeholder: 'Add Option',
				default: {},
				options: [
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						default: false,
						description: 'Whether to try to parse the message to an object',
					},
					{
						displayName: 'Only Message',
						name: 'onlyMessage',
						type: 'boolean',
						default: false,
						description: 'Whether to return only the message property',
					},
				],
			},
		],
	};

	/**
	 * Creates a Redis client from the `redis` credentials and wires up the
	 * subscription to the configured channel patterns.
	 *
	 * In `trigger` mode the listeners are attached immediately without waiting
	 * for a message, so this method returns right away. In any other mode the
	 * caller is expected to invoke the returned `manualTriggerFunction`, whose
	 * promise resolves on the first received message and rejects on a client
	 * error.
	 *
	 * @returns The `closeFunction` (quits the client) and `manualTriggerFunction`.
	 * @throws NodeOperationError if the `channels` parameter contains no
	 *   non-blank channel name.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const credentials = await this.getCredentials('redis');

		const redisOptions: redis.ClientOpts = {
			host: credentials.host as string,
			port: credentials.port as number,
			db: credentials.database as number,
		};

		if (credentials.password) {
			redisOptions.password = credentials.password as string;
		}

		// Comma-separated list: strip surrounding whitespace and drop blank
		// entries so that "a, b" subscribes to "a" and "b" (not " b") and an
		// empty or whitespace-only value is detected below.
		const channels = (this.getNodeParameter('channels') as string)
			.split(',')
			.map((channel) => channel.trim())
			.filter((channel) => channel !== '');

		const options = this.getNodeParameter('options') as IDataObject;

		// `split` always returns an array, so emptiness has to be checked via length.
		if (channels.length === 0) {
			throw new NodeOperationError(this.getNode(), 'Channels are mandatory!');
		}

		const client = redis.createClient(redisOptions);

		const manualTriggerFunction = async () => {
			await new Promise((resolve, reject) => {
				client.on('connect', () => {
					for (const channel of channels) {
						client.psubscribe(channel);
					}
				});

				// Registered once, outside the 'connect' handler: if 'connect' fires
				// again (e.g. after a reconnect) a listener attached in there would
				// be stacked and every message would be emitted multiple times.
				client.on('pmessage', (_pattern: string, channel: string, message: string) => {
					if (options.jsonParseBody) {
						try {
							message = JSON.parse(message);
						} catch {
							// Best effort only: a message that is not valid JSON is
							// forwarded unchanged as a string.
						}
					}

					const payload = options.onlyMessage ? { message } : { channel, message };
					this.emit([this.helpers.returnJsonArray(payload)]);
					resolve(true);
				});

				client.on('error', (error) => {
					reject(error);
				});
			});
		};

		if (this.getMode() === 'trigger') {
			// Deliberately not awaited: the promise only settles on the first
			// message or on an error, and awaiting it would block activation.
			// NOTE(review): a client error rejects this promise with nothing
			// awaiting it — consider reporting it through the trigger context.
			void manualTriggerFunction();
		}

		async function closeFunction() {
			client.quit();
		}

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}
|