From 0866af8db505ddfbf7828096a95ad331eeffaa71 Mon Sep 17 00:00:00 2001
From: Gleb Tcivie
Date: Fri, 28 Jun 2024 19:49:43 +0300
Subject: [PATCH] Reformatted code to support postgres instead of redis for
 ease of use in Grafana

---
 .env                                  |   7 +-
 .idea/dataSources.xml                 |   8 +-
 .idea/meshtastic-metrics-exporter.iml |   3 -
 .idea/sqldialects.xml                 |   8 ++
 docker/grafana/datasources.yml        |  11 +--
 docker/postgres/init.sql              |  29 +++++++
 exporter/processors.py                |  59 +++++++-----
 exporter/registry.py                  | 110 ++++++++++++++--------
 main.py                               |  54 +++++++------
 requirements.txt                      |   5 +-
 10 files changed, 193 insertions(+), 101 deletions(-)
 create mode 100644 .idea/sqldialects.xml
 create mode 100644 docker/postgres/init.sql

diff --git a/.env b/.env
index 210addf..3b13bbe 100644
--- a/.env
+++ b/.env
@@ -1,10 +1,7 @@
 # Description: Environment variables for the application

-# Redis connection details
-REDIS_HOST=redis
-REDIS_PORT=6379
-REDIS_DB=0
-REDIS_PASSWORD=
+# Postgres connection details
+DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic

 # Prometheus connection details
 PROMETHEUS_COLLECTOR_PORT=9464
diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml
index 5391cb6..2574510 100644
--- a/.idea/dataSources.xml
+++ b/.idea/dataSources.xml
@@ -1,11 +1,11 @@
-
-      redis
+
+      postgresql
       true
-      jdbc.RedisDriver
-      jdbc:redis://localhost:6379/0
+      org.postgresql.Driver
+      jdbc:postgresql://localhost:5432/meshtastic
       $ProjectFileDir$
diff --git a/.idea/meshtastic-metrics-exporter.iml b/.idea/meshtastic-metrics-exporter.iml
index ba8dfea..2c80e12 100644
--- a/.idea/meshtastic-metrics-exporter.iml
+++ b/.idea/meshtastic-metrics-exporter.iml
@@ -7,7 +7,4 @@
-
-
-
\ No newline at end of file
diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml
new file mode 100644
index 0000000..0201e4b
--- /dev/null
+++ b/.idea/sqldialects.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docker/grafana/datasources.yml b/docker/grafana/datasources.yml
index 54ae4d0..35bd6dc 100644
--- a/docker/grafana/datasources.yml
+++ b/docker/grafana/datasources.yml
@@ -7,13 +7,4 @@ datasources:
     isDefault: true
     editable: true
     jsonData:
-      httpMethod: POST
-
-  - name: redis-datasource
-    type: redis-datasource
-    access: proxy
-    url: redis://redis:6379/0
-    isDefault: false
-    editable: true
-    jsonData:
-      client: standalone
\ No newline at end of file
+      httpMethod: POST
\ No newline at end of file
diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql
new file mode 100644
index 0000000..61a4408
--- /dev/null
+++ b/docker/postgres/init.sql
@@ -0,0 +1,29 @@
+CREATE TABLE IF NOT EXISTS messages
+(
+    id          TEXT PRIMARY KEY,
+    received_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE OR REPLACE FUNCTION expire_old_messages()
+    RETURNS TRIGGER AS
+$$
+BEGIN
+    DELETE FROM messages WHERE received_at < NOW() - INTERVAL '1 minute';
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER trigger_expire_old_messages
+    AFTER INSERT
+    ON messages
+    FOR EACH ROW
+EXECUTE FUNCTION expire_old_messages();
+
+CREATE TABLE IF NOT EXISTS client_details
+(
+    node_id        VARCHAR PRIMARY KEY,
+    short_name     VARCHAR,
+    long_name      VARCHAR,
+    hardware_model VARCHAR,
+    role           VARCHAR
+);
diff --git a/exporter/processors.py b/exporter/processors.py
index adf72fb..2314a9a 100644
--- a/exporter/processors.py
+++ b/exporter/processors.py
@@ -1,20 +1,18 @@
 import base64
-import json
 import os

-import redis
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from meshtastic.config_pb2 import Config
-from meshtastic.mesh_pb2 import MeshPacket, HardwareModel, Data
+from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
 from meshtastic.portnums_pb2 import PortNum
 from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
+from psycopg_pool import ConnectionPool

 from exporter.registry import ProcessorRegistry, ClientDetails


 class MessageProcessor:
-    def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis):
+    def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
         self.rx_rssi_gauge = None
         self.channel_counter = None
         self.packet_id_counter = None
