Merge pull request #73 from tcivie/72-exporter-creates-doo-many-stale-labels-which-increase-the-load-on-the-prometheus

Added cleanup job for metrics that are not needed anymore
This commit is contained in:
Gleb Tcivie 2024-11-23 19:33:10 +02:00 committed by GitHub
commit 2cfbcaa8cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 67 additions and 10 deletions

2
.env
View file

@ -5,7 +5,7 @@ DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic
# Prometheus connection details
PROMETHEUS_COLLECTOR_PORT=9464
PROMETHEUS_JOB=example
PROMETHEUS_JOB=meshtastic
# MQTT connection details
MQTT_HOST=mqtt.meshtastic.org

View file

@ -0,0 +1,50 @@
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from prometheus_client import CollectorRegistry
class MetricCleanupJob:
    """Background job that prunes stale label sets from a Prometheus registry.

    A label set is treated as stale when at least one of its label values
    contains the text ``Unknown``. The job walks every collector registered
    in ``registry`` and deletes such label sets from the collector's
    internal ``_metrics`` mapping.
    """

    def __init__(self, registry: CollectorRegistry):
        """Bind the job to ``registry`` and create its (not yet started) scheduler."""
        self.registry = registry
        self.scheduler = BackgroundScheduler()

    def start(self):
        """Run the cleanup every 10 minutes, with the first run after 1 minute.

        Does nothing when the scheduler is already running, so a repeated
        call neither registers a duplicate job nor raises.
        """
        if self.scheduler.running:
            return
        self.scheduler.add_job(
            self.cleanup_unknown_metrics,
            'interval',
            minutes=10,
            next_run_time=datetime.now() + timedelta(minutes=1),  # First run after 1 minute
        )
        self.scheduler.start()

    def stop(self):
        """Stop the cleanup job; a no-op when the scheduler is not running."""
        if self.scheduler.running:
            self.scheduler.shutdown()

    def cleanup_unknown_metrics(self):
        """Remove every label set that has an 'Unknown' label value.

        Errors are reported with ``print`` and never propagate: this runs as
        a scheduler job, and a failure in one collector must not prevent the
        remaining collectors from being cleaned.
        """
        try:
            # Snapshot the collectors so registrations made while the job is
            # running cannot disturb the iteration.
            collectors = list(self.registry._collector_to_names)
        except Exception as e:
            print(f"Error during metric cleanup: {e}")
            return

        for collector in collectors:
            if not hasattr(collector, '_metrics'):
                continue
            try:
                # NOTE(review): relies on prometheus_client internals
                # (`_metrics`, `_lock`); the lock is taken when present so the
                # deletion cannot interleave with label creation or a scrape.
                lock = getattr(collector, '_lock', None)
                if lock is None:
                    removed = self._drop_unknown_label_sets(collector._metrics)
                else:
                    with lock:
                        removed = self._drop_unknown_label_sets(collector._metrics)
            except Exception as e:
                print(f"Error removing metric entry: {e}")
                continue
            # Report outside the lock to keep the critical section short.
            for labels in removed:
                print(f"Removed metric entry with labels: {labels}")

    @staticmethod
    def _drop_unknown_label_sets(metrics):
        """Delete the keys of ``metrics`` that contain an 'Unknown' label value.

        Returns the list of removed keys (label-value tuples).
        """
        stale = [
            labels
            for labels in list(metrics)
            if any('Unknown' in str(label) for label in labels)
        ]
        for labels in stale:
            # pop() with a default tolerates a key that vanished meanwhile.
            metrics.pop(labels, None)
        return stale

View file

@ -149,14 +149,15 @@ class MessageProcessor:
def process_json_mqtt(message):
    """Record the MQTT settings of the gateway node that published a JSON packet.

    ``message`` must expose ``topic`` and ``payload``; the payload is decoded
    with ``json.loads``. When the decoded object carries a ``sender`` of the
    form ``!<hex digits>``, the hex part is converted to a decimal node-id
    string and reported once through
    ``NodeConfigurationMetrics().process_mqtt_update`` together with the
    topic and the packet's ``encrypted`` flag (``False`` when absent).

    Packets that are not JSON objects, have no usable ``sender``, or whose
    sender id is not valid hexadecimal are ignored.

    Raises whatever ``json.loads`` raises for a payload that is not valid JSON.
    """
    topic = message.topic
    json_packet = json.loads(message.payload)

    # Valid JSON may still be a list, number or string; only objects have a sender.
    if not isinstance(json_packet, dict):
        return

    sender = json_packet.get('sender')
    # startswith() is safe on an empty string, unlike indexing sender[0].
    if not isinstance(sender, str) or not sender.startswith('!'):
        return

    try:
        gateway_node_id = str(int(sender[1:], 16))
    except ValueError:
        # Malformed node id: skip the packet rather than fail the handler.
        return

    NodeConfigurationMetrics().process_mqtt_update(
        node_id=gateway_node_id,
        mqtt_json_enabled=True,
        mqtt_encryption_enabled=json_packet.get('encrypted', False),
        mqtt_configured_root_topic=topic
    )
@staticmethod
def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket):

View file

@ -7,6 +7,7 @@ from dotenv import load_dotenv
from constants import callback_api_version_map, protocol_map
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
from exporter.metric_cleanup_job import MetricCleanupJob
try:
from meshtastic.mesh_pb2 import MeshPacket
@ -136,4 +137,7 @@ if __name__ == "__main__":
# Configure the Processor and the Exporter
processor = MessageProcessor(registry, connection_pool)
cleanup_job = MetricCleanupJob(registry)
cleanup_job.start()
mqtt_client.loop_forever()

View file

@ -8,3 +8,5 @@ psycopg_pool~=3.2.2
meshtastic~=2.3.13
psycopg-binary~=3.1.20
geopy>=2.4.1
psycopg-pool>=3.2.2
APScheduler>=3.10.4