From 3fb222de437e272bdf787605f93f298af3e0125b Mon Sep 17 00:00:00 2001
From: Gleb Tcivie
Date: Sat, 1 Feb 2025 12:35:57 +0200
Subject: [PATCH] Added simple cleaning to verify if the metrics are stale

---
 exporter/metric_cleanup_job.py | 70 +++++++++++++++++++++++++++++-----
 main.py                        |  9 ++---
 2 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/exporter/metric_cleanup_job.py b/exporter/metric_cleanup_job.py
index 3345f8f..0d51f89 100644
--- a/exporter/metric_cleanup_job.py
+++ b/exporter/metric_cleanup_job.py
@@ -1,21 +1,61 @@
 from datetime import datetime, timedelta
+from typing import Any, Dict, Tuple
 
 from apscheduler.schedulers.background import BackgroundScheduler
 from prometheus_client import CollectorRegistry
 
 
+class TrackedMetricsDict(dict):
+    """A dictionary that tracks updates for metrics"""
+
+    def __init__(self, collector, metric_tracker, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._collector = collector
+        self._metric_tracker = metric_tracker
+
+    def __setitem__(self, key, value):
+        super().__setitem__(key, value)
+        self._metric_tracker.update_metric_timestamp(self._collector, key)
+
+
+class MetricTrackingRegistry(CollectorRegistry):
+    """Extended CollectorRegistry that tracks metric updates"""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._metric_tracker = MetricCleanupJob(self)
+        self._metric_tracker.start()
+
+    def register(self, collector):
+        """Override register to add update tracking to collectors"""
+        super().register(collector)
+        if hasattr(collector, '_metrics'):
+            # Replace the metrics dict with our tracking version
+            tracked_dict = TrackedMetricsDict(
+                collector,
+                self._metric_tracker,
+                collector._metrics
+            )
+            collector._metrics = tracked_dict
+
+    def __del__(self):
+        """Ensure cleanup job is stopped when registry is destroyed"""
+        if hasattr(self, '_metric_tracker'):
+            self._metric_tracker.stop()
+
 class MetricCleanupJob:
     def __init__(self, registry: CollectorRegistry):
         self.registry = registry
         self.scheduler = BackgroundScheduler()
+        self.last_updates: Dict[Tuple[Any, Any], datetime] = {}
 
     def start(self):
         """Start the cleanup job to run every hour"""
         self.scheduler.add_job(
-            self.cleanup_unknown_metrics,
+            self.cleanup_stale_metrics,
             'interval',
             minutes=10,
-            next_run_time=datetime.now() + timedelta(minutes=1)  # First run after 1 minute
+            next_run_time=datetime.now() + timedelta(minutes=1)
         )
         self.scheduler.start()
 
@@ -23,28 +63,38 @@ class MetricCleanupJob:
         """Stop the cleanup job"""
         self.scheduler.shutdown()
 
-    def cleanup_unknown_metrics(self):
-        """Remove metric entries with mostly 'Unknown' values from the metrics dictionary"""
+    def update_metric_timestamp(self, collector, labels):
+        """Update the last modification time for a metric"""
+        metric_key = (collector, labels)
+        self.last_updates[metric_key] = datetime.now()
+
+    def cleanup_stale_metrics(self):
+        """Remove metric entries that haven't been updated in 24 hours"""
         try:
+            current_time = datetime.now()
+            stale_threshold = current_time - timedelta(hours=24)
+
             for collector, _ in list(self.registry._collector_to_names.items()):
                 if hasattr(collector, '_metrics'):
                     labels_to_remove = []
 
-                    # Identify label combinations to remove
                     for labels, _ in list(collector._metrics.items()):
-                        unknown_count = sum(1 for label in labels if 'Unknown' in str(label))
-                        if unknown_count >= 1:  # Threshold for "Unknown" values
+                        metric_key = (collector, labels)
+                        last_update = self.last_updates.get(metric_key)
+
+                        if last_update is None or last_update < stale_threshold:
                             labels_to_remove.append(labels)
 
-                    # Remove identified label combinations
                     for labels in labels_to_remove:
                         try:
                             del collector._metrics[labels]
-                            print(f"Removed metric entry with labels: {labels}")
+                            metric_key = (collector, labels)
+                            self.last_updates.pop(metric_key, None)
+                            print(f"Removed stale metric entry with labels: {labels}")
                         except KeyError:
                             pass
                         except Exception as e:
                             print(f"Error removing metric entry: {e}")
 
         except Exception as e:
-            print(f"Error during metric cleanup: {e}")
+            print(f"Error during metric cleanup: {e}")
\ No newline at end of file
diff --git a/main.py b/main.py
index 2025c1e..4fde658 100644
--- a/main.py
+++ b/main.py
@@ -7,7 +7,7 @@ from dotenv import load_dotenv
 
 from constants import callback_api_version_map, protocol_map
 from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
-from exporter.metric_cleanup_job import MetricCleanupJob
+from exporter.metric_cleanup_job import MetricTrackingRegistry
 
 try:
     from meshtastic.mesh_pb2 import MeshPacket
@@ -16,7 +16,7 @@ except ImportError:
     from meshtastic.protobuf.mesh_pb2 import MeshPacket
     from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
 
-from prometheus_client import CollectorRegistry, start_http_server
+from prometheus_client import start_http_server
 from psycopg_pool import ConnectionPool
 
 connection_pool = None
@@ -104,7 +104,7 @@ if __name__ == "__main__":
     node_conf_metrics = NodeConfigurationMetrics(connection_pool)
 
     # Configure Prometheus exporter
-    registry = CollectorRegistry()
+    registry = MetricTrackingRegistry()
     start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 9464)), registry=registry)
 
     # Create an MQTT client
@@ -137,7 +137,4 @@ if __name__ == "__main__":
     # Configure the Processor and the Exporter
     processor = MessageProcessor(registry, connection_pool)
 
-    cleanup_job = MetricCleanupJob(registry)
-    cleanup_job.start()
-
     mqtt_client.loop_forever()