@@ -28,7 +26,7 @@ class MessageProcessor:
         self.destination_message_type_counter = None
         self.source_message_type_counter = None
         self.registry = registry
-        self.redis_client = redis_client
+        self.db_pool = db_pool

         self.init_metrics()
         self.processor_registry = ProcessorRegistry()
@@ -137,12 +135,14 @@ class MessageProcessor:
         port_num = int(mesh_packet.decoded.portnum)
         payload = mesh_packet.decoded.payload

-        source_client_details = self._get_client_details(getattr(mesh_packet, 'from'))
+        source_node_id = getattr(mesh_packet, 'from')
+        source_client_details = self._get_client_details(source_node_id)
         if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
             source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
                                                   long_name='Hidden')

-        destination_client_details = self._get_client_details(getattr(mesh_packet, 'to'))
+        destination_node_id = getattr(mesh_packet, 'to')
+        destination_client_details = self._get_client_details(destination_node_id)
         if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
             destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden',
                                                        long_name='Hidden')
@@ -152,7 +152,7 @@ class MessageProcessor:

         self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)

-        processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client)
+        processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
         processor.process(payload, client_details=source_client_details)

     def get_port_name_from_portnum(self, port_num):
@@ -237,16 +237,33 @@ class MessageProcessor:
             destination_id=destination_client_details.node_id
         ).set(mesh_packet.rx_rssi)

-    def _get_client_details(self, node_id: str) -> ClientDetails:
-        user_details_json = self.redis_client.get(f"node:{node_id}")
-        if user_details_json is not None:
-            # Decode the JSON string to a Python dictionary
-            user_details = json.loads(user_details_json)
-            return ClientDetails(node_id=node_id,
-                                 short_name=user_details.get('short_name', 'Unknown'),
-                                 long_name=user_details.get('long_name', 'Unknown'),
-                                 hardware_model=user_details.get('hardware_model', HardwareModel.UNSET),
-                                 role=user_details.get('role', Config.DeviceConfig.Role.ValueType),
-                                 )
-
-        return ClientDetails(node_id=node_id)
+    def _get_client_details(self, node_id: int) -> ClientDetails:
+        node_id_str = str(node_id)  # Convert the integer to a string
+        with self.db_pool.connection() as conn:
+            with conn.cursor() as cur:
+                # First, try to select the existing record
+                cur.execute("""
+                    SELECT node_id, short_name, long_name, hardware_model, role
+                    FROM client_details
+                    WHERE node_id = %s;
+                """, (node_id_str,))
+                result = cur.fetchone()
+
+                if not result:
+                    # If the client is not found, insert a new record
+                    cur.execute("""
+                        INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
+                        VALUES (%s, %s, %s, %s, %s)
+                        RETURNING node_id, short_name, long_name, hardware_model, role;
+                    """, (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
+                    conn.commit()
+                    result = cur.fetchone()
+
+                # At this point, we should always have a result, either from SELECT or INSERT
+                return ClientDetails(
+                    node_id=result[0],
+                    short_name=result[1],
+                    long_name=result[2],
+                    hardware_model=result[3],
+                    role=result[4]
+                )
diff --git a/exporter/registry.py b/exporter/registry.py
index b4ff0da..5a0272c 100644
--- a/exporter/registry.py
+++ b/exporter/registry.py
@@ -1,9 +1,8 @@
-import json
 import os
 from abc import ABC, abstractmethod
 from venv import logger

-import redis
+import psycopg
 import unishox2
 from meshtastic.admin_pb2 import AdminMessage
 from meshtastic.config_pb2 import Config
@@ -15,6 +14,7 @@
 from meshtastic.remote_hardware_pb2 import HardwareMessage
 from meshtastic.storeforward_pb2 import StoreAndForward
 from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics
 from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
+from psycopg_pool import ConnectionPool

 class _Metrics:
@@ -351,6 +351,22 @@ class _Metrics:
         )


+def get_hardware_model_name_from_code(hardware_model):
+    descriptor = HardwareModel.DESCRIPTOR
+    for enum_value in descriptor.values:
+        if enum_value.number == hardware_model:
+            return enum_value.name
+    return 'UNKNOWN_HARDWARE_MODEL'
+
+
+def get_role_name_from_role(role):
+    descriptor = Config.DeviceConfig.Role.DESCRIPTOR
+    for enum_value in descriptor.values:
+        if enum_value.number == role:
+            return enum_value.name
+    return 'UNKNOWN_ROLE'
+
+
 class ClientDetails:
     def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET,
                  role=None):
