diff --git a/docker-compose.yml b/docker-compose.yml index 7eef747..abec137 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: - mesh-bridge postgres: - image: postgres:13.3 + image: postgres:16.3 restart: unless-stopped networks: - mesh-bridge diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql index 7cd9c5a..2ae31b6 100644 --- a/docker/postgres/init.sql +++ b/docker/postgres/init.sql @@ -19,23 +19,26 @@ CREATE TRIGGER trigger_expire_old_messages FOR EACH ROW EXECUTE FUNCTION expire_old_messages(); -CREATE TABLE IF NOT EXISTS client_details +CREATE TABLE IF NOT EXISTS node_details ( node_id VARCHAR PRIMARY KEY, +-- Base Data short_name VARCHAR, long_name VARCHAR, hardware_model VARCHAR, role VARCHAR, - mqtt_status VARCHAR default 'none' -); - -CREATE TABLE IF NOT EXISTS node_graph -( - node_id VARCHAR PRIMARY KEY, - last_sent_by_node_id VARCHAR, - last_sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - broadcast_interval_secs INTEGER, - FOREIGN KEY (node_id) REFERENCES client_details (node_id) + mqtt_status VARCHAR default 'none', +-- Location Data + longitude FLOAT, + latitude FLOAT, + altitude FLOAT, + precision FLOAT, + country VARCHAR, + city VARCHAR, + state VARCHAR, +-- SQL Data + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ); CREATE TABLE IF NOT EXISTS node_neighbors @@ -44,12 +47,15 @@ CREATE TABLE IF NOT EXISTS node_neighbors node_id VARCHAR, neighbor_id VARCHAR, snr FLOAT, - FOREIGN KEY (node_id) REFERENCES client_details (node_id), - FOREIGN KEY (neighbor_id) REFERENCES client_details (node_id), + FOREIGN KEY (node_id) REFERENCES node_details (node_id), + FOREIGN KEY (neighbor_id) REFERENCES node_details (node_id), UNIQUE (node_id, neighbor_id) ); CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id); -ALTER TABLE client_details - ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP; +CREATE OR REPLACE TRIGGER client_details_updated_at + BEFORE UPDATE + ON node_details + FOR EACH ROW +EXECUTE PROCEDURE moddatetime(updated_at); diff --git a/exporter/db_handler.py b/exporter/db_handler.py new file mode 100644 index 0000000..33a93a1 --- /dev/null +++ b/exporter/db_handler.py @@ -0,0 +1,17 @@ +from psycopg_pool import ConnectionPool + + +class DBHandler: + def __init__(self, db_pool: ConnectionPool): + self.db_pool = db_pool + + def get_connection(self): + return self.db_pool.getconn() + + def release_connection(self, conn): + self.db_pool.putconn(conn) + + def execute_db_operation(self, operation): + with self.db_pool.connection() as conn: + with conn.cursor() as cur: + return operation(cur, conn) diff --git a/exporter/processor_base.py b/exporter/processor_base.py index b2460e1..b66ba4b 100644 --- a/exporter/processor_base.py +++ b/exporter/processor_base.py @@ -258,7 +258,7 @@ class MessageProcessor: # First, try to select the existing record cur.execute(""" SELECT node_id, short_name, long_name, hardware_model, role - FROM client_details + FROM node_details WHERE node_id = %s; """, (node_id_str,)) result = cur.fetchone() @@ -266,7 +266,7 @@ class MessageProcessor: if not result: # If the client is not found, insert a new record cur.execute(""" - INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) + INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role) VALUES (%s, %s, %s, %s, %s) RETURNING node_id, short_name, long_name, hardware_model, role; """, (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None)) diff --git a/exporter/processors.py b/exporter/processors.py index 9d0339f..eb9b269 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -5,6 +5,8 @@ from venv import logger import psycopg import unishox2 +from exporter.db_handler import DBHandler + try: from meshtastic.admin_pb2 import AdminMessage from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo @@ -36,17 +38,12 @@ from exporter.registry import _Metrics class Processor(ABC): def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool): self.db_pool = db_pool - self.metrics = _Metrics(registry) + self.metrics = _Metrics(registry, DBHandler(db_pool)) @abstractmethod def process(self, payload: bytes, client_details: ClientDetails): pass - def execute_db_operation(self, operation): - with self.db_pool.connection() as conn: - with conn.cursor() as cur: - return operation(cur, conn) - class ProcessorRegistry: _registry = {} @@ -113,18 +110,10 @@ class PositionAppProcessor(Processor): except Exception as e: logger.error(f"Failed to parse POSITION_APP packet: {e}") return - self.metrics.device_latitude_gauge.labels( - **client_details.to_dict() - ).set(position.latitude_i) - self.metrics.device_longitude_gauge.labels( - **client_details.to_dict() - ).set(position.longitude_i) - self.metrics.device_altitude_gauge.labels( - **client_details.to_dict() - ).set(position.altitude) - self.metrics.device_position_precision_gauge.labels( - **client_details.to_dict() - ).set(position.precision_bits) + + self.metrics.update_metrics_position( + position.latitude_i, position.longitude_i, position.altitude, + position.precision_bits, client_details) pass @@ -143,7 +132,7 @@ class NodeInfoAppProcessor(Processor): # First, try to select the existing record cur.execute(""" SELECT short_name, long_name, hardware_model, role - FROM client_details + FROM node_details WHERE node_id = %s; """, (client_details.node_id,)) existing_record = cur.fetchone() @@ -167,7 +156,7 @@ class NodeInfoAppProcessor(Processor): if update_fields: update_query = f""" - UPDATE client_details + UPDATE node_details SET {", ".join(update_fields)} WHERE node_id = %s """ @@ -175,7 +164,7 @@ class NodeInfoAppProcessor(Processor): else: # If record doesn't exist, insert a new one cur.execute(""" - INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) + INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role) VALUES (%s, %s, %s, %s, %s) """, (client_details.node_id, user.short_name, user.long_name, ClientDetails.get_hardware_model_name_from_code(user.hw_model), @@ -183,7 +172,7 @@ class NodeInfoAppProcessor(Processor): conn.commit() - self.execute_db_operation(db_operation) + self.metrics.db.execute_db_operation(db_operation) @ProcessorRegistry.register_processor(PortNum.ROUTING_APP) @@ -531,7 +520,7 @@ class NeighborInfoAppProcessor(Processor): """, (client_details.node_id, neighbor_info.last_sent_by_id, neighbor_info.node_broadcast_interval_secs)) conn.commit() - self.execute_db_operation(operation) + self.metrics.db.execute_db_operation(operation) def update_node_neighbors(self, neighbor_info: NeighborInfo, client_details: ClientDetails): def operation(cur, conn): @@ -554,18 +543,18 @@ class NeighborInfoAppProcessor(Processor): DO UPDATE SET snr = EXCLUDED.snr RETURNING node_id, neighbor_id ) - INSERT INTO client_details (node_id) + INSERT INTO node_details (node_id) SELECT node_id FROM upsert - WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.node_id) + WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.node_id) UNION SELECT neighbor_id FROM upsert - WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.neighbor_id) + WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.neighbor_id) ON CONFLICT (node_id) DO NOTHING; """, (str(client_details.node_id), str(neighbor.node_id), float(neighbor.snr))) conn.commit() - self.execute_db_operation(operation) + self.metrics.db.execute_db_operation(operation) @ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN) diff --git a/exporter/registry.py b/exporter/registry.py index bf43091..99c591b 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -1,5 +1,10 @@ +import geopy.point +from geopy.geocoders import Nominatim from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from exporter.client_details import ClientDetails +from exporter.db_handler import DBHandler + class _Metrics: _instance = None @@ -9,11 +14,13 @@ class _Metrics: cls._instance = super(_Metrics, cls).__new__(cls) return cls._instance - def __init__(self, registry: CollectorRegistry): + def __init__(self, registry: CollectorRegistry, db: DBHandler): if not hasattr(self, 'initialized'): # Ensuring __init__ runs only once self._registry = registry self._init_metrics() self.initialized = True # Attribute to indicate initialization + self.geolocator = Nominatim() + self.db = db @staticmethod def _get_common_labels(): @@ -27,7 +34,6 @@ class _Metrics: self._init_metrics_telemetry_environment() self._init_metrics_telemetry_air_quality() self._init_metrics_telemetry_power() - self._init_metrics_position() self._init_route_discovery_metrics() def _init_metrics_text_message(self): @@ -38,31 +44,23 @@ class _Metrics: registry=self._registry ) - def _init_metrics_position(self): - self.device_latitude_gauge = Gauge( - 'device_latitude', - 'Device latitude', - self._get_common_labels(), - registry=self._registry - ) - self.device_longitude_gauge = Gauge( - 'device_longitude', - 'Device longitude', - self._get_common_labels(), - registry=self._registry - ) - self.device_altitude_gauge = Gauge( - 'device_altitude', - 'Device altitude', - self._get_common_labels(), - registry=self._registry - ) - self.device_position_precision_gauge = Gauge( - 'device_position_precision', - 'Device position precision', - self._get_common_labels(), - registry=self._registry - ) + def update_metrics_position(self, latitude, longitude, altitude, precision, client_details: ClientDetails): + point = geopy.point.Point(latitude, longitude, altitude) + location = self.geolocator.reverse(point, language='en') + + country = location.raw.get('address', {}).get('country', 'Unknown') + city = location.raw.get('address', {}).get('city', 'Unknown') + state = location.raw.get('address', {}).get('state', 'Unknown') + + def db_operation(cur, conn): + cur.execute(""" + UPDATE node_details + SET latitude = %s, longitude = %s, altitude = %s, precision = %s, country = %s, city = %s, state = %s + WHERE node_id = %s + """, (latitude, longitude, altitude, precision, country, city, state, client_details.node_id)) + conn.commit() + + self.db.execute_db_operation(db_operation) def _init_metrics_telemetry_power(self): self.ch1_voltage_gauge = Gauge( @@ -336,5 +334,3 @@ class _Metrics: self._get_common_labels() + ['response_type'], registry=self._registry ) - - diff --git a/main.py b/main.py index 8574514..9770f80 100644 --- a/main.py +++ b/main.py @@ -36,7 +36,7 @@ def handle_connect(client, userdata, flags, reason_code, properties): def update_node_status(node_number, status): with connection_pool.connection() as conn: with conn.cursor() as cur: - cur.execute("INSERT INTO client_details (node_id, mqtt_status) VALUES (%s, %s)" + cur.execute("INSERT INTO node_details (node_id, mqtt_status) VALUES (%s, %s)" "ON CONFLICT(node_id)" "DO UPDATE SET mqtt_status = %s", (node_number, status, status)) conn.commit()