Added more "Static" data like geolocation to the PostgressDB and removed it from prometheus to reduce the load. + Added support for lookup of Country + City + State for nodes per geolocation.
This commit is contained in:
parent
d3f60cc5ff
commit
ea3f00b466
|
@ -43,7 +43,7 @@ services:
|
|||
- mesh-bridge
|
||||
|
||||
postgres:
|
||||
image: postgres:13.3
|
||||
image: postgres:16.3
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- mesh-bridge
|
||||
|
|
|
@ -19,23 +19,26 @@ CREATE TRIGGER trigger_expire_old_messages
|
|||
FOR EACH ROW
|
||||
EXECUTE FUNCTION expire_old_messages();
|
||||
|
||||
CREATE TABLE IF NOT EXISTS client_details
|
||||
CREATE TABLE IF NOT EXISTS node_details
|
||||
(
|
||||
node_id VARCHAR PRIMARY KEY,
|
||||
-- Base Data
|
||||
short_name VARCHAR,
|
||||
long_name VARCHAR,
|
||||
hardware_model VARCHAR,
|
||||
role VARCHAR,
|
||||
mqtt_status VARCHAR default 'none'
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS node_graph
|
||||
(
|
||||
node_id VARCHAR PRIMARY KEY,
|
||||
last_sent_by_node_id VARCHAR,
|
||||
last_sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
broadcast_interval_secs INTEGER,
|
||||
FOREIGN KEY (node_id) REFERENCES client_details (node_id)
|
||||
mqtt_status VARCHAR default 'none',
|
||||
-- Location Data
|
||||
longitude FLOAT,
|
||||
latitude FLOAT,
|
||||
altitude FLOAT,
|
||||
precision FLOAT,
|
||||
country VARCHAR,
|
||||
city VARCHAR,
|
||||
state VARCHAR,
|
||||
-- SQL Data
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS node_neighbors
|
||||
|
@ -44,12 +47,15 @@ CREATE TABLE IF NOT EXISTS node_neighbors
|
|||
node_id VARCHAR,
|
||||
neighbor_id VARCHAR,
|
||||
snr FLOAT,
|
||||
FOREIGN KEY (node_id) REFERENCES client_details (node_id),
|
||||
FOREIGN KEY (neighbor_id) REFERENCES client_details (node_id),
|
||||
FOREIGN KEY (node_id) REFERENCES node_details (node_id),
|
||||
FOREIGN KEY (neighbor_id) REFERENCES node_details (node_id),
|
||||
UNIQUE (node_id, neighbor_id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id);
|
||||
|
||||
ALTER TABLE client_details
|
||||
ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
||||
CREATE OR REPLACE TRIGGER client_details_updated_at
|
||||
BEFORE UPDATE
|
||||
ON node_details
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE moddatetime(updated_at);
|
||||
|
|
17
exporter/db_handler.py
Normal file
17
exporter/db_handler.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
from psycopg_pool import ConnectionPool
|
||||
|
||||
|
||||
class DBHandler:
|
||||
def __init__(self, db_pool: ConnectionPool):
|
||||
self.db_pool = db_pool
|
||||
|
||||
def get_connection(self):
|
||||
return self.db_pool.getconn()
|
||||
|
||||
def release_connection(self, conn):
|
||||
self.db_pool.putconn(conn)
|
||||
|
||||
def execute_db_operation(self, operation):
|
||||
with self.db_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return operation(cur, conn)
|
|
@ -258,7 +258,7 @@ class MessageProcessor:
|
|||
# First, try to select the existing record
|
||||
cur.execute("""
|
||||
SELECT node_id, short_name, long_name, hardware_model, role
|
||||
FROM client_details
|
||||
FROM node_details
|
||||
WHERE node_id = %s;
|
||||
""", (node_id_str,))
|
||||
result = cur.fetchone()
|
||||
|
@ -266,7 +266,7 @@ class MessageProcessor:
|
|||
if not result:
|
||||
# If the client is not found, insert a new record
|
||||
cur.execute("""
|
||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
||||
INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
RETURNING node_id, short_name, long_name, hardware_model, role;
|
||||
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
|
||||
|
|
|
@ -5,6 +5,8 @@ from venv import logger
|
|||
import psycopg
|
||||
import unishox2
|
||||
|
||||
from exporter.db_handler import DBHandler
|
||||
|
||||
try:
|
||||
from meshtastic.admin_pb2 import AdminMessage
|
||||
from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo
|
||||
|
@ -36,17 +38,12 @@ from exporter.registry import _Metrics
|
|||
class Processor(ABC):
|
||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||
self.db_pool = db_pool
|
||||
self.metrics = _Metrics(registry)
|
||||
self.metrics = _Metrics(registry, DBHandler(db_pool))
|
||||
|
||||
@abstractmethod
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
pass
|
||||
|
||||
def execute_db_operation(self, operation):
|
||||
with self.db_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return operation(cur, conn)
|
||||
|
||||
|
||||
class ProcessorRegistry:
|
||||
_registry = {}
|
||||
|
@ -113,18 +110,10 @@ class PositionAppProcessor(Processor):
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to parse POSITION_APP packet: {e}")
|
||||
return
|
||||
self.metrics.device_latitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.latitude_i)
|
||||
self.metrics.device_longitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.longitude_i)
|
||||
self.metrics.device_altitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.altitude)
|
||||
self.metrics.device_position_precision_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.precision_bits)
|
||||
|
||||
self.metrics.update_metrics_position(
|
||||
position.latitude_i, position.longitude_i, position.altitude,
|
||||
position.precision_bits, client_details)
|
||||
pass
|
||||
|
||||
|
||||
|
@ -143,7 +132,7 @@ class NodeInfoAppProcessor(Processor):
|
|||
# First, try to select the existing record
|
||||
cur.execute("""
|
||||
SELECT short_name, long_name, hardware_model, role
|
||||
FROM client_details
|
||||
FROM node_details
|
||||
WHERE node_id = %s;
|
||||
""", (client_details.node_id,))
|
||||
existing_record = cur.fetchone()
|
||||
|
@ -167,7 +156,7 @@ class NodeInfoAppProcessor(Processor):
|
|||
|
||||
if update_fields:
|
||||
update_query = f"""
|
||||
UPDATE client_details
|
||||
UPDATE node_details
|
||||
SET {", ".join(update_fields)}
|
||||
WHERE node_id = %s
|
||||
"""
|
||||
|
@ -175,7 +164,7 @@ class NodeInfoAppProcessor(Processor):
|
|||
else:
|
||||
# If record doesn't exist, insert a new one
|
||||
cur.execute("""
|
||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
||||
INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (client_details.node_id, user.short_name, user.long_name,
|
||||
ClientDetails.get_hardware_model_name_from_code(user.hw_model),
|
||||
|
@ -183,7 +172,7 @@ class NodeInfoAppProcessor(Processor):
|
|||
|
||||
conn.commit()
|
||||
|
||||
self.execute_db_operation(db_operation)
|
||||
self.metrics.db.execute_db_operation(db_operation)
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
|
||||
|
@ -531,7 +520,7 @@ class NeighborInfoAppProcessor(Processor):
|
|||
""", (client_details.node_id, neighbor_info.last_sent_by_id, neighbor_info.node_broadcast_interval_secs))
|
||||
conn.commit()
|
||||
|
||||
self.execute_db_operation(operation)
|
||||
self.metrics.db.execute_db_operation(operation)
|
||||
|
||||
def update_node_neighbors(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
|
||||
def operation(cur, conn):
|
||||
|
@ -554,18 +543,18 @@ class NeighborInfoAppProcessor(Processor):
|
|||
DO UPDATE SET snr = EXCLUDED.snr
|
||||
RETURNING node_id, neighbor_id
|
||||
)
|
||||
INSERT INTO client_details (node_id)
|
||||
INSERT INTO node_details (node_id)
|
||||
SELECT node_id FROM upsert
|
||||
WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.node_id)
|
||||
WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.node_id)
|
||||
UNION
|
||||
SELECT neighbor_id FROM upsert
|
||||
WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.neighbor_id)
|
||||
WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.neighbor_id)
|
||||
ON CONFLICT (node_id) DO NOTHING;
|
||||
""", (str(client_details.node_id), str(neighbor.node_id), float(neighbor.snr)))
|
||||
|
||||
conn.commit()
|
||||
|
||||
self.execute_db_operation(operation)
|
||||
self.metrics.db.execute_db_operation(operation)
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
|
||||
|
|
|
@ -1,5 +1,10 @@
|
|||
import geopy.point
|
||||
from geopy.geocoders import Nominatim
|
||||
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
|
||||
|
||||
from exporter.client_details import ClientDetails
|
||||
from exporter.db_handler import DBHandler
|
||||
|
||||
|
||||
class _Metrics:
|
||||
_instance = None
|
||||
|
@ -9,11 +14,13 @@ class _Metrics:
|
|||
cls._instance = super(_Metrics, cls).__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self, registry: CollectorRegistry):
|
||||
def __init__(self, registry: CollectorRegistry, db: DBHandler):
|
||||
if not hasattr(self, 'initialized'): # Ensuring __init__ runs only once
|
||||
self._registry = registry
|
||||
self._init_metrics()
|
||||
self.initialized = True # Attribute to indicate initialization
|
||||
self.geolocator = Nominatim()
|
||||
self.db = db
|
||||
|
||||
@staticmethod
|
||||
def _get_common_labels():
|
||||
|
@ -27,7 +34,6 @@ class _Metrics:
|
|||
self._init_metrics_telemetry_environment()
|
||||
self._init_metrics_telemetry_air_quality()
|
||||
self._init_metrics_telemetry_power()
|
||||
self._init_metrics_position()
|
||||
self._init_route_discovery_metrics()
|
||||
|
||||
def _init_metrics_text_message(self):
|
||||
|
@ -38,31 +44,23 @@ class _Metrics:
|
|||
registry=self._registry
|
||||
)
|
||||
|
||||
def _init_metrics_position(self):
|
||||
self.device_latitude_gauge = Gauge(
|
||||
'device_latitude',
|
||||
'Device latitude',
|
||||
self._get_common_labels(),
|
||||
registry=self._registry
|
||||
)
|
||||
self.device_longitude_gauge = Gauge(
|
||||
'device_longitude',
|
||||
'Device longitude',
|
||||
self._get_common_labels(),
|
||||
registry=self._registry
|
||||
)
|
||||
self.device_altitude_gauge = Gauge(
|
||||
'device_altitude',
|
||||
'Device altitude',
|
||||
self._get_common_labels(),
|
||||
registry=self._registry
|
||||
)
|
||||
self.device_position_precision_gauge = Gauge(
|
||||
'device_position_precision',
|
||||
'Device position precision',
|
||||
self._get_common_labels(),
|
||||
registry=self._registry
|
||||
)
|
||||
def update_metrics_position(self, latitude, longitude, altitude, precision, client_details: ClientDetails):
|
||||
point = geopy.point.Point(latitude, longitude, altitude)
|
||||
location = self.geolocator.reverse(point, language='en')
|
||||
|
||||
country = location.raw.get('address', {}).get('country', 'Unknown')
|
||||
city = location.raw.get('address', {}).get('city', 'Unknown')
|
||||
state = location.raw.get('address', {}).get('state', 'Unknown')
|
||||
|
||||
def db_operation(cur, conn):
|
||||
cur.execute("""
|
||||
UPDATE node_details
|
||||
SET latitude = %s, longitude = %s, altitude = %s, precision = %s, country = %s, city = %s, state = %s
|
||||
WHERE node_id = %s
|
||||
""", (latitude, longitude, altitude, precision, country, city, state, client_details.node_id))
|
||||
conn.commit()
|
||||
|
||||
self.db.execute_db_operation(db_operation)
|
||||
|
||||
def _init_metrics_telemetry_power(self):
|
||||
self.ch1_voltage_gauge = Gauge(
|
||||
|
@ -336,5 +334,3 @@ class _Metrics:
|
|||
self._get_common_labels() + ['response_type'],
|
||||
registry=self._registry
|
||||
)
|
||||
|
||||
|
||||
|
|
2
main.py
2
main.py
|
@ -36,7 +36,7 @@ def handle_connect(client, userdata, flags, reason_code, properties):
|
|||
def update_node_status(node_number, status):
|
||||
with connection_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("INSERT INTO client_details (node_id, mqtt_status) VALUES (%s, %s)"
|
||||
cur.execute("INSERT INTO node_details (node_id, mqtt_status) VALUES (%s, %s)"
|
||||
"ON CONFLICT(node_id)"
|
||||
"DO UPDATE SET mqtt_status = %s", (node_number, status, status))
|
||||
conn.commit()
|
||||
|
|
Loading…
Reference in a new issue