mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 04:47:29 -08:00
⚡ Do only send manual executions to starting session & cleanup
This commit is contained in:
parent
53693886df
commit
deaa015e61
|
@ -179,8 +179,7 @@ export interface IExecutionsStopData {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IExecutionsSummary {
|
export interface IExecutionsSummary {
|
||||||
id?: string; // executionIdDb
|
id: string;
|
||||||
idActive?: string; // executionIdActive
|
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
mode: WorkflowExecuteMode;
|
mode: WorkflowExecuteMode;
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
|
@ -327,8 +326,7 @@ export type IPushDataType = 'executionFinished' | 'executionStarted' | 'nodeExec
|
||||||
|
|
||||||
export interface IPushDataExecutionFinished {
|
export interface IPushDataExecutionFinished {
|
||||||
data: IRun;
|
data: IRun;
|
||||||
executionIdActive: string;
|
executionId: string;
|
||||||
executionIdDb?: string;
|
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1670,7 +1670,7 @@ class App {
|
||||||
|
|
||||||
return results.map(result => {
|
return results.map(result => {
|
||||||
return {
|
return {
|
||||||
idActive: result.id,
|
id: result.id,
|
||||||
workflowId: result.workflowId,
|
workflowId: result.workflowId,
|
||||||
mode: result.mode,
|
mode: result.mode,
|
||||||
retryOf: result.retryOf !== null ? result.retryOf : undefined,
|
retryOf: result.retryOf !== null ? result.retryOf : undefined,
|
||||||
|
@ -1693,7 +1693,7 @@ class App {
|
||||||
}
|
}
|
||||||
returnData.push(
|
returnData.push(
|
||||||
{
|
{
|
||||||
idActive: data.id.toString(),
|
id: data.id.toString(),
|
||||||
workflowId: data.workflowId === undefined ? '' : data.workflowId.toString(),
|
workflowId: data.workflowId === undefined ? '' : data.workflowId.toString(),
|
||||||
mode: data.mode,
|
mode: data.mode,
|
||||||
retryOf: data.retryOf,
|
retryOf: data.retryOf,
|
||||||
|
|
|
@ -117,42 +117,6 @@ function pruneExecutionData(): void {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pushes the execution out to all connected clients
|
|
||||||
*
|
|
||||||
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
|
|
||||||
* @param {IRun} fullRunData The RunData of the finished execution
|
|
||||||
* @param {string} executionIdActive The id of the finished execution
|
|
||||||
* @param {string} [executionIdDb] The database id of finished execution
|
|
||||||
*/
|
|
||||||
export function pushExecutionFinished(mode: WorkflowExecuteMode, fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) {
|
|
||||||
// Clone the object except the runData. That one is not supposed
|
|
||||||
// to be send. Because that data got send piece by piece after
|
|
||||||
// each node which finished executing
|
|
||||||
const pushRunData = {
|
|
||||||
...fullRunData,
|
|
||||||
data: {
|
|
||||||
...fullRunData.data,
|
|
||||||
resultData: {
|
|
||||||
...fullRunData.data.resultData,
|
|
||||||
runData: {},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Push data to editor-ui once workflow finished
|
|
||||||
const sendData: IPushDataExecutionFinished = {
|
|
||||||
executionIdActive,
|
|
||||||
executionIdDb,
|
|
||||||
data: pushRunData,
|
|
||||||
retryOf,
|
|
||||||
};
|
|
||||||
|
|
||||||
const pushInstance = Push.getInstance();
|
|
||||||
pushInstance.send('executionFinished', sendData);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns hook functions to push data to Editor-UI
|
* Returns hook functions to push data to Editor-UI
|
||||||
*
|
*
|
||||||
|
@ -192,25 +156,52 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
|
||||||
],
|
],
|
||||||
workflowExecuteBefore: [
|
workflowExecuteBefore: [
|
||||||
async function (this: WorkflowHooks): Promise<void> {
|
async function (this: WorkflowHooks): Promise<void> {
|
||||||
// Push data to editor-ui once workflow finished
|
// Push data to session which started the workflow
|
||||||
if (this.mode === 'manual') {
|
if (this.sessionId === undefined) {
|
||||||
const pushInstance = Push.getInstance();
|
return;
|
||||||
pushInstance.send('executionStarted', {
|
|
||||||
executionId: this.executionId,
|
|
||||||
mode: this.mode,
|
|
||||||
startedAt: new Date(),
|
|
||||||
retryOf: this.retryOf,
|
|
||||||
workflowId: this.workflowData.id as string,
|
|
||||||
workflowName: this.workflowData.name,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
const pushInstance = Push.getInstance();
|
||||||
|
pushInstance.send('executionStarted', {
|
||||||
|
executionId: this.executionId,
|
||||||
|
mode: this.mode,
|
||||||
|
startedAt: new Date(),
|
||||||
|
retryOf: this.retryOf,
|
||||||
|
workflowId: this.workflowData.id as string,
|
||||||
|
workflowName: this.workflowData.name,
|
||||||
|
}, this.sessionId);
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
workflowExecuteAfter: [
|
workflowExecuteAfter: [
|
||||||
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
|
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
|
||||||
if (this.mode === 'manual') {
|
// Push data to session which started the workflow
|
||||||
pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf);
|
if (this.sessionId === undefined) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clone the object except the runData. That one is not supposed
|
||||||
|
// to be send. Because that data got send piece by piece after
|
||||||
|
// each node which finished executing
|
||||||
|
const pushRunData = {
|
||||||
|
...fullRunData,
|
||||||
|
data: {
|
||||||
|
...fullRunData.data,
|
||||||
|
resultData: {
|
||||||
|
...fullRunData.data.resultData,
|
||||||
|
runData: {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Push data to editor-ui once workflow finished
|
||||||
|
// TODO: Look at this again
|
||||||
|
const sendData: IPushDataExecutionFinished = {
|
||||||
|
executionId: this.executionId,
|
||||||
|
data: pushRunData,
|
||||||
|
retryOf: this.retryOf,
|
||||||
|
};
|
||||||
|
|
||||||
|
const pushInstance = Push.getInstance();
|
||||||
|
pushInstance.send('executionFinished', sendData, this.sessionId);
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
};
|
};
|
||||||
|
@ -243,7 +234,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
|
||||||
if (execution === undefined) {
|
if (execution === undefined) {
|
||||||
// Something went badly wrong if this happens.
|
// Something went badly wrong if this happens.
|
||||||
// This check is here mostly to make typescript happy.
|
// This check is here mostly to make typescript happy.
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
|
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
|
||||||
|
|
||||||
|
@ -282,7 +273,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
|
||||||
|
|
||||||
// Set last executed node so that it may resume on failure
|
// Set last executed node so that it may resume on failure
|
||||||
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
|
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
|
||||||
|
|
||||||
const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData);
|
||||||
|
|
||||||
await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb);
|
await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb);
|
||||||
|
|
|
@ -101,9 +101,6 @@ export class WorkflowRunner {
|
||||||
// Remove from active execution with empty data. That will
|
// Remove from active execution with empty data. That will
|
||||||
// set the execution to failed.
|
// set the execution to failed.
|
||||||
this.activeExecutions.remove(executionId, fullRunData);
|
this.activeExecutions.remove(executionId, fullRunData);
|
||||||
|
|
||||||
// Also send to Editor UI
|
|
||||||
WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -175,7 +172,7 @@ export class WorkflowRunner {
|
||||||
workflowExecution = workflowExecute.processRunExecutionData(workflow);
|
workflowExecution = workflowExecute.processRunExecutionData(workflow);
|
||||||
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
|
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
|
||||||
// Execute all nodes
|
// Execute all nodes
|
||||||
|
|
||||||
// Can execute without webhook so go on
|
// Can execute without webhook so go on
|
||||||
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
|
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
|
||||||
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
|
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
|
||||||
|
@ -298,7 +295,7 @@ export class WorkflowRunner {
|
||||||
}, queueRecoveryInterval * 1000);
|
}, queueRecoveryInterval * 1000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
const clearWatchdogInterval = () => {
|
const clearWatchdogInterval = () => {
|
||||||
if (watchDogInterval) {
|
if (watchDogInterval) {
|
||||||
clearInterval(watchDogInterval);
|
clearInterval(watchDogInterval);
|
||||||
|
@ -332,7 +329,7 @@ export class WorkflowRunner {
|
||||||
await jobData;
|
await jobData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
const executionDb = await Db.collections.Execution!.findOne(executionId) as IExecutionFlattedDb;
|
const executionDb = await Db.collections.Execution!.findOne(executionId) as IExecutionFlattedDb;
|
||||||
const fullExecutionData = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
|
const fullExecutionData = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
|
||||||
|
|
|
@ -314,8 +314,7 @@ export interface IExecutionsListResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IExecutionsCurrentSummaryExtended {
|
export interface IExecutionsCurrentSummaryExtended {
|
||||||
id?: string;
|
id: string;
|
||||||
idActive: string;
|
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
mode: WorkflowExecuteMode;
|
mode: WorkflowExecuteMode;
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
|
@ -334,8 +333,7 @@ export interface IExecutionsStopData {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IExecutionsSummary {
|
export interface IExecutionsSummary {
|
||||||
id?: string; // executionIdDb
|
id: string;
|
||||||
idActive?: string; // executionIdActive
|
|
||||||
mode: WorkflowExecuteMode;
|
mode: WorkflowExecuteMode;
|
||||||
finished?: boolean;
|
finished?: boolean;
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
|
@ -370,8 +368,7 @@ export interface IPushDataExecutionStarted {
|
||||||
|
|
||||||
export interface IPushDataExecutionFinished {
|
export interface IPushDataExecutionFinished {
|
||||||
data: IRun;
|
data: IRun;
|
||||||
executionIdActive: string;
|
executionId: string;
|
||||||
executionIdDb?: string;
|
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,6 @@
|
||||||
<template slot-scope="scope">
|
<template slot-scope="scope">
|
||||||
{{convertToDisplayDate(scope.row.startedAt)}}<br />
|
{{convertToDisplayDate(scope.row.startedAt)}}<br />
|
||||||
<small v-if="scope.row.id">ID: {{scope.row.id}}</small>
|
<small v-if="scope.row.id">ID: {{scope.row.id}}</small>
|
||||||
<small v-if="scope.row.idActive && scope.row.id === undefined && scope.row.stoppedAt === undefined">Active-ID: {{scope.row.idActive}}</small>
|
|
||||||
</template>
|
</template>
|
||||||
</el-table-column>
|
</el-table-column>
|
||||||
<el-table-column property="workflowName" label="Name">
|
<el-table-column property="workflowName" label="Name">
|
||||||
|
@ -126,8 +125,8 @@
|
||||||
</el-table-column>
|
</el-table-column>
|
||||||
<el-table-column label="" width="100" align="center">
|
<el-table-column label="" width="100" align="center">
|
||||||
<template slot-scope="scope">
|
<template slot-scope="scope">
|
||||||
<span v-if="scope.row.stoppedAt === undefined && scope.row.idActive">
|
<span v-if="scope.row.stoppedAt === undefined">
|
||||||
<el-button circle title="Stop Execution" @click.stop="stopExecution(scope.row.idActive)" :loading="stoppingExecutions.includes(scope.row.idActive)" size="mini">
|
<el-button circle title="Stop Execution" @click.stop="stopExecution(scope.row.id)" :loading="stoppingExecutions.includes(scope.row.id)" size="mini">
|
||||||
<font-awesome-icon icon="stop" />
|
<font-awesome-icon icon="stop" />
|
||||||
</el-button>
|
</el-button>
|
||||||
</span>
|
</span>
|
||||||
|
|
|
@ -191,7 +191,7 @@ export const pushConnection = mixins(
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.$store.getters.activeExecutionId !== pushData.executionIdActive) {
|
if (this.$store.getters.activeExecutionId !== pushData.executionId) {
|
||||||
// The workflow which did finish execution did either not get started
|
// The workflow which did finish execution did either not get started
|
||||||
// by this session or we do not have the execution id yet.
|
// by this session or we do not have the execution id yet.
|
||||||
if (isRetry !== true) {
|
if (isRetry !== true) {
|
||||||
|
@ -242,7 +242,7 @@ export const pushConnection = mixins(
|
||||||
const pushData = receivedData.data as IPushDataExecutionStarted;
|
const pushData = receivedData.data as IPushDataExecutionStarted;
|
||||||
|
|
||||||
const executionData: IExecutionsCurrentSummaryExtended = {
|
const executionData: IExecutionsCurrentSummaryExtended = {
|
||||||
idActive: pushData.executionId,
|
id: pushData.executionId,
|
||||||
finished: false,
|
finished: false,
|
||||||
mode: pushData.mode,
|
mode: pushData.mode,
|
||||||
startedAt: pushData.startedAt,
|
startedAt: pushData.startedAt,
|
||||||
|
|
|
@ -99,7 +99,7 @@ export const store = new Vuex.Store({
|
||||||
addActiveExecution (state, newActiveExecution: IExecutionsCurrentSummaryExtended) {
|
addActiveExecution (state, newActiveExecution: IExecutionsCurrentSummaryExtended) {
|
||||||
// Check if the execution exists already
|
// Check if the execution exists already
|
||||||
const activeExecution = state.activeExecutions.find(execution => {
|
const activeExecution = state.activeExecutions.find(execution => {
|
||||||
return execution.idActive === newActiveExecution.idActive;
|
return execution.id === newActiveExecution.id;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (activeExecution !== undefined) {
|
if (activeExecution !== undefined) {
|
||||||
|
@ -115,7 +115,7 @@ export const store = new Vuex.Store({
|
||||||
finishActiveExecution (state, finishedActiveExecution: IPushDataExecutionFinished) {
|
finishActiveExecution (state, finishedActiveExecution: IPushDataExecutionFinished) {
|
||||||
// Find the execution to set to finished
|
// Find the execution to set to finished
|
||||||
const activeExecution = state.activeExecutions.find(execution => {
|
const activeExecution = state.activeExecutions.find(execution => {
|
||||||
return execution.idActive === finishedActiveExecution.executionIdActive;
|
return execution.id === finishedActiveExecution.executionId;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (activeExecution === undefined) {
|
if (activeExecution === undefined) {
|
||||||
|
@ -123,8 +123,8 @@ export const store = new Vuex.Store({
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (finishedActiveExecution.executionIdDb !== undefined) {
|
if (finishedActiveExecution.executionId !== undefined) {
|
||||||
Vue.set(activeExecution, 'id', finishedActiveExecution.executionIdDb);
|
Vue.set(activeExecution, 'id', finishedActiveExecution.executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
Vue.set(activeExecution, 'finished', finishedActiveExecution.data.finished);
|
Vue.set(activeExecution, 'finished', finishedActiveExecution.data.finished);
|
||||||
|
|
|
@ -742,8 +742,7 @@ export default mixins(
|
||||||
} as IRun;
|
} as IRun;
|
||||||
const pushData = {
|
const pushData = {
|
||||||
data: executedData,
|
data: executedData,
|
||||||
executionIdActive: executionId,
|
executionId: executionId,
|
||||||
executionIdDb: executionId,
|
|
||||||
retryOf: execution.retryOf,
|
retryOf: execution.retryOf,
|
||||||
} as IPushDataExecutionFinished;
|
} as IPushDataExecutionFinished;
|
||||||
this.$store.commit('finishActiveExecution', pushData);
|
this.$store.commit('finishActiveExecution', pushData);
|
||||||
|
@ -759,8 +758,6 @@ export default mixins(
|
||||||
} else {
|
} else {
|
||||||
this.$showError(error, 'Problem stopping execution', 'There was a problem stopping the execuction:');
|
this.$showError(error, 'Problem stopping execution', 'There was a problem stopping the execuction:');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
this.stopExecutionInProgress = false;
|
this.stopExecutionInProgress = false;
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in a new issue