@@ -360,39 +376,30 @@ class ClientDetails:
         self.hardware_model: HardwareModel = hardware_model
         self.role: Config.DeviceConfig.Role = role

-    def get_role_name_from_role(self):
-        descriptor = Config.DeviceConfig.Role.DESCRIPTOR
-        for enum_value in descriptor.values:
-            if enum_value.number == self.role:
-                return enum_value.name
-        return 'UNKNOWN_ROLE'
-
-    def get_hardware_model_name_from_code(self):
-        descriptor = HardwareModel.DESCRIPTOR
-        for enum_value in descriptor.values:
-            if enum_value.number == self.hardware_model:
-                return enum_value.name
-        return 'UNKNOWN_HARDWARE_MODEL'
-
     def to_dict(self):
         return {
             'node_id': self.node_id,
             'short_name': self.short_name,
             'long_name': self.long_name,
-            'hardware_model': self.get_hardware_model_name_from_code(),
-            'role': self.get_role_name_from_role()
+            'hardware_model': get_hardware_model_name_from_code(self.hardware_model),
+            'role': get_role_name_from_role(self.role)
         }


 class Processor(ABC):
-    def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis):
-        self.redis_client = redis_client
+    def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
+        self.db_pool = db_pool
         self.metrics = _Metrics(registry)

     @abstractmethod
     def process(self, payload: bytes, client_details: ClientDetails):
         pass

+    def execute_db_operation(self, operation):
+        with self.db_pool.connection() as conn:
+            with conn.cursor() as cur:
+                return operation(cur, conn)
+

 class ProcessorRegistry:
     _registry = {}
@@ -419,14 +426,10 @@ class UnknownAppProcessor(Processor):

 @ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP)
 class TextMessageAppProcessor(Processor):
-    def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis):
-        super().__init__(registry, redis_client)
-
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received TEXT_MESSAGE_APP packet")
         message = payload.decode('utf-8')
-        if os.getenv('HIDE_MESSAGE', 'true') == 'true':  # Currently there is no use for the message content,
-            # but later we could store it in redis or something
+        if os.getenv('HIDE_MESSAGE', 'true') == 'true':
             message = 'Hidden'
         self.metrics.message_length_histogram.labels(
             client_id=client_details.node_id
@@ -469,13 +472,52 @@ class NodeInfoAppProcessor(Processor):
         logger.debug("Received NODEINFO_APP packet")
         user = User()
         user.ParseFromString(payload)
-        client_details.short_name = user.short_name
-        client_details.long_name = user.long_name
-        client_details.hardware_model = user.hw_model
-        client_details.role = user.role
-        user_details_json = json.dumps(client_details.to_dict())
-        self.redis_client.set(f"node:{client_details.node_id}", user_details_json)
-        pass
+
+        def db_operation(cur, conn):
+            # First, try to select the existing record
+            cur.execute("""
+                SELECT short_name, long_name, hardware_model, role
+                FROM client_details
+                WHERE node_id = %s;
+            """, (client_details.node_id,))
+            existing_record = cur.fetchone()
+
+            if existing_record:
+                # If record exists, update only the fields that are provided in the new data
+                update_fields = []
+                update_values = []
+                if user.short_name:
+                    update_fields.append("short_name = %s")
+                    update_values.append(user.short_name)
+                if user.long_name:
+                    update_fields.append("long_name = %s")
+                    update_values.append(user.long_name)
+                if user.hw_model != HardwareModel.UNSET:
+                    update_fields.append("hardware_model = %s")
+                    update_values.append(get_hardware_model_name_from_code(user.hw_model))
+                if user.role is not None:
+                    update_fields.append("role = %s")
+                    update_values.append(get_role_name_from_role(user.role))
+
+                if update_fields:
+                    update_query = f"""
+                        UPDATE client_details
+                        SET {", ".join(update_fields)}
+                        WHERE node_id = %s
+                    """
+                    cur.execute(update_query, update_values + [client_details.node_id])
+            else:
+                # If record doesn't exist, insert a new one
+                cur.execute("""
+                    INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
+                    VALUES (%s, %s, %s, %s, %s)
+                """, (client_details.node_id, user.short_name, user.long_name,
+                      get_hardware_model_name_from_code(user.hw_model), get_role_name_from_role(user.role)))
+
+            conn.commit()
+
+        self.execute_db_operation(db_operation)
+


 @ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
@@ -584,8 +626,8 @@ class RangeTestAppProcessor(Processor):

 @ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP)
 class TelemetryAppProcessor(Processor):
