# meshtastic-metrics-exporter/exporter/metric_cleanup_job.py
#
# 100 lines
# 3.7 KiB
# Python

from datetime import datetime, timedelta
from typing import Any, Dict, Tuple
from apscheduler.schedulers.background import BackgroundScheduler
from prometheus_client import CollectorRegistry
class TrackedMetricsDict(dict):
    """A dictionary that reports every stored key to a metric tracker.

    Whenever a key is stored, ``metric_tracker.update_metric_timestamp`` is
    called with the owning collector and that key, so the tracker can record
    when the entry was last written.

    Args:
        collector: Object handed to the tracker as the owner of the entries.
        metric_tracker: Object exposing
            ``update_metric_timestamp(collector, key)``.
        *args: Initial contents, interpreted exactly as by ``dict``.
        **kwargs: Initial contents, interpreted exactly as by ``dict``.

    NOTE(review): only writes are reported. If the owning collector hands out
    existing entries by reading them (without re-assigning), those reads do
    not refresh the timestamp -- confirm this matches the intended staleness
    policy.
    """

    def __init__(self, collector, metric_tracker, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._collector = collector
        self._metric_tracker = metric_tracker
        # dict.__init__ does not route through __setitem__, so entries that
        # are copied in here would otherwise never be reported to the tracker.
        for key in self:
            metric_tracker.update_metric_timestamp(collector, key)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` and report the write to the tracker."""
        super().__setitem__(key, value)
        self._metric_tracker.update_metric_timestamp(self._collector, key)

    def update(self, *args, **kwargs):
        """Same contract as ``dict.update``, but every stored key is reported."""
        # The built-in update() bypasses __setitem__ on subclasses.
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def setdefault(self, key, default=None):
        """Same contract as ``dict.setdefault``; an insertion is reported."""
        # The built-in setdefault() bypasses __setitem__ on subclasses.
        if key not in self:
            self[key] = default
        return self[key]
class MetricTrackingRegistry(CollectorRegistry):
    """Extended CollectorRegistry that tracks metric updates.

    Constructing the registry creates a ``MetricCleanupJob`` bound to it and
    starts that job immediately. Every collector registered afterwards that
    has a ``_metrics`` attribute gets it replaced by a ``TrackedMetricsDict``,
    so writes into it are reported to the cleanup job.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base registry, then create and start the cleanup job.

        All arguments are forwarded unchanged to ``CollectorRegistry``.
        """
        super().__init__(*args, **kwargs)
        self._metric_tracker = MetricCleanupJob(self)
        self._metric_tracker.start()

    def register(self, collector):
        """Override register to add update tracking to collectors.

        The collector is registered with the base class first, so anything
        the base ``register`` raises propagates before the collector is
        touched.
        """
        super().register(collector)
        if hasattr(collector, '_metrics'):
            # Replace the metrics dict with our tracking version, carrying
            # over whatever entries the collector already holds.
            collector._metrics = TrackedMetricsDict(
                collector,
                self._metric_tracker,
                collector._metrics,
            )

    def __del__(self):
        """Ensure cleanup job is stopped when registry is destroyed."""
        # __init__ may have failed before the tracker was assigned.
        tracker = getattr(self, '_metric_tracker', None)
        if tracker is None:
            return
        try:
            tracker.stop()
        except Exception:
            # Best effort only: the job may already have been stopped, or the
            # interpreter may be shutting down. An exception escaping a
            # finalizer cannot be handled by anyone and is merely printed as
            # "Exception ignored in ...", so it is deliberately suppressed.
            pass
class MetricCleanupJob:
    """Periodic background job that removes stale labelled metric entries.

    The job remembers, per ``(collector, labels)`` pair, when
    ``update_metric_timestamp`` was last called. Each cleanup pass walks the
    registry's collectors and deletes every ``_metrics`` entry whose last
    recorded update is older than ``STALE_AFTER`` -- or that has no recorded
    update at all.
    """

    # Maximum age of the last recorded update before an entry is removed.
    STALE_AFTER = timedelta(hours=24)
    # Minutes between two cleanup passes once the job has been started.
    CLEANUP_INTERVAL_MINUTES = 10
    # Delay between start() and the first cleanup pass.
    FIRST_RUN_DELAY = timedelta(minutes=1)

    def __init__(self, registry: "CollectorRegistry"):
        """Bind the job to ``registry``; nothing is scheduled until start()."""
        self.registry = registry
        self.scheduler = BackgroundScheduler()
        # (collector, labels) -> time of the last reported write.
        self.last_updates: Dict[Tuple[Any, Any], datetime] = {}

    def start(self):
        """Start the cleanup job.

        The first pass runs ``FIRST_RUN_DELAY`` (1 minute) after this call and
        then repeats every ``CLEANUP_INTERVAL_MINUTES`` (10) minutes. Calling
        start() on an already running job is a no-op, so the cleanup is never
        scheduled twice.
        """
        if self.scheduler.running:
            return
        self.scheduler.add_job(
            self.cleanup_stale_metrics,
            'interval',
            minutes=self.CLEANUP_INTERVAL_MINUTES,
            next_run_time=datetime.now() + self.FIRST_RUN_DELAY,
        )
        self.scheduler.start()

    def stop(self):
        """Stop the cleanup job; safe to call when it is not running."""
        # Shutting down a scheduler that is not running raises, which made a
        # second stop() (for instance from a finalizer) fail.
        if self.scheduler.running:
            self.scheduler.shutdown()

    def update_metric_timestamp(self, collector, labels):
        """Update the last modification time for a metric."""
        self.last_updates[(collector, labels)] = datetime.now()

    def cleanup_stale_metrics(self):
        """Remove metric entries that haven't been updated within STALE_AFTER.

        Collectors without a ``_metrics`` attribute are skipped. Errors are
        reported on stdout and never propagate, so one failing pass does not
        affect later ones.
        """
        try:
            stale_threshold = datetime.now() - self.STALE_AFTER
            # Iterate over a snapshot so collectors registered or unregistered
            # during the pass cannot disturb the loop.
            for collector in list(self.registry._collector_to_names):
                if not hasattr(collector, '_metrics'):
                    continue
                # This runs on the scheduler thread. prometheus_client metric
                # objects are presumed to guard ``_metrics`` with ``_lock``
                # (verify against the installed version); when the attribute
                # exists, hold it so entries are not deleted underneath a
                # concurrent lookup.
                lock = getattr(collector, '_lock', None)
                if lock is None:
                    removed = self._remove_stale_entries(collector, stale_threshold)
                else:
                    with lock:
                        removed = self._remove_stale_entries(collector, stale_threshold)
                # Report outside the lock to keep the critical section short.
                for labels in removed:
                    print(f"Removed stale metric entry with labels: {labels}")
        except Exception as e:
            print(f"Error during metric cleanup: {e}")

    def _remove_stale_entries(self, collector, stale_threshold):
        """Delete one collector's stale entries and return the removed labels."""
        removed = []
        for labels in list(collector._metrics):
            metric_key = (collector, labels)
            last_update = self.last_updates.get(metric_key)
            # An entry with no recorded update is treated as stale.
            if last_update is not None and last_update >= stale_threshold:
                continue
            try:
                del collector._metrics[labels]
            except KeyError:
                # Already removed by someone else; nothing to report.
                pass
            except Exception as e:
                print(f"Error removing metric entry: {e}")
                continue
            else:
                removed.append(labels)
            # The entry is gone either way, so its timestamp is dropped too.
            self.last_updates.pop(metric_key, None)
        return removed