mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
⚡ Improve workflow retry
This commit is contained in:
parent
d59a043e3f
commit
886100eeef
|
@ -4,10 +4,10 @@ import {
|
||||||
|
|
||||||
import {
|
import {
|
||||||
createDeferredPromise,
|
createDeferredPromise,
|
||||||
IExecutionsCurrentSummary,
|
|
||||||
} from 'n8n-core';
|
} from 'n8n-core';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
IExecutionsCurrentSummary,
|
||||||
IExecutingWorkflowData,
|
IExecutingWorkflowData,
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
} from '.';
|
} from '.';
|
||||||
|
@ -131,6 +131,7 @@ export class ActiveExecutions {
|
||||||
returnData.push(
|
returnData.push(
|
||||||
{
|
{
|
||||||
id,
|
id,
|
||||||
|
retryOf: data.executionData.retryOf as string | undefined,
|
||||||
startedAt: data.startedAt,
|
startedAt: data.startedAt,
|
||||||
mode: data.executionData.executionMode,
|
mode: data.executionData.executionMode,
|
||||||
workflowId: data.executionData.workflowData.id! as string,
|
workflowId: data.executionData.workflowData.id! as string,
|
||||||
|
|
|
@ -162,8 +162,8 @@ export interface IExecutionsStopData {
|
||||||
export interface IExecutionsSummary {
|
export interface IExecutionsSummary {
|
||||||
id?: string; // executionIdDb
|
id?: string; // executionIdDb
|
||||||
idActive?: string; // executionIdActive
|
idActive?: string; // executionIdActive
|
||||||
mode: WorkflowExecuteMode;
|
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
|
mode: WorkflowExecuteMode;
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
retrySuccessId?: string;
|
retrySuccessId?: string;
|
||||||
startedAt: Date;
|
startedAt: Date;
|
||||||
|
@ -172,6 +172,16 @@ export interface IExecutionsSummary {
|
||||||
workflowName?: string;
|
workflowName?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export interface IExecutionsCurrentSummary {
|
||||||
|
id: string;
|
||||||
|
retryOf?: string;
|
||||||
|
startedAt: Date;
|
||||||
|
mode: WorkflowExecuteMode;
|
||||||
|
workflowId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
export interface IExecutionDeleteFilter {
|
export interface IExecutionDeleteFilter {
|
||||||
deleteBefore?: Date;
|
deleteBefore?: Date;
|
||||||
filters?: IDataObject;
|
filters?: IDataObject;
|
||||||
|
@ -260,6 +270,7 @@ export interface IPushDataExecutionFinished {
|
||||||
data: IRun;
|
data: IRun;
|
||||||
executionIdActive: string;
|
executionIdActive: string;
|
||||||
executionIdDb?: string;
|
executionIdDb?: string;
|
||||||
|
retryOf?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IPushDataExecutionStarted {
|
export interface IPushDataExecutionStarted {
|
||||||
|
|
|
@ -828,7 +828,7 @@ class App {
|
||||||
|
|
||||||
|
|
||||||
// Retries a failed execution
|
// Retries a failed execution
|
||||||
this.app.post('/rest/executions/:id/retry', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string> => {
|
this.app.post('/rest/executions/:id/retry', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<boolean> => {
|
||||||
// Get the data to execute
|
// Get the data to execute
|
||||||
const fullExecutionDataFlatted = await Db.collections.Execution!.findOne(req.params.id);
|
const fullExecutionDataFlatted = await Db.collections.Execution!.findOne(req.params.id);
|
||||||
|
|
||||||
|
@ -859,7 +859,13 @@ class App {
|
||||||
const workflowRunner = new WorkflowRunner();
|
const workflowRunner = new WorkflowRunner();
|
||||||
const executionId = await workflowRunner.run(data);
|
const executionId = await workflowRunner.run(data);
|
||||||
|
|
||||||
return executionId;
|
const executionData = await this.activeExecutionsInstance.getPostExecutePromise(executionId);
|
||||||
|
|
||||||
|
if (executionData === undefined) {
|
||||||
|
throw new Error('The retry did not start for an unknown reason.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return !!executionData.finished;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
|
||||||
|
@ -893,7 +899,6 @@ class App {
|
||||||
|
|
||||||
|
|
||||||
// Returns all the currently working executions
|
// Returns all the currently working executions
|
||||||
// this.app.get('/rest/executions-current', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsCurrentSummaryExtended[]> => {
|
|
||||||
this.app.get('/rest/executions-current', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsSummary[]> => {
|
this.app.get('/rest/executions-current', ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<IExecutionsSummary[]> => {
|
||||||
const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions();
|
const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions();
|
||||||
|
|
||||||
|
@ -912,7 +917,8 @@ class App {
|
||||||
{
|
{
|
||||||
idActive: data.id.toString(),
|
idActive: data.id.toString(),
|
||||||
workflowId: data.workflowId,
|
workflowId: data.workflowId,
|
||||||
mode:data.mode,
|
mode: data.mode,
|
||||||
|
retryOf: data.retryOf,
|
||||||
startedAt: new Date(data.startedAt),
|
startedAt: new Date(data.startedAt),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -67,7 +67,7 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
|
||||||
* @param {string} executionIdActive The id of the finished execution
|
* @param {string} executionIdActive The id of the finished execution
|
||||||
* @param {string} [executionIdDb] The database id of finished execution
|
* @param {string} [executionIdDb] The database id of finished execution
|
||||||
*/
|
*/
|
||||||
export function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string) {
|
export function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) {
|
||||||
// Clone the object except the runData. That one is not supposed
|
// Clone the object except the runData. That one is not supposed
|
||||||
// to be send. Because that data got send piece by piece after
|
// to be send. Because that data got send piece by piece after
|
||||||
// each node which finished executing
|
// each node which finished executing
|
||||||
|
@ -87,6 +87,7 @@ export function pushExecutionFinished(fullRunData: IRun, executionIdActive: stri
|
||||||
executionIdActive,
|
executionIdActive,
|
||||||
executionIdDb,
|
executionIdDb,
|
||||||
data: pushRunData,
|
data: pushRunData,
|
||||||
|
retryOf,
|
||||||
};
|
};
|
||||||
|
|
||||||
pushInstance.send('executionFinished', sendData);
|
pushInstance.send('executionFinished', sendData);
|
||||||
|
@ -166,7 +167,7 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mode === 'manual' && saveManualExecutions === false) {
|
if (mode === 'manual' && saveManualExecutions === false) {
|
||||||
pushExecutionFinished(fullRunData, executionId);
|
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
|
||||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -183,7 +184,7 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution
|
||||||
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
|
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
|
||||||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
|
workflowDidSucceed === false && saveDataErrorExecution === 'none'
|
||||||
) {
|
) {
|
||||||
pushExecutionFinished(fullRunData, executionId);
|
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
|
||||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -216,10 +217,10 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution
|
||||||
await Db.collections.Execution!.update(retryOf, { retrySuccessId: executionResult.id });
|
await Db.collections.Execution!.update(retryOf, { retrySuccessId: executionResult.id });
|
||||||
}
|
}
|
||||||
|
|
||||||
pushExecutionFinished(fullRunData, executionId, executionResult.id as string);
|
pushExecutionFinished(fullRunData, executionId, executionResult.id as string, retryOf);
|
||||||
executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined);
|
executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
pushExecutionFinished(fullRunData, executionId);
|
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
|
||||||
executeErrorWorkflow(workflowData, fullRunData, mode);
|
executeErrorWorkflow(workflowData, fullRunData, mode);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -46,13 +46,6 @@ export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
export interface IExecutionsCurrentSummary {
|
|
||||||
id: string;
|
|
||||||
startedAt: Date;
|
|
||||||
mode: WorkflowExecuteMode;
|
|
||||||
workflowId: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ITriggerFunctions extends ITriggerFunctionsBase {
|
export interface ITriggerFunctions extends ITriggerFunctionsBase {
|
||||||
helpers: {
|
helpers: {
|
||||||
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
|
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
|
||||||
|
|
|
@ -143,7 +143,7 @@ export interface IRestApi {
|
||||||
getCredentialTypes(): Promise<ICredentialType[]>;
|
getCredentialTypes(): Promise<ICredentialType[]>;
|
||||||
getExecution(id: string): Promise<IExecutionResponse>;
|
getExecution(id: string): Promise<IExecutionResponse>;
|
||||||
deleteExecutions(sendData: IExecutionDeleteFilter): Promise<void>;
|
deleteExecutions(sendData: IExecutionDeleteFilter): Promise<void>;
|
||||||
retryExecution(id: string): Promise<IExecutionResponse>;
|
retryExecution(id: string): Promise<boolean>;
|
||||||
getTimezones(): Promise<IDataObject>;
|
getTimezones(): Promise<IDataObject>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +303,8 @@ export interface IExecutionsCurrentSummaryExtended {
|
||||||
idActive: string;
|
idActive: string;
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
mode: WorkflowExecuteMode;
|
mode: WorkflowExecuteMode;
|
||||||
|
retryOf?: string;
|
||||||
|
retrySuccessId?: string;
|
||||||
startedAt: Date;
|
startedAt: Date;
|
||||||
stoppedAt?: Date;
|
stoppedAt?: Date;
|
||||||
workflowId: string;
|
workflowId: string;
|
||||||
|
@ -356,6 +358,7 @@ export interface IPushDataExecutionFinished {
|
||||||
data: IRun;
|
data: IRun;
|
||||||
executionIdActive: string;
|
executionIdActive: string;
|
||||||
executionIdDb?: string;
|
executionIdDb?: string;
|
||||||
|
retryOf?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IPushDataExecutionStarted {
|
export interface IPushDataExecutionStarted {
|
||||||
|
|
|
@ -390,9 +390,9 @@ export default mixins(
|
||||||
this.isDataLoading = true;
|
this.isDataLoading = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const data = await this.restApi().retryExecution(execution.id);
|
const retrySuccessful = await this.restApi().retryExecution(execution.id);
|
||||||
|
|
||||||
if (data.finished === true) {
|
if (retrySuccessful === true) {
|
||||||
this.$showMessage({
|
this.$showMessage({
|
||||||
title: 'Retry successful',
|
title: 'Retry successful',
|
||||||
message: 'The retry was successful!',
|
message: 'The retry was successful!',
|
||||||
|
@ -406,13 +406,11 @@ export default mixins(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.refreshData();
|
|
||||||
this.isDataLoading = false;
|
this.isDataLoading = false;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.$showError(error, 'Problem with retry', 'There was a problem with the retry:');
|
this.$showError(error, 'Problem with retry', 'There was a problem with the retry:');
|
||||||
|
|
||||||
this.isDataLoading = false;
|
this.isDataLoading = false;
|
||||||
this.refreshData();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
async refreshData () {
|
async refreshData () {
|
||||||
|
|
|
@ -160,7 +160,6 @@ export const pushConnection = mixins(
|
||||||
|
|
||||||
const runDataExecuted = pushData.data;
|
const runDataExecuted = pushData.data;
|
||||||
|
|
||||||
|
|
||||||
if (runDataExecuted.finished !== true) {
|
if (runDataExecuted.finished !== true) {
|
||||||
// There was a problem with executing the workflow
|
// There was a problem with executing the workflow
|
||||||
let errorMessage = 'There was a problem executing the workflow!';
|
let errorMessage = 'There was a problem executing the workflow!';
|
||||||
|
@ -202,6 +201,7 @@ export const pushConnection = mixins(
|
||||||
finished: false,
|
finished: false,
|
||||||
mode: pushData.mode,
|
mode: pushData.mode,
|
||||||
startedAt: pushData.startedAt,
|
startedAt: pushData.startedAt,
|
||||||
|
retryOf: pushData.retryOf,
|
||||||
workflowId: pushData.workflowId,
|
workflowId: pushData.workflowId,
|
||||||
workflowName: pushData.workflowName,
|
workflowName: pushData.workflowName,
|
||||||
};
|
};
|
||||||
|
|
|
@ -263,7 +263,7 @@ export const restApi = Vue.extend({
|
||||||
},
|
},
|
||||||
|
|
||||||
// Returns the execution with the given name
|
// Returns the execution with the given name
|
||||||
retryExecution: (id: string): Promise<IExecutionResponse> => {
|
retryExecution: (id: string): Promise<boolean> => {
|
||||||
return self.restApi().makeRestApiRequest('POST', `/executions/${id}/retry`);
|
return self.restApi().makeRestApiRequest('POST', `/executions/${id}/retry`);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue