Added cleanup job for metrics that are not needed anymore
This commit is contained in:
parent
1fc4096772
commit
7b32cb57da
2
.env
2
.env
|
@ -5,7 +5,7 @@ DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic
|
||||||
|
|
||||||
# Prometheus connection details
|
# Prometheus connection details
|
||||||
PROMETHEUS_COLLECTOR_PORT=9464
|
PROMETHEUS_COLLECTOR_PORT=9464
|
||||||
PROMETHEUS_JOB=example
|
PROMETHEUS_JOB=meshtastic
|
||||||
|
|
||||||
# MQTT connection details
|
# MQTT connection details
|
||||||
MQTT_HOST=mqtt.meshtastic.org
|
MQTT_HOST=mqtt.meshtastic.org
|
||||||
|
|
50
exporter/metric_cleanup_job.py
Normal file
50
exporter/metric_cleanup_job.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
from prometheus_client import CollectorRegistry
|
||||||
|
|
||||||
|
|
||||||
|
class MetricCleanupJob:
|
||||||
|
def __init__(self, registry: CollectorRegistry):
|
||||||
|
self.registry = registry
|
||||||
|
self.scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the cleanup job to run every hour"""
|
||||||
|
self.scheduler.add_job(
|
||||||
|
self.cleanup_unknown_metrics,
|
||||||
|
'interval',
|
||||||
|
minutes=10,
|
||||||
|
next_run_time=datetime.now() + timedelta(minutes=1) # First run after 1 minute
|
||||||
|
)
|
||||||
|
self.scheduler.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the cleanup job"""
|
||||||
|
self.scheduler.shutdown()
|
||||||
|
|
||||||
|
def cleanup_unknown_metrics(self):
|
||||||
|
"""Remove metric entries with mostly 'Unknown' values from the metrics dictionary"""
|
||||||
|
try:
|
||||||
|
for collector, _ in list(self.registry._collector_to_names.items()):
|
||||||
|
if hasattr(collector, '_metrics'):
|
||||||
|
labels_to_remove = []
|
||||||
|
|
||||||
|
# Identify label combinations to remove
|
||||||
|
for labels, _ in list(collector._metrics.items()):
|
||||||
|
unknown_count = sum(1 for label in labels if 'Unknown' in str(label))
|
||||||
|
if unknown_count >= 1: # Threshold for "Unknown" values
|
||||||
|
labels_to_remove.append(labels)
|
||||||
|
|
||||||
|
# Remove identified label combinations
|
||||||
|
for labels in labels_to_remove:
|
||||||
|
try:
|
||||||
|
del collector._metrics[labels]
|
||||||
|
print(f"Removed metric entry with labels: {labels}")
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error removing metric entry: {e}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error during metric cleanup: {e}")
|
|
@ -149,14 +149,15 @@ class MessageProcessor:
|
||||||
def process_json_mqtt(message):
|
def process_json_mqtt(message):
|
||||||
topic = message.topic
|
topic = message.topic
|
||||||
json_packet = json.loads(message.payload)
|
json_packet = json.loads(message.payload)
|
||||||
if json_packet['sender'][0] == '!':
|
if 'sender' in json_packet:
|
||||||
gateway_node_id = str(int(json_packet['sender'][1:], 16))
|
if json_packet['sender'][0] == '!':
|
||||||
NodeConfigurationMetrics().process_mqtt_update(
|
gateway_node_id = str(int(json_packet['sender'][1:], 16))
|
||||||
node_id=gateway_node_id,
|
NodeConfigurationMetrics().process_mqtt_update(
|
||||||
mqtt_json_enabled=True,
|
node_id=gateway_node_id,
|
||||||
mqtt_encryption_enabled=json_packet.get('encrypted', False),
|
mqtt_json_enabled=True,
|
||||||
mqtt_configured_root_topic=topic
|
mqtt_encryption_enabled=json_packet.get('encrypted', False),
|
||||||
)
|
mqtt_configured_root_topic=topic
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket):
|
def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket):
|
||||||
|
|
4
main.py
4
main.py
|
@ -7,6 +7,7 @@ from dotenv import load_dotenv
|
||||||
|
|
||||||
from constants import callback_api_version_map, protocol_map
|
from constants import callback_api_version_map, protocol_map
|
||||||
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
|
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
|
||||||
|
from exporter.metric_cleanup_job import MetricCleanupJob
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from meshtastic.mesh_pb2 import MeshPacket
|
from meshtastic.mesh_pb2 import MeshPacket
|
||||||
|
@ -136,4 +137,7 @@ if __name__ == "__main__":
|
||||||
# Configure the Processor and the Exporter
|
# Configure the Processor and the Exporter
|
||||||
processor = MessageProcessor(registry, connection_pool)
|
processor = MessageProcessor(registry, connection_pool)
|
||||||
|
|
||||||
|
cleanup_job = MetricCleanupJob(registry)
|
||||||
|
cleanup_job.start()
|
||||||
|
|
||||||
mqtt_client.loop_forever()
|
mqtt_client.loop_forever()
|
||||||
|
|
Loading…
Reference in a new issue