fix(core): Mark binary data to be deleted when pruning executions (#4713)

* Mark binary data to be deleted when pruning executions

* eslint

* make pruneExecutionData async

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Ahsan Virani 2022-11-25 10:48:02 +01:00 committed by GitHub
parent 95b97078e8
commit 78c66f16d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 22 deletions

View file

@ -193,7 +193,7 @@ export function executeErrorWorkflow(
* *
*/ */
let throttling = false; let throttling = false;
function pruneExecutionData(this: WorkflowHooks): void { async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
if (!throttling) { if (!throttling) {
Logger.verbose('Pruning execution data from database'); Logger.verbose('Pruning execution data from database');
@ -207,27 +207,31 @@ function pruneExecutionData(this: WorkflowHooks): void {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const utcDate = DateUtils.mixedDateToUtcDatetimeString(date); const utcDate = DateUtils.mixedDateToUtcDatetimeString(date);
// throttle just on success to allow for self healing on failure try {
Db.collections.Execution.delete({ stoppedAt: LessThanOrEqual(utcDate) }) const executions = await Db.collections.Execution.find({
.then((data) => stoppedAt: LessThanOrEqual(utcDate),
setTimeout(() => {
throttling = false;
}, timeout * 1000),
)
.catch((error) => {
ErrorReporter.error(error);
throttling = false;
Logger.error(
`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
{
...error,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}); });
await Db.collections.Execution.delete({ stoppedAt: LessThanOrEqual(utcDate) });
setTimeout(() => {
throttling = false;
}, timeout * 1000);
// Mark binary data for deletion for all executions
await BinaryDataManager.getInstance().markDataForDeletionByExecutionIds(
executions.map(({ id }) => id.toString()),
);
} catch (error) {
ErrorReporter.error(error);
throttling = false;
Logger.error(
`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
{
...error,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}
} }
} }
@ -491,7 +495,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
// Prune old execution data // Prune old execution data
if (config.getEnv('executions.pruneData')) { if (config.getEnv('executions.pruneData')) {
pruneExecutionData.call(this); await pruneExecutionData.call(this);
} }
const isManualMode = [this.mode, parentProcessMode].includes('manual'); const isManualMode = [this.mode, parentProcessMode].includes('manual');

View file

@ -144,6 +144,18 @@ export class BinaryDataManager {
return Promise.resolve(); return Promise.resolve();
} }
async markDataForDeletionByExecutionIds(executionIds: string[]): Promise<void> {
if (this.managers[this.binaryDataMode]) {
return Promise.all(
executionIds.map(async (id) =>
this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(id),
),
).then(() => {});
}
return Promise.resolve();
}
async persistBinaryDataForExecutionId(executionId: string): Promise<void> { async persistBinaryDataForExecutionId(executionId: string): Promise<void> {
if (this.managers[this.binaryDataMode]) { if (this.managers[this.binaryDataMode]) {
return this.managers[this.binaryDataMode].persistBinaryDataForExecutionId(executionId); return this.managers[this.binaryDataMode].persistBinaryDataForExecutionId(executionId);