Merge pull request #53 from tcivie/49-mqtt-import-stops
Performence optimization + redesign
This commit is contained in:
commit
6139b7a968
10
.env
10
.env
|
@ -13,7 +13,7 @@ MQTT_PORT=1883
|
||||||
MQTT_USERNAME=meshdev
|
MQTT_USERNAME=meshdev
|
||||||
MQTT_PASSWORD=large4cats
|
MQTT_PASSWORD=large4cats
|
||||||
MQTT_KEEPALIVE=60
|
MQTT_KEEPALIVE=60
|
||||||
MQTT_TOPIC='msh/israel/#'
|
MQTT_TOPIC='msh/#'
|
||||||
MQTT_IS_TLS=false
|
MQTT_IS_TLS=false
|
||||||
|
|
||||||
# MQTT protocol version (default: MQTTv5) the public MQTT server supports MQTTv311
|
# MQTT protocol version (default: MQTTv5) the public MQTT server supports MQTTv311
|
||||||
|
@ -29,9 +29,9 @@ MQTT_CALLBACK_API_VERSION=VERSION2
|
||||||
MESH_HIDE_SOURCE_DATA=false
|
MESH_HIDE_SOURCE_DATA=false
|
||||||
## Hide destination data in the exporter (default: false)
|
## Hide destination data in the exporter (default: false)
|
||||||
MESH_HIDE_DESTINATION_DATA=false
|
MESH_HIDE_DESTINATION_DATA=false
|
||||||
## Filtered ports in the exporter (default: 1, can be a comma-separated list of ports)
|
|
||||||
FILTERED_PORTS=0
|
|
||||||
## Hide message content in the TEXT_MESSAGE_APP packets (default: true) (Currently we only log message length, if we hide then all messages would have the same length)
|
|
||||||
HIDE_MESSAGE=false
|
|
||||||
## MQTT server Key for decoding
|
## MQTT server Key for decoding
|
||||||
MQTT_SERVER_KEY=1PG7OiApB1nwvP+rz05pAQ==
|
MQTT_SERVER_KEY=1PG7OiApB1nwvP+rz05pAQ==
|
||||||
|
|
||||||
|
# Message types to filter (default: none) (comma separated) (eg. TEXT_MESSAGE_APP,POSITION_APP)
|
||||||
|
# Full list can be found here: https://buf.build/meshtastic/protobufs/docs/main:meshtastic#meshtastic.PortNum
|
||||||
|
EXPORTER_MESSAGE_TYPES_TO_FILTER=TEXT_MESSAGE_APP
|
||||||
|
|
|
@ -43,7 +43,7 @@ services:
|
||||||
- mesh-bridge
|
- mesh-bridge
|
||||||
|
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:13.3
|
image: postgres:16.3
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
networks:
|
networks:
|
||||||
- mesh-bridge
|
- mesh-bridge
|
||||||
|
|
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
@ -1,3 +1,5 @@
|
||||||
|
CREATE EXTENSION IF NOT EXISTS moddatetime;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS messages
|
CREATE TABLE IF NOT EXISTS messages
|
||||||
(
|
(
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
|
@ -19,23 +21,23 @@ CREATE TRIGGER trigger_expire_old_messages
|
||||||
FOR EACH ROW
|
FOR EACH ROW
|
||||||
EXECUTE FUNCTION expire_old_messages();
|
EXECUTE FUNCTION expire_old_messages();
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS client_details
|
CREATE TABLE IF NOT EXISTS node_details
|
||||||
(
|
(
|
||||||
node_id VARCHAR PRIMARY KEY,
|
node_id VARCHAR PRIMARY KEY,
|
||||||
|
-- Base Data
|
||||||
short_name VARCHAR,
|
short_name VARCHAR,
|
||||||
long_name VARCHAR,
|
long_name VARCHAR,
|
||||||
hardware_model VARCHAR,
|
hardware_model VARCHAR,
|
||||||
role VARCHAR,
|
role VARCHAR,
|
||||||
mqtt_status VARCHAR default 'none'
|
mqtt_status VARCHAR default 'none',
|
||||||
);
|
-- Location Data
|
||||||
|
longitude INT,
|
||||||
CREATE TABLE IF NOT EXISTS node_graph
|
latitude INT,
|
||||||
(
|
altitude INT,
|
||||||
node_id VARCHAR PRIMARY KEY,
|
precision INT,
|
||||||
last_sent_by_node_id VARCHAR,
|
-- SQL Data
|
||||||
last_sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
||||||
broadcast_interval_secs INTEGER,
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||||
FOREIGN KEY (node_id) REFERENCES client_details (node_id)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS node_neighbors
|
CREATE TABLE IF NOT EXISTS node_neighbors
|
||||||
|
@ -44,12 +46,15 @@ CREATE TABLE IF NOT EXISTS node_neighbors
|
||||||
node_id VARCHAR,
|
node_id VARCHAR,
|
||||||
neighbor_id VARCHAR,
|
neighbor_id VARCHAR,
|
||||||
snr FLOAT,
|
snr FLOAT,
|
||||||
FOREIGN KEY (node_id) REFERENCES client_details (node_id),
|
FOREIGN KEY (node_id) REFERENCES node_details (node_id),
|
||||||
FOREIGN KEY (neighbor_id) REFERENCES client_details (node_id),
|
FOREIGN KEY (neighbor_id) REFERENCES node_details (node_id),
|
||||||
UNIQUE (node_id, neighbor_id)
|
UNIQUE (node_id, neighbor_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id);
|
CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id);
|
||||||
|
|
||||||
ALTER TABLE client_details
|
CREATE OR REPLACE TRIGGER client_details_updated_at
|
||||||
ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
BEFORE UPDATE
|
||||||
|
ON node_details
|
||||||
|
FOR EACH ROW
|
||||||
|
EXECUTE PROCEDURE moddatetime(updated_at);
|
||||||
|
|
17
exporter/db_handler.py
Normal file
17
exporter/db_handler.py
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
from psycopg_pool import ConnectionPool
|
||||||
|
|
||||||
|
|
||||||
|
class DBHandler:
|
||||||
|
def __init__(self, db_pool: ConnectionPool):
|
||||||
|
self.db_pool = db_pool
|
||||||
|
|
||||||
|
def get_connection(self):
|
||||||
|
return self.db_pool.getconn()
|
||||||
|
|
||||||
|
def release_connection(self, conn):
|
||||||
|
self.db_pool.putconn(conn)
|
||||||
|
|
||||||
|
def execute_db_operation(self, operation):
|
||||||
|
with self.db_pool.connection() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
return operation(cur, conn)
|
|
@ -1,5 +1,6 @@
|
||||||
import base64
|
import base64
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||||
|
@ -11,7 +12,7 @@ except ImportError:
|
||||||
from meshtastic.protobuf.mesh_pb2 import MeshPacket, Data, HardwareModel
|
from meshtastic.protobuf.mesh_pb2 import MeshPacket, Data, HardwareModel
|
||||||
from meshtastic.protobuf.portnums_pb2 import PortNum
|
from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||||
|
|
||||||
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
|
from prometheus_client import CollectorRegistry, Counter, Gauge
|
||||||
from psycopg_pool import ConnectionPool
|
from psycopg_pool import ConnectionPool
|
||||||
|
|
||||||
from exporter.client_details import ClientDetails
|
from exporter.client_details import ClientDetails
|
||||||
|
@ -20,6 +21,7 @@ from exporter.processors import ProcessorRegistry
|
||||||
|
|
||||||
class MessageProcessor:
|
class MessageProcessor:
|
||||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||||
|
self.message_size_in_bytes = None
|
||||||
self.rx_rssi_gauge = None
|
self.rx_rssi_gauge = None
|
||||||
self.channel_counter = None
|
self.channel_counter = None
|
||||||
self.packet_id_counter = None
|
self.packet_id_counter = None
|
||||||
|
@ -44,6 +46,17 @@ class MessageProcessor:
|
||||||
'destination_role'
|
'destination_role'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
reduced_labels = [
|
||||||
|
'source_id', 'destination_id'
|
||||||
|
]
|
||||||
|
|
||||||
|
self.message_size_in_bytes = Gauge(
|
||||||
|
'text_message_app_size_in_bytes',
|
||||||
|
'Size of text messages processed by the app in Bytes',
|
||||||
|
reduced_labels + ['portnum'],
|
||||||
|
registry=self.registry
|
||||||
|
)
|
||||||
|
|
||||||
self.source_message_type_counter = Counter(
|
self.source_message_type_counter = Counter(
|
||||||
'mesh_packet_source_types',
|
'mesh_packet_source_types',
|
||||||
'Types of mesh packets processed by source',
|
'Types of mesh packets processed by source',
|
||||||
|
@ -65,7 +78,7 @@ class MessageProcessor:
|
||||||
registry=self.registry
|
registry=self.registry
|
||||||
)
|
)
|
||||||
# Histogram for the rx_time (time in seconds)
|
# Histogram for the rx_time (time in seconds)
|
||||||
self.rx_time_histogram = Histogram(
|
self.rx_time_histogram = Gauge(
|
||||||
'mesh_packet_rx_time',
|
'mesh_packet_rx_time',
|
||||||
'Receive time of mesh packets (seconds since 1970)',
|
'Receive time of mesh packets (seconds since 1970)',
|
||||||
common_labels,
|
common_labels,
|
||||||
|
@ -165,9 +178,6 @@ class MessageProcessor:
|
||||||
short_name='Hidden',
|
short_name='Hidden',
|
||||||
long_name='Hidden')
|
long_name='Hidden')
|
||||||
|
|
||||||
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
|
|
||||||
return None # Ignore this packet
|
|
||||||
|
|
||||||
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
||||||
|
|
||||||
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
||||||
|
@ -184,7 +194,8 @@ class MessageProcessor:
|
||||||
return enum_value.name
|
return enum_value.name
|
||||||
return 'UNKNOWN_PORT'
|
return 'UNKNOWN_PORT'
|
||||||
|
|
||||||
def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details):
|
def process_simple_packet_details(self, destination_client_details, mesh_packet: MeshPacket, port_num,
|
||||||
|
source_client_details):
|
||||||
common_labels = {
|
common_labels = {
|
||||||
'source_id': source_client_details.node_id,
|
'source_id': source_client_details.node_id,
|
||||||
'source_short_name': source_client_details.short_name,
|
'source_short_name': source_client_details.short_name,
|
||||||
|
@ -198,6 +209,16 @@ class MessageProcessor:
|
||||||
'destination_role': destination_client_details.role,
|
'destination_role': destination_client_details.role,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reduced_labels = {
|
||||||
|
'source_id': source_client_details.node_id,
|
||||||
|
'destination_id': destination_client_details.node_id
|
||||||
|
}
|
||||||
|
|
||||||
|
self.message_size_in_bytes.labels(
|
||||||
|
**reduced_labels,
|
||||||
|
portnum=self.get_port_name_from_portnum(port_num)
|
||||||
|
).set(sys.getsizeof(mesh_packet))
|
||||||
|
|
||||||
self.source_message_type_counter.labels(
|
self.source_message_type_counter.labels(
|
||||||
**common_labels,
|
**common_labels,
|
||||||
portnum=self.get_port_name_from_portnum(port_num)
|
portnum=self.get_port_name_from_portnum(port_num)
|
||||||
|
@ -214,7 +235,7 @@ class MessageProcessor:
|
||||||
|
|
||||||
self.rx_time_histogram.labels(
|
self.rx_time_histogram.labels(
|
||||||
**common_labels
|
**common_labels
|
||||||
).observe(mesh_packet.rx_time)
|
).set(mesh_packet.rx_time)
|
||||||
|
|
||||||
self.rx_snr_gauge.labels(
|
self.rx_snr_gauge.labels(
|
||||||
**common_labels
|
**common_labels
|
||||||
|
@ -261,7 +282,7 @@ class MessageProcessor:
|
||||||
# First, try to select the existing record
|
# First, try to select the existing record
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT node_id, short_name, long_name, hardware_model, role
|
SELECT node_id, short_name, long_name, hardware_model, role
|
||||||
FROM client_details
|
FROM node_details
|
||||||
WHERE node_id = %s;
|
WHERE node_id = %s;
|
||||||
""", (node_id_str,))
|
""", (node_id_str,))
|
||||||
result = cur.fetchone()
|
result = cur.fetchone()
|
||||||
|
@ -269,7 +290,7 @@ class MessageProcessor:
|
||||||
if not result:
|
if not result:
|
||||||
# If the client is not found, insert a new record
|
# If the client is not found, insert a new record
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
|
||||||
VALUES (%s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s)
|
||||||
RETURNING node_id, short_name, long_name, hardware_model, role;
|
RETURNING node_id, short_name, long_name, hardware_model, role;
|
||||||
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
|
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
|
||||||
|
|
|
@ -5,6 +5,8 @@ from venv import logger
|
||||||
import psycopg
|
import psycopg
|
||||||
import unishox2
|
import unishox2
|
||||||
|
|
||||||
|
from exporter.db_handler import DBHandler
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from meshtastic.admin_pb2 import AdminMessage
|
from meshtastic.admin_pb2 import AdminMessage
|
||||||
from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo
|
from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo
|
||||||
|
@ -36,17 +38,12 @@ from exporter.registry import _Metrics
|
||||||
class Processor(ABC):
|
class Processor(ABC):
|
||||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||||
self.db_pool = db_pool
|
self.db_pool = db_pool
|
||||||
self.metrics = _Metrics(registry)
|
self.metrics = _Metrics(registry, DBHandler(db_pool))
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def process(self, payload: bytes, client_details: ClientDetails):
|
def process(self, payload: bytes, client_details: ClientDetails):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def execute_db_operation(self, operation):
|
|
||||||
with self.db_pool.connection() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
return operation(cur, conn)
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessorRegistry:
|
class ProcessorRegistry:
|
||||||
_registry = {}
|
_registry = {}
|
||||||
|
@ -54,6 +51,11 @@ class ProcessorRegistry:
|
||||||
@classmethod
|
@classmethod
|
||||||
def register_processor(cls, port_num):
|
def register_processor(cls, port_num):
|
||||||
def inner_wrapper(wrapped_class):
|
def inner_wrapper(wrapped_class):
|
||||||
|
if PortNum.DESCRIPTOR.values_by_number[port_num].name in os.getenv('EXPORTER_MESSAGE_TYPES_TO_FILTER',
|
||||||
|
'').split(','):
|
||||||
|
logger.info(f"Processor for port_num {port_num} is filtered out")
|
||||||
|
return wrapped_class
|
||||||
|
|
||||||
cls._registry[port_num] = wrapped_class
|
cls._registry[port_num] = wrapped_class
|
||||||
return wrapped_class
|
return wrapped_class
|
||||||
|
|
||||||
|
@ -71,7 +73,6 @@ class ProcessorRegistry:
|
||||||
@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
|
@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
|
||||||
class UnknownAppProcessor(Processor):
|
class UnknownAppProcessor(Processor):
|
||||||
def process(self, payload: bytes, client_details: ClientDetails):
|
def process(self, payload: bytes, client_details: ClientDetails):
|
||||||
logger.debug("Received UNKNOWN_APP packet")
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@ -79,12 +80,7 @@ class UnknownAppProcessor(Processor):
|
||||||
class TextMessageAppProcessor(Processor):
|
class TextMessageAppProcessor(Processor):
|
||||||
def process(self, payload: bytes, client_details: ClientDetails):
|
def process(self, payload: bytes, client_details: ClientDetails):
|
||||||
logger.debug("Received TEXT_MESSAGE_APP packet")
|
logger.debug("Received TEXT_MESSAGE_APP packet")
|
||||||
message = payload.decode('utf-8')
|
pass
|
||||||
if os.getenv('HIDE_MESSAGE', 'true') == 'true':
|
|
||||||
message = 'Hidden'
|
|
||||||
self.metrics.message_length_histogram.labels(
|
|
||||||
**client_details.to_dict()
|
|
||||||
).observe(len(message))
|
|
||||||
|
|
||||||
|
|
||||||
@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
|
@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
|
||||||
|
@ -109,18 +105,10 @@ class PositionAppProcessor(Processor):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to parse POSITION_APP packet: {e}")
|
logger.error(f"Failed to parse POSITION_APP packet: {e}")
|
||||||
return
|
return
|
||||||
self.metrics.device_latitude_gauge.labels(
|
|
||||||
**client_details.to_dict()
|
self.metrics.update_metrics_position(
|
||||||
).set(position.latitude_i)
|
position.latitude_i, position.longitude_i, position.altitude,
|
||||||
self.metrics.device_longitude_gauge.labels(
|
position.precision_bits, client_details)
|
||||||
**client_details.to_dict()
|
|
||||||
).set(position.longitude_i)
|
|
||||||
self.metrics.device_altitude_gauge.labels(
|
|
||||||
**client_details.to_dict()
|
|
||||||
).set(position.altitude)
|
|
||||||
self.metrics.device_position_precision_gauge.labels(
|
|
||||||
**client_details.to_dict()
|
|
||||||
).set(position.precision_bits)
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,7 +127,7 @@ class NodeInfoAppProcessor(Processor):
|
||||||
# First, try to select the existing record
|
# First, try to select the existing record
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT short_name, long_name, hardware_model, role
|
SELECT short_name, long_name, hardware_model, role
|
||||||
FROM client_details
|
FROM node_details
|
||||||
WHERE node_id = %s;
|
WHERE node_id = %s;
|
||||||
""", (client_details.node_id,))
|
""", (client_details.node_id,))
|
||||||
existing_record = cur.fetchone()
|
existing_record = cur.fetchone()
|
||||||
|
@ -163,7 +151,7 @@ class NodeInfoAppProcessor(Processor):
|
||||||
|
|
||||||
if update_fields:
|
if update_fields:
|
||||||
update_query = f"""
|
update_query = f"""
|
||||||
UPDATE client_details
|
UPDATE node_details
|
||||||
SET {", ".join(update_fields)}
|
SET {", ".join(update_fields)}
|
||||||
WHERE node_id = %s
|
WHERE node_id = %s
|
||||||
"""
|
"""
|
||||||
|
@ -171,7 +159,7 @@ class NodeInfoAppProcessor(Processor):
|
||||||
else:
|
else:
|
||||||
# If record doesn't exist, insert a new one
|
# If record doesn't exist, insert a new one
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
|
||||||
VALUES (%s, %s, %s, %s, %s)
|
VALUES (%s, %s, %s, %s, %s)
|
||||||
""", (client_details.node_id, user.short_name, user.long_name,
|
""", (client_details.node_id, user.short_name, user.long_name,
|
||||||
ClientDetails.get_hardware_model_name_from_code(user.hw_model),
|
ClientDetails.get_hardware_model_name_from_code(user.hw_model),
|
||||||
|
@ -179,7 +167,7 @@ class NodeInfoAppProcessor(Processor):
|
||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
self.execute_db_operation(db_operation)
|
self.metrics.get_db().execute_db_operation(db_operation)
|
||||||
|
|
||||||
|
|
||||||
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
|
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
|
||||||
|
@ -511,24 +499,8 @@ class NeighborInfoAppProcessor(Processor):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}")
|
logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}")
|
||||||
return
|
return
|
||||||
self.update_node_graph(neighbor_info, client_details)
|
|
||||||
self.update_node_neighbors(neighbor_info, client_details)
|
self.update_node_neighbors(neighbor_info, client_details)
|
||||||
|
|
||||||
def update_node_graph(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
|
|
||||||
def operation(cur, conn):
|
|
||||||
cur.execute("""
|
|
||||||
INSERT INTO node_graph (node_id, last_sent_by_node_id, broadcast_interval_secs)
|
|
||||||
VALUES (%s, %s, %s)
|
|
||||||
ON CONFLICT (node_id)
|
|
||||||
DO UPDATE SET
|
|
||||||
last_sent_by_node_id = EXCLUDED.last_sent_by_node_id,
|
|
||||||
broadcast_interval_secs = EXCLUDED.broadcast_interval_secs,
|
|
||||||
last_sent_at = CURRENT_TIMESTAMP
|
|
||||||
""", (client_details.node_id, neighbor_info.last_sent_by_id, neighbor_info.node_broadcast_interval_secs))
|
|
||||||
conn.commit()
|
|
||||||
|
|
||||||
self.execute_db_operation(operation)
|
|
||||||
|
|
||||||
def update_node_neighbors(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
|
def update_node_neighbors(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
|
||||||
def operation(cur, conn):
|
def operation(cur, conn):
|
||||||
new_neighbor_ids = [str(neighbor.node_id) for neighbor in neighbor_info.neighbors]
|
new_neighbor_ids = [str(neighbor.node_id) for neighbor in neighbor_info.neighbors]
|
||||||
|
@ -550,18 +522,18 @@ class NeighborInfoAppProcessor(Processor):
|
||||||
DO UPDATE SET snr = EXCLUDED.snr
|
DO UPDATE SET snr = EXCLUDED.snr
|
||||||
RETURNING node_id, neighbor_id
|
RETURNING node_id, neighbor_id
|
||||||
)
|
)
|
||||||
INSERT INTO client_details (node_id)
|
INSERT INTO node_details (node_id)
|
||||||
SELECT node_id FROM upsert
|
SELECT node_id FROM upsert
|
||||||
WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.node_id)
|
WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.node_id)
|
||||||
UNION
|
UNION
|
||||||
SELECT neighbor_id FROM upsert
|
SELECT neighbor_id FROM upsert
|
||||||
WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.neighbor_id)
|
WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.neighbor_id)
|
||||||
ON CONFLICT (node_id) DO NOTHING;
|
ON CONFLICT (node_id) DO NOTHING;
|
||||||
""", (str(client_details.node_id), str(neighbor.node_id), float(neighbor.snr)))
|
""", (str(client_details.node_id), str(neighbor.node_id), float(neighbor.snr)))
|
||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
self.execute_db_operation(operation)
|
self.metrics.get_db().execute_db_operation(operation)
|
||||||
|
|
||||||
|
|
||||||
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
|
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
|
from prometheus_client import CollectorRegistry, Counter, Gauge
|
||||||
|
|
||||||
|
from exporter.client_details import ClientDetails
|
||||||
|
from exporter.db_handler import DBHandler
|
||||||
|
|
||||||
|
|
||||||
class _Metrics:
|
class _Metrics:
|
||||||
|
@ -9,11 +12,15 @@ class _Metrics:
|
||||||
cls._instance = super(_Metrics, cls).__new__(cls)
|
cls._instance = super(_Metrics, cls).__new__(cls)
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
def __init__(self, registry: CollectorRegistry):
|
def __init__(self, registry: CollectorRegistry, db: DBHandler):
|
||||||
if not hasattr(self, 'initialized'): # Ensuring __init__ runs only once
|
if not hasattr(self, 'initialized'): # Ensuring __init__ runs only once
|
||||||
self._registry = registry
|
self._registry = registry
|
||||||
self._init_metrics()
|
self._init_metrics()
|
||||||
self.initialized = True # Attribute to indicate initialization
|
self.initialized = True # Attribute to indicate initialization
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def get_db(self):
|
||||||
|
return self.db
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_common_labels():
|
def _get_common_labels():
|
||||||
|
@ -22,47 +29,31 @@ class _Metrics:
|
||||||
]
|
]
|
||||||
|
|
||||||
def _init_metrics(self):
|
def _init_metrics(self):
|
||||||
self._init_metrics_text_message()
|
|
||||||
self._init_metrics_telemetry_device()
|
self._init_metrics_telemetry_device()
|
||||||
self._init_metrics_telemetry_environment()
|
self._init_metrics_telemetry_environment()
|
||||||
self._init_metrics_telemetry_air_quality()
|
self._init_metrics_telemetry_air_quality()
|
||||||
self._init_metrics_telemetry_power()
|
self._init_metrics_telemetry_power()
|
||||||
self._init_metrics_position()
|
|
||||||
self._init_route_discovery_metrics()
|
self._init_route_discovery_metrics()
|
||||||
|
|
||||||
def _init_metrics_text_message(self):
|
def update_metrics_position(self, latitude, longitude, altitude, precision, client_details: ClientDetails):
|
||||||
self.message_length_histogram = Histogram(
|
# Could be used to calculate more complex data (Like distances etc..)
|
||||||
'text_message_app_length',
|
# point = geopy.point.Point(latitude, longitude, altitude) # Not used for now
|
||||||
'Length of text messages processed by the app',
|
|
||||||
self._get_common_labels(),
|
|
||||||
registry=self._registry
|
|
||||||
)
|
|
||||||
|
|
||||||
def _init_metrics_position(self):
|
if latitude != 0 and longitude != 0:
|
||||||
self.device_latitude_gauge = Gauge(
|
# location = RateLimiter(self.geolocator.reverse, min_delay_seconds=10, swallow_exceptions=False)((latitude, longitude), language='en', timeout=10)
|
||||||
'device_latitude',
|
# country = location.raw.get('address', {}).get('country', 'Unknown')
|
||||||
'Device latitude',
|
# city = location.raw.get('address', {}).get('city', 'Unknown')
|
||||||
self._get_common_labels(),
|
# state = location.raw.get('address', {}).get('state', 'Unknown')
|
||||||
registry=self._registry
|
|
||||||
)
|
def db_operation(cur, conn):
|
||||||
self.device_longitude_gauge = Gauge(
|
cur.execute("""
|
||||||
'device_longitude',
|
UPDATE node_details
|
||||||
'Device longitude',
|
SET latitude = %s, longitude = %s, altitude = %s, precision = %s
|
||||||
self._get_common_labels(),
|
WHERE node_id = %s
|
||||||
registry=self._registry
|
""", (latitude, longitude, altitude, precision, client_details.node_id))
|
||||||
)
|
conn.commit()
|
||||||
self.device_altitude_gauge = Gauge(
|
|
||||||
'device_altitude',
|
self.db.execute_db_operation(db_operation)
|
||||||
'Device altitude',
|
|
||||||
self._get_common_labels(),
|
|
||||||
registry=self._registry
|
|
||||||
)
|
|
||||||
self.device_position_precision_gauge = Gauge(
|
|
||||||
'device_position_precision',
|
|
||||||
'Device position precision',
|
|
||||||
self._get_common_labels(),
|
|
||||||
registry=self._registry
|
|
||||||
)
|
|
||||||
|
|
||||||
def _init_metrics_telemetry_power(self):
|
def _init_metrics_telemetry_power(self):
|
||||||
self.ch1_voltage_gauge = Gauge(
|
self.ch1_voltage_gauge = Gauge(
|
||||||
|
@ -336,5 +327,3 @@ class _Metrics:
|
||||||
self._get_common_labels() + ['response_type'],
|
self._get_common_labels() + ['response_type'],
|
||||||
registry=self._registry
|
registry=self._registry
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
7
main.py
7
main.py
|
@ -17,8 +17,6 @@ except ImportError:
|
||||||
from prometheus_client import CollectorRegistry, start_http_server
|
from prometheus_client import CollectorRegistry, start_http_server
|
||||||
from psycopg_pool import ConnectionPool
|
from psycopg_pool import ConnectionPool
|
||||||
|
|
||||||
from exporter.processor_base import MessageProcessor
|
|
||||||
|
|
||||||
connection_pool = None
|
connection_pool = None
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,7 +36,7 @@ def handle_connect(client, userdata, flags, reason_code, properties):
|
||||||
def update_node_status(node_number, status):
|
def update_node_status(node_number, status):
|
||||||
with connection_pool.connection() as conn:
|
with connection_pool.connection() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("INSERT INTO client_details (node_id, mqtt_status) VALUES (%s, %s)"
|
cur.execute("INSERT INTO node_details (node_id, mqtt_status) VALUES (%s, %s)"
|
||||||
"ON CONFLICT(node_id)"
|
"ON CONFLICT(node_id)"
|
||||||
"DO UPDATE SET mqtt_status = %s", (node_number, status, status))
|
"DO UPDATE SET mqtt_status = %s", (node_number, status, status))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
@ -88,6 +86,9 @@ def handle_message(client, userdata, message):
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
# We have to load_dotenv before we can import MessageProcessor to allow filtering of message types
|
||||||
|
from exporter.processor_base import MessageProcessor
|
||||||
|
|
||||||
# Setup a connection pool
|
# Setup a connection pool
|
||||||
connection_pool = ConnectionPool(
|
connection_pool = ConnectionPool(
|
||||||
os.getenv('DATABASE_URL'),
|
os.getenv('DATABASE_URL'),
|
||||||
|
|
|
@ -6,4 +6,5 @@ cryptography~=42.0.8
|
||||||
psycopg~=3.1.19
|
psycopg~=3.1.19
|
||||||
psycopg_pool~=3.2.2
|
psycopg_pool~=3.2.2
|
||||||
meshtastic~=2.3.13
|
meshtastic~=2.3.13
|
||||||
psycopg-binary~=3.1.20
|
psycopg-binary~=3.1.20
|
||||||
|
geopy>=2.4.1
|
Loading…
Reference in a new issue