diff --git a/.env b/.env index 88a1952..da3c9e5 100644 --- a/.env +++ b/.env @@ -5,7 +5,7 @@ DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic # Prometheus connection details PROMETHEUS_COLLECTOR_PORT=9464 -PROMETHEUS_JOB=example +PROMETHEUS_JOB=meshtastic # MQTT connection details MQTT_HOST=mqtt.meshtastic.org diff --git a/exporter/metric_cleanup_job.py b/exporter/metric_cleanup_job.py new file mode 100644 index 0000000..3345f8f --- /dev/null +++ b/exporter/metric_cleanup_job.py @@ -0,0 +1,50 @@ +from datetime import datetime, timedelta + +from apscheduler.schedulers.background import BackgroundScheduler +from prometheus_client import CollectorRegistry + + +class MetricCleanupJob: + def __init__(self, registry: CollectorRegistry): + self.registry = registry + self.scheduler = BackgroundScheduler() + + def start(self): + """Start the cleanup job to run every hour""" + self.scheduler.add_job( + self.cleanup_unknown_metrics, + 'interval', + minutes=10, + next_run_time=datetime.now() + timedelta(minutes=1) # First run after 1 minute + ) + self.scheduler.start() + + def stop(self): + """Stop the cleanup job""" + self.scheduler.shutdown() + + def cleanup_unknown_metrics(self): + """Remove metric entries with mostly 'Unknown' values from the metrics dictionary""" + try: + for collector, _ in list(self.registry._collector_to_names.items()): + if hasattr(collector, '_metrics'): + labels_to_remove = [] + + # Identify label combinations to remove + for labels, _ in list(collector._metrics.items()): + unknown_count = sum(1 for label in labels if 'Unknown' in str(label)) + if unknown_count >= 1: # Threshold for "Unknown" values + labels_to_remove.append(labels) + + # Remove identified label combinations + for labels in labels_to_remove: + try: + del collector._metrics[labels] + print(f"Removed metric entry with labels: {labels}") + except KeyError: + pass + except Exception as e: + print(f"Error removing metric entry: {e}") + + except Exception as e: + print(f"Error during metric cleanup: {e}") diff --git a/exporter/processor/processor_base.py b/exporter/processor/processor_base.py index c47a4d8..7148f8f 100644 --- a/exporter/processor/processor_base.py +++ b/exporter/processor/processor_base.py @@ -149,14 +149,15 @@ class MessageProcessor: def process_json_mqtt(message): topic = message.topic json_packet = json.loads(message.payload) - if json_packet['sender'][0] == '!': - gateway_node_id = str(int(json_packet['sender'][1:], 16)) - NodeConfigurationMetrics().process_mqtt_update( - node_id=gateway_node_id, - mqtt_json_enabled=True, - mqtt_encryption_enabled=json_packet.get('encrypted', False), - mqtt_configured_root_topic=topic - ) + if 'sender' in json_packet: + if json_packet['sender'][0] == '!': + gateway_node_id = str(int(json_packet['sender'][1:], 16)) + NodeConfigurationMetrics().process_mqtt_update( + node_id=gateway_node_id, + mqtt_json_enabled=True, + mqtt_encryption_enabled=json_packet.get('encrypted', False), + mqtt_configured_root_topic=topic + ) @staticmethod def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket): diff --git a/main.py b/main.py index e2efd7b..2025c1e 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from dotenv import load_dotenv from constants import callback_api_version_map, protocol_map from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics +from exporter.metric_cleanup_job import MetricCleanupJob try: from meshtastic.mesh_pb2 import MeshPacket @@ -136,4 +137,7 @@ if __name__ == "__main__": # Configure the Processor and the Exporter processor = MessageProcessor(registry, connection_pool) + cleanup_job = MetricCleanupJob(registry) + cleanup_job.start() + mqtt_client.loop_forever() diff --git a/requirements.txt b/requirements.txt index c19a5e2..2b461e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,6 @@ psycopg~=3.1.19 psycopg_pool~=3.2.2 meshtastic~=2.3.13 psycopg-binary~=3.1.20 -geopy>=2.4.1 \ No newline at end of file +geopy>=2.4.1 +psycopg-pool>=3.2.2 +APScheduler>=3.10.4 \ No newline at end of file