Added simple cleaning to verify if the metrics are stale

This commit is contained in:
Gleb Tcivie 2025-02-01 12:35:57 +02:00
parent 0281d0b990
commit 3fb222de43
2 changed files with 63 additions and 16 deletions

View file

@ -1,21 +1,61 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, Tuple
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from prometheus_client import CollectorRegistry from prometheus_client import CollectorRegistry
class TrackedMetricsDict(dict):
    """A dict of per-label child metrics that reports every touch to a tracker.

    Each time an entry is seeded, written or read by subscript, the tracker's
    ``update_metric_timestamp(collector, key)`` is called so that a cleanup
    job can tell which label combinations are still in use.

    Args:
        collector: The collector that owns this mapping; passed back to the
            tracker unchanged on every touch.
        metric_tracker: Any object exposing
            ``update_metric_timestamp(collector, key)``.
        *args, **kwargs: Forwarded to ``dict`` to seed the initial content.
    """

    def __init__(self, collector, metric_tracker, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._collector = collector
        self._metric_tracker = metric_tracker
        # Content seeded through the constructor bypasses __setitem__, so it
        # would otherwise never receive a timestamp and look "never updated".
        for key in self:
            self._touch(key)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._touch(key)

    def __getitem__(self, key):
        # Look the key up first so a missing key raises KeyError without
        # creating a timestamp for an entry that does not exist.
        value = super().__getitem__(key)
        # NOTE(review): prometheus_client's labels() appears to assign into
        # `_metrics` only when a label set is first created and to read it by
        # subscript on every later call -- confirm against the installed
        # version. Refreshing on read is what keeps active series "fresh".
        self._touch(key)
        return value

    def _touch(self, key):
        """Report to the tracker that *key* was just used."""
        self._metric_tracker.update_metric_timestamp(self._collector, key)
class MetricTrackingRegistry(CollectorRegistry):
    """CollectorRegistry that tracks when each labelled metric is updated.

    On construction it creates a ``MetricCleanupJob`` bound to this registry
    and starts it. Every collector registered afterwards that exposes a
    ``_metrics`` mapping has that mapping replaced by a ``TrackedMetricsDict``
    which reports updates to the cleanup job.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._metric_tracker = MetricCleanupJob(self)
        self._metric_tracker.start()

    def register(self, collector):
        """Register *collector*, then add update tracking to its metrics.

        Collectors without a ``_metrics`` attribute are registered unchanged.
        """
        super().register(collector)
        if hasattr(collector, '_metrics'):
            # Swap the metrics dict for the tracking version, seeded with
            # whatever entries the collector already holds.
            collector._metrics = TrackedMetricsDict(
                collector,
                self._metric_tracker,
                collector._metrics,
            )

    def __del__(self):
        """Stop the cleanup job, best effort, when the registry is destroyed."""
        # __init__ may have failed before the tracker was assigned.
        tracker = getattr(self, '_metric_tracker', None)
        if tracker is None:
            return
        try:
            tracker.stop()
        except Exception as e:
            # A finalizer must not raise; stopping a scheduler that never
            # started (or was already shut down) is not an error worth
            # surfacing as an "Exception ignored in __del__" traceback.
            print(f"Error stopping metric cleanup job: {e}")
class MetricCleanupJob: class MetricCleanupJob:
def __init__(self, registry: CollectorRegistry): def __init__(self, registry: CollectorRegistry):
self.registry = registry self.registry = registry
self.scheduler = BackgroundScheduler() self.scheduler = BackgroundScheduler()
self.last_updates: Dict[Tuple[Any, Any], datetime] = {}
def start(self): def start(self):
"""Start the cleanup job to run every hour""" """Start the cleanup job to run every hour"""
self.scheduler.add_job( self.scheduler.add_job(
self.cleanup_unknown_metrics, self.cleanup_stale_metrics,
'interval', 'interval',
minutes=10, minutes=10,
next_run_time=datetime.now() + timedelta(minutes=1) # First run after 1 minute next_run_time=datetime.now() + timedelta(minutes=1)
) )
self.scheduler.start() self.scheduler.start()
@ -23,28 +63,38 @@ class MetricCleanupJob:
"""Stop the cleanup job""" """Stop the cleanup job"""
self.scheduler.shutdown() self.scheduler.shutdown()
def cleanup_unknown_metrics(self): def update_metric_timestamp(self, collector, labels):
"""Remove metric entries with mostly 'Unknown' values from the metrics dictionary""" """Update the last modification time for a metric"""
metric_key = (collector, labels)
self.last_updates[metric_key] = datetime.now()
def cleanup_stale_metrics(self):
"""Remove metric entries that haven't been updated in 24 hours"""
try: try:
current_time = datetime.now()
stale_threshold = current_time - timedelta(hours=24)
for collector, _ in list(self.registry._collector_to_names.items()): for collector, _ in list(self.registry._collector_to_names.items()):
if hasattr(collector, '_metrics'): if hasattr(collector, '_metrics'):
labels_to_remove = [] labels_to_remove = []
# Identify label combinations to remove
for labels, _ in list(collector._metrics.items()): for labels, _ in list(collector._metrics.items()):
unknown_count = sum(1 for label in labels if 'Unknown' in str(label)) metric_key = (collector, labels)
if unknown_count >= 1: # Threshold for "Unknown" values last_update = self.last_updates.get(metric_key)
if last_update is None or last_update < stale_threshold:
labels_to_remove.append(labels) labels_to_remove.append(labels)
# Remove identified label combinations
for labels in labels_to_remove: for labels in labels_to_remove:
try: try:
del collector._metrics[labels] del collector._metrics[labels]
print(f"Removed metric entry with labels: {labels}") metric_key = (collector, labels)
self.last_updates.pop(metric_key, None)
print(f"Removed stale metric entry with labels: {labels}")
except KeyError: except KeyError:
pass pass
except Exception as e: except Exception as e:
print(f"Error removing metric entry: {e}") print(f"Error removing metric entry: {e}")
except Exception as e: except Exception as e:
print(f"Error during metric cleanup: {e}") print(f"Error during metric cleanup: {e}")

View file

@ -7,7 +7,7 @@ from dotenv import load_dotenv
from constants import callback_api_version_map, protocol_map from constants import callback_api_version_map, protocol_map
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
from exporter.metric_cleanup_job import MetricCleanupJob from exporter.metric_cleanup_job import MetricTrackingRegistry
try: try:
from meshtastic.mesh_pb2 import MeshPacket from meshtastic.mesh_pb2 import MeshPacket
@ -16,7 +16,7 @@ except ImportError:
from meshtastic.protobuf.mesh_pb2 import MeshPacket from meshtastic.protobuf.mesh_pb2 import MeshPacket
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
from prometheus_client import CollectorRegistry, start_http_server from prometheus_client import start_http_server
from psycopg_pool import ConnectionPool from psycopg_pool import ConnectionPool
connection_pool = None connection_pool = None
@ -104,7 +104,7 @@ if __name__ == "__main__":
node_conf_metrics = NodeConfigurationMetrics(connection_pool) node_conf_metrics = NodeConfigurationMetrics(connection_pool)
# Configure Prometheus exporter # Configure Prometheus exporter
registry = CollectorRegistry() registry = MetricTrackingRegistry()
start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 9464)), registry=registry) start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 9464)), registry=registry)
# Create an MQTT client # Create an MQTT client
@ -137,7 +137,4 @@ if __name__ == "__main__":
# Configure the Processor and the Exporter # Configure the Processor and the Exporter
processor = MessageProcessor(registry, connection_pool) processor = MessageProcessor(registry, connection_pool)
cleanup_job = MetricCleanupJob(registry)
cleanup_job.start()
mqtt_client.loop_forever() mqtt_client.loop_forever()