-    def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis):
-        super().__init__(registry, redis_client)
+    def __init__(self, registry: CollectorRegistry, db_connection: ConnectionPool):
+        super().__init__(registry, db_connection)

     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received TELEMETRY_APP packet")
diff --git a/main.py b/main.py
index d4be407..e5184a6 100644
--- a/main.py
+++ b/main.py
@@ -3,15 +3,26 @@
 from datetime import datetime

 import paho.mqtt.client as mqtt
-import redis
 from dotenv import load_dotenv
 from meshtastic.mesh_pb2 import MeshPacket
 from meshtastic.mqtt_pb2 import ServiceEnvelope
 from paho.mqtt.enums import CallbackAPIVersion
 from prometheus_client import CollectorRegistry, start_http_server
+from psycopg_pool import ConnectionPool

 from exporter.processors import MessageProcessor

+# Global connection pool
+connection_pool = None
+
+
+def get_connection():
+    return connection_pool.getconn()
+
+
+def release_connection(conn):
+    connection_pool.putconn(conn)
+

 def handle_connect(client, userdata, flags, reason_code, properties):
     print(f"Connected with result code {reason_code}")
@@ -21,37 +32,37 @@ def handle_connect(client, userdata, flags, reason_code, properties):
 def handle_message(client, userdata, message):
     current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     print(f"Received message on topic '{message.topic}' at {current_timestamp}")
-
-    # Filter out messages from the 'stat' topic
     if '/stat/' in message.topic:
         print(f"Filtered out message from topic containing '/stat/': {message.topic}")
         return

     envelope = ServiceEnvelope()
     envelope.ParseFromString(message.payload)
-
     packet: MeshPacket = envelope.packet
-    if redis_client.set(str(packet.id), 1, nx=True, ex=os.getenv('redis_expiration', 60), get=True) is not None:
-        logging.debug(f"Packet {packet.id} already processed")
-        return
-    # Process the packet
+
+    with connection_pool.connection() as conn:
+        with conn.cursor() as cur:
+            cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),))
+            if cur.fetchone() is not None:
+                logging.debug(f"Packet {packet.id} already processed")
+                return
+
+            cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
+                        (str(packet.id),))
+            conn.commit()
+
     processor.process(packet)


 if __name__ == "__main__":
     load_dotenv()
-    # Create Redis client
-    try:
-        redis_client = redis.Redis(
-            host=os.getenv('REDIS_HOST'),
-            port=int(os.getenv('REDIS_PORT')),
-            db=int(os.getenv('REDIS_DB', 0)),
-            password=os.getenv('REDIS_PASSWORD', None),
-        )
-    except Exception as e:
-        logging.error(f"Failed to connect to Redis: {e}")
-        exit(1)
+
+    # Setup a connection pool
+    connection_pool = ConnectionPool(
+        os.getenv('DATABASE_URL'),
+        min_size=1,
+        max_size=10
+    )

     # Configure Prometheus exporter
     registry = CollectorRegistry()
@@ -62,7 +73,6 @@ if __name__ == "__main__":
         callback_api_version=CallbackAPIVersion.VERSION2,
         protocol=mqtt.MQTTv5
     )
-
     mqtt_client.on_connect = handle_connect
     mqtt_client.on_message = handle_message

@@ -78,12 +88,12 @@ if __name__ == "__main__":
             os.getenv('MQTT_HOST'),
             int(os.getenv('MQTT_PORT')),
             keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)),
-
         )
     except Exception as e:
         logging.error(f"Failed to connect to MQTT broker: {e}")
         exit(1)
+
     # Configure the Processor and the Exporter
-    processor = MessageProcessor(registry, redis_client)
+    processor = MessageProcessor(registry, connection_pool)

     mqtt_client.loop_forever()
diff --git a/requirements.txt b/requirements.txt
index 0c1eab4..3773ecb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 paho-mqtt~=2.1.0
-redis~=5.0.6
 python-dotenv~=1.0.1
 meshtastic~=2.3.11
 prometheus_client~=0.20.0
 unishox2-py3~=1.0.0
-cryptography~=42.0.8
\ No newline at end of file
+cryptography~=42.0.8
+psycopg~=3.1.19
+psycopg_pool~=3.2.2
\ No newline at end of file