From 1ed72af255cf7843f9d84daf49d14e789c7420bc Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Fri, 1 May 2020 10:02:34 +0200 Subject: [PATCH] :bug: Fix issue with push messages that get received to fast --- .../src/components/mixins/pushConnection.ts | 80 ++++++++++++++----- 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/packages/editor-ui/src/components/mixins/pushConnection.ts b/packages/editor-ui/src/components/mixins/pushConnection.ts index f32423f6c9..d9e77d929c 100644 --- a/packages/editor-ui/src/components/mixins/pushConnection.ts +++ b/packages/editor-ui/src/components/mixins/pushConnection.ts @@ -22,6 +22,8 @@ export const pushConnection = mixins( return { eventSource: null as EventSource | null, reconnectTimeout: null as NodeJS.Timeout | null, + retryTimeout: null as NodeJS.Timeout | null, + pushMessageQueue: [] as Array<{ event: Event, retriesLeft: number }>, }; }, computed: { @@ -96,47 +98,84 @@ export const pushConnection = mixins( * @param {number} retryAttempts * @returns */ - retryPushMessage (event: Event, retryAttempts: number) { - retryAttempts = retryAttempts - 1; + queuePushMessage (event: Event, retryAttempts: number) { + this.pushMessageQueue.push({ event, retriesLeft: retryAttempts }); - if (retryAttempts <= 0) { - return; + if (this.retryTimeout === null) { + this.retryTimeout = setTimeout(this.processWaitingPushMessages, 20); + } + }, + + + /** + * Process the push messages which are waiting in the queue + */ + processWaitingPushMessages () { + if (this.retryTimeout !== null) { + clearTimeout(this.retryTimeout); + this.retryTimeout = null; } - setTimeout(() => { - this.pushMessageReceived(event, retryAttempts); - }, 200); + const queueLength = this.pushMessageQueue.length; + for (let i = 0; i < queueLength; i++) { + const messageData = this.pushMessageQueue.shift(); + + if (this.pushMessageReceived(messageData!.event, true) === false) { + // Was not successful + messageData!.retriesLeft -= 1; + + if (messageData!.retriesLeft > 0) { + // If still retries are left add it back and stop execution + this.pushMessageQueue.unshift(messageData!); + } + break; + } + } + + if (this.pushMessageQueue.length !== 0 && this.retryTimeout === null) { + this.retryTimeout = setTimeout(this.processWaitingPushMessages, 25); + } }, + /** * Process a newly received message * * @param {Event} event The event data with the message data - * @returns {void} + * @param {boolean} [isRetry] If it is a retry + * @returns {boolean} If message could be processed */ - pushMessageReceived (event: Event, retryAttempts?: number): void { - retryAttempts = retryAttempts || 5; + pushMessageReceived (event: Event, isRetry?: boolean): boolean { + const retryAttempts = 5; let receivedData: IPushData; try { // @ts-ignore receivedData = JSON.parse(event.data); } catch (error) { - console.error('The received push data is not valid JSON.'); // eslint-disable-line no-console - return; + return false; + } + + if (!['testWebhookReceived'].includes(receivedData.type) && isRetry !== true && this.pushMessageQueue.length) { + // If there are already messages in the queue add the new one that all of them + // get executed in order + this.queuePushMessage(event, retryAttempts); + return false; } if (['nodeExecuteAfter', 'nodeExecuteBefore'].includes(receivedData.type)) { if (this.$store.getters.isActionActive('workflowRunning') === false) { // No workflow is running so ignore the messages - return; + return false; } const pushData = receivedData.data as IPushDataNodeExecuteBefore; if (this.$store.getters.activeExecutionId !== pushData.executionId) { // The data is not for the currently active execution or // we do not have the execution id yet. - this.retryPushMessage(event, retryAttempts); - return; + if (isRetry !== true) { + this.queuePushMessage(event, retryAttempts); + } + return false; } } @@ -148,14 +187,16 @@ export const pushConnection = mixins( if (this.$store.getters.isActionActive('workflowRunning') === false) { // No workflow is running so ignore the messages - return; + return false; } if (this.$store.getters.activeExecutionId !== pushData.executionIdActive) { // The workflow which did finish execution did either not get started // by this session or we do not have the execution id yet. - this.retryPushMessage(event, retryAttempts); - return; + if (isRetry !== true) { + this.queuePushMessage(event, retryAttempts); + } + return false; } const runDataExecuted = pushData.data; @@ -231,7 +272,10 @@ export const pushConnection = mixins( this.$store.commit('setExecutionWaitingForWebhook', false); this.$store.commit('setActiveExecutionId', pushData.executionId); } + + this.processWaitingPushMessages(); } + return true; }, }, });