mirror of
https://github.com/tcivie/meshtastic-metrics-exporter.git
synced 2025-03-05 20:52:02 -08:00
100 lines
3.7 KiB
Python
100 lines
3.7 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, Tuple
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from prometheus_client import CollectorRegistry
|
|
|
|
|
|
class TrackedMetricsDict(dict):
    """A dict that reports every key it stores to a metric tracker.

    Each time a key is written, ``metric_tracker.update_metric_timestamp``
    is called with ``(collector, key)`` so the tracker can record when that
    entry was last touched.

    Args:
        collector: The object that owns this mapping; passed back verbatim
            to the tracker on every write.
        metric_tracker: Any object exposing
            ``update_metric_timestamp(collector, key)``.
        *args, **kwargs: Forwarded to ``dict`` to seed the initial content.
    """

    def __init__(self, collector, metric_tracker, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._collector = collector
        self._metric_tracker = metric_tracker
        # dict.__init__ fills the mapping without going through
        # __setitem__, so seeded entries would otherwise have no timestamp
        # at all. Stamp them now so they count as "seen at wrap time".
        for key in self:
            self._metric_tracker.update_metric_timestamp(self._collector, key)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` and notify the tracker."""
        super().__setitem__(key, value)
        self._metric_tracker.update_metric_timestamp(self._collector, key)

    def setdefault(self, key, default=None):
        """Like ``dict.setdefault`` but tracked when a new key is inserted.

        The built-in implementation writes directly and never calls an
        overridden ``__setitem__``; route the insert through it instead.
        An existing key is only read, so it is not re-stamped.
        """
        if key not in self:
            self[key] = default
        return self[key]

    def update(self, *args, **kwargs):
        """Like ``dict.update`` but every written key is tracked.

        The built-in implementation bypasses ``__setitem__``; normalising
        the arguments through ``dict(...)`` keeps the accepted call shapes
        identical (mapping, iterable of pairs, keyword arguments).
        """
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
|
|
|
|
|
|
class MetricTrackingRegistry(CollectorRegistry):
    """Extended CollectorRegistry that tracks metric updates.

    On construction the registry creates a ``MetricCleanupJob`` bound to
    itself and starts it. Every collector registered afterwards that has a
    ``_metrics`` attribute gets that attribute replaced by a
    ``TrackedMetricsDict`` wired to the cleanup job.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._metric_tracker = MetricCleanupJob(self)
        self._metric_tracker.start()

    def register(self, collector):
        """Register ``collector`` and add update tracking to its metrics.

        Collectors without a ``_metrics`` attribute are registered
        unchanged.
        """
        super().register(collector)

        if not hasattr(collector, '_metrics'):
            return

        current = collector._metrics
        # A collector that is registered again (e.g. after an unregister)
        # already carries a tracking dict for this registry; wrapping it a
        # second time would only copy it.
        if (
            isinstance(current, TrackedMetricsDict)
            and current._metric_tracker is self._metric_tracker
        ):
            return

        # Replace the metrics dict with our tracking version, seeded with
        # whatever entries the collector already holds.
        collector._metrics = TrackedMetricsDict(
            collector,
            self._metric_tracker,
            current,
        )

    def __del__(self):
        """Ensure cleanup job is stopped when registry is destroyed."""
        # __init__ may have failed before the tracker was assigned.
        tracker = getattr(self, '_metric_tracker', None)
        if tracker is None:
            return
        try:
            tracker.stop()
        except Exception:
            # Best effort only: a scheduler that is already stopped raises
            # on shutdown, and an exception escaping a finalizer is merely
            # reported on stderr and cannot be handled by anyone.
            pass
|
|
|
|
class MetricCleanupJob:
    """Background job that drops metric entries which have gone stale.

    The job keeps a ``(collector, labels) -> datetime`` table that callers
    refresh through ``update_metric_timestamp``. On every run it walks the
    collectors known to the registry and deletes each ``_metrics`` entry
    whose timestamp is missing or older than ``stale_after``.

    Args:
        registry: Registry whose ``_collector_to_names`` mapping lists the
            collectors to inspect.
        stale_after: Age after which an entry is removed (default 24 hours).
        interval_minutes: Minutes between cleanup runs (default 10).
    """

    def __init__(
        self,
        registry: "CollectorRegistry",
        stale_after: timedelta = timedelta(hours=24),
        interval_minutes: int = 10,
    ):
        self.registry = registry
        self.stale_after = stale_after
        self.interval_minutes = interval_minutes
        self.scheduler = BackgroundScheduler()
        self.last_updates: Dict[Tuple[Any, Any], datetime] = {}

    def start(self):
        """Schedule the cleanup and start the scheduler.

        The job runs every ``interval_minutes`` minutes; the first run is
        scheduled one minute after this call.
        """
        self.scheduler.add_job(
            self.cleanup_stale_metrics,
            'interval',
            minutes=self.interval_minutes,
            next_run_time=datetime.now() + timedelta(minutes=1),
        )
        self.scheduler.start()

    def stop(self):
        """Stop the cleanup job; a no-op if the scheduler is not running."""
        # shutdown() raises on a scheduler that was never started or has
        # already been shut down, which would make repeated stop() calls
        # (or a stop() from a finalizer) fail.
        if self.scheduler.running:
            self.scheduler.shutdown()

    def update_metric_timestamp(self, collector, labels):
        """Update the last modification time for a metric."""
        self.last_updates[(collector, labels)] = datetime.now()

    def cleanup_stale_metrics(self):
        """Remove metric entries not updated within ``stale_after``.

        Entries that have no recorded timestamp at all are treated as
        stale. Errors are reported with ``print`` and never propagated.
        """
        try:
            stale_threshold = datetime.now() - self.stale_after

            # Snapshot the collectors so registrations happening while the
            # job runs cannot invalidate the iteration.
            for collector in list(self.registry._collector_to_names):
                if hasattr(collector, '_metrics'):
                    self._remove_stale_entries(collector, stale_threshold)
        except Exception as e:
            print(f"Error during metric cleanup: {e}")

    def _remove_stale_entries(self, collector, stale_threshold):
        """Delete the entries of one collector older than ``stale_threshold``."""
        stale_labels = []
        for labels in list(collector._metrics):
            last_update = self.last_updates.get((collector, labels))
            if last_update is None or last_update < stale_threshold:
                stale_labels.append(labels)

        # NOTE(review): the deletion below is not synchronized with code
        # that mutates collector._metrics elsewhere -- confirm whether a
        # collector-side lock should be held here.
        for labels in stale_labels:
            try:
                del collector._metrics[labels]
            except KeyError:
                # Entry vanished between the scan and the delete.
                removed = False
            except Exception as e:
                print(f"Error removing metric entry: {e}")
                continue
            else:
                removed = True

            # Drop the bookkeeping in both cases so a timestamp never
            # outlives the entry it describes.
            self.last_updates.pop((collector, labels), None)
            if removed:
                print(f"Removed stale metric entry with labels: {labels}")