mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-24 02:52:24 -08:00
🐛 Fix issue with push messages that get received to fast
This commit is contained in:
parent
976e02efcc
commit
1ed72af255
|
@ -22,6 +22,8 @@ export const pushConnection = mixins(
|
|||
return {
|
||||
eventSource: null as EventSource | null,
|
||||
reconnectTimeout: null as NodeJS.Timeout | null,
|
||||
retryTimeout: null as NodeJS.Timeout | null,
|
||||
pushMessageQueue: [] as Array<{ event: Event, retriesLeft: number }>,
|
||||
};
|
||||
},
|
||||
computed: {
|
||||
|
@ -96,47 +98,84 @@ export const pushConnection = mixins(
|
|||
* @param {number} retryAttempts
|
||||
* @returns
|
||||
*/
|
||||
retryPushMessage (event: Event, retryAttempts: number) {
|
||||
retryAttempts = retryAttempts - 1;
|
||||
queuePushMessage (event: Event, retryAttempts: number) {
|
||||
this.pushMessageQueue.push({ event, retriesLeft: retryAttempts });
|
||||
|
||||
if (retryAttempts <= 0) {
|
||||
return;
|
||||
if (this.retryTimeout === null) {
|
||||
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 20);
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
/**
|
||||
* Process the push messages which are waiting in the queue
|
||||
*/
|
||||
processWaitingPushMessages () {
|
||||
if (this.retryTimeout !== null) {
|
||||
clearTimeout(this.retryTimeout);
|
||||
this.retryTimeout = null;
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
this.pushMessageReceived(event, retryAttempts);
|
||||
}, 200);
|
||||
const queueLength = this.pushMessageQueue.length;
|
||||
for (let i = 0; i < queueLength; i++) {
|
||||
const messageData = this.pushMessageQueue.shift();
|
||||
|
||||
if (this.pushMessageReceived(messageData!.event, true) === false) {
|
||||
// Was not successful
|
||||
messageData!.retriesLeft -= 1;
|
||||
|
||||
if (messageData!.retriesLeft > 0) {
|
||||
// If still retries are left add it back and stop execution
|
||||
this.pushMessageQueue.unshift(messageData!);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.pushMessageQueue.length !== 0 && this.retryTimeout === null) {
|
||||
this.retryTimeout = setTimeout(this.processWaitingPushMessages, 25);
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
/**
|
||||
* Process a newly received message
|
||||
*
|
||||
* @param {Event} event The event data with the message data
|
||||
* @returns {void}
|
||||
* @param {boolean} [isRetry] If it is a retry
|
||||
* @returns {boolean} If message could be processed
|
||||
*/
|
||||
pushMessageReceived (event: Event, retryAttempts?: number): void {
|
||||
retryAttempts = retryAttempts || 5;
|
||||
pushMessageReceived (event: Event, isRetry?: boolean): boolean {
|
||||
const retryAttempts = 5;
|
||||
|
||||
let receivedData: IPushData;
|
||||
try {
|
||||
// @ts-ignore
|
||||
receivedData = JSON.parse(event.data);
|
||||
} catch (error) {
|
||||
console.error('The received push data is not valid JSON.'); // eslint-disable-line no-console
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!['testWebhookReceived'].includes(receivedData.type) && isRetry !== true && this.pushMessageQueue.length) {
|
||||
// If there are already messages in the queue add the new one that all of them
|
||||
// get executed in order
|
||||
this.queuePushMessage(event, retryAttempts);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (['nodeExecuteAfter', 'nodeExecuteBefore'].includes(receivedData.type)) {
|
||||
if (this.$store.getters.isActionActive('workflowRunning') === false) {
|
||||
// No workflow is running so ignore the messages
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
const pushData = receivedData.data as IPushDataNodeExecuteBefore;
|
||||
if (this.$store.getters.activeExecutionId !== pushData.executionId) {
|
||||
// The data is not for the currently active execution or
|
||||
// we do not have the execution id yet.
|
||||
this.retryPushMessage(event, retryAttempts);
|
||||
return;
|
||||
if (isRetry !== true) {
|
||||
this.queuePushMessage(event, retryAttempts);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,14 +187,16 @@ export const pushConnection = mixins(
|
|||
|
||||
if (this.$store.getters.isActionActive('workflowRunning') === false) {
|
||||
// No workflow is running so ignore the messages
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (this.$store.getters.activeExecutionId !== pushData.executionIdActive) {
|
||||
// The workflow which did finish execution did either not get started
|
||||
// by this session or we do not have the execution id yet.
|
||||
this.retryPushMessage(event, retryAttempts);
|
||||
return;
|
||||
if (isRetry !== true) {
|
||||
this.queuePushMessage(event, retryAttempts);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
const runDataExecuted = pushData.data;
|
||||
|
@ -231,7 +272,10 @@ export const pushConnection = mixins(
|
|||
this.$store.commit('setExecutionWaitingForWebhook', false);
|
||||
this.$store.commit('setActiveExecutionId', pushData.executionId);
|
||||
}
|
||||
|
||||
this.processWaitingPushMessages();
|
||||
}
|
||||
return true;
|
||||
},
|
||||
},
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue