diff --git a/.env b/.env index 51c314e..88a1952 100644 --- a/.env +++ b/.env @@ -35,3 +35,6 @@ MQTT_SERVER_KEY=1PG7OiApB1nwvP+rz05pAQ== # Message types to filter (default: none) (comma separated) (eg. TEXT_MESSAGE_APP,POSITION_APP) # Full list can be found here: https://buf.build/meshtastic/protobufs/docs/main:meshtastic#meshtastic.PortNum EXPORTER_MESSAGE_TYPES_TO_FILTER=TEXT_MESSAGE_APP + +# Enable node configurations report (default: true) +REPORT_NODE_CONFIGURATIONS=true diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql index 1e8f37a..bdbe9d5 100644 --- a/docker/postgres/init.sql +++ b/docker/postgres/init.sql @@ -1,5 +1,3 @@ -CREATE EXTENSION IF NOT EXISTS moddatetime; - CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, @@ -53,8 +51,105 @@ CREATE TABLE IF NOT EXISTS node_neighbors CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id); -CREATE OR REPLACE TRIGGER client_details_updated_at +CREATE OR REPLACE TRIGGER node_details_updated_at BEFORE UPDATE ON node_details FOR EACH ROW EXECUTE PROCEDURE moddatetime(updated_at); + +CREATE TABLE IF NOT EXISTS node_configurations +( + node_id VARCHAR PRIMARY KEY, + last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + + -- Configuration (Telemetry) + environment_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + environment_update_last_timestamp TIMESTAMP DEFAULT NOW(), + + device_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + device_update_last_timestamp TIMESTAMP DEFAULT NOW(), + + air_quality_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + air_quality_update_last_timestamp TIMESTAMP DEFAULT NOW(), + + power_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + power_update_last_timestamp TIMESTAMP DEFAULT NOW(), + + -- Configuration (Range Test) + range_test_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + range_test_packets_total INT DEFAULT 0, -- in packets + range_test_first_packet_timestamp TIMESTAMP DEFAULT NOW(), + range_test_last_packet_timestamp TIMESTAMP DEFAULT NOW(), + + -- Configuration (PAX Counter) + pax_counter_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + pax_counter_last_timestamp TIMESTAMP DEFAULT NOW(), + + -- Configuration (Neighbor Info) + neighbor_info_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + neighbor_info_last_timestamp TIMESTAMP DEFAULT NOW(), + + -- Configuration (MQTT) + mqtt_encryption_enabled BOOLEAN DEFAULT FALSE, + mqtt_json_enabled BOOLEAN DEFAULT FALSE, + mqtt_configured_root_topic TEXT DEFAULT '', + mqtt_info_last_timestamp TIMESTAMP DEFAULT NOW(), + + -- Configuration (Map) + map_broadcast_interval INTERVAL DEFAULT '0 seconds' NOT NULL, + map_broadcast_last_timestamp TIMESTAMP DEFAULT NOW(), + +-- FOREIGN KEY (node_id) REFERENCES node_details (node_id), + UNIQUE (node_id) +); + +-- -- Function to update old values +-- CREATE OR REPLACE FUNCTION update_old_node_configurations() +-- RETURNS TRIGGER AS $$ +-- BEGIN +-- -- Update intervals to 0 if not updated in 24 hours +-- IF NEW.environment_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.environment_update_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.device_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.device_update_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.air_quality_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.air_quality_update_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.power_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.power_update_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.range_test_last_packet_timestamp < NOW() - INTERVAL '1 hours' THEN +-- NEW.range_test_interval := '0 seconds'; +-- NEW.range_test_first_packet_timestamp := 0; +-- NEW.range_test_packets_total := 0; +-- END IF; +-- +-- IF NEW.pax_counter_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.pax_counter_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.neighbor_info_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.neighbor_info_interval := '0 seconds'; +-- END IF; +-- +-- IF NEW.map_broadcast_last_timestamp < NOW() - INTERVAL '24 hours' THEN +-- NEW.map_broadcast_interval := '0 seconds'; +-- END IF; +-- +-- NEW.last_updated := CURRENT_TIMESTAMP; +-- +-- RETURN NEW; +-- END; +-- $$ LANGUAGE plpgsql; +-- +-- -- Create the trigger +-- CREATE TRIGGER update_node_configurations_trigger +-- BEFORE UPDATE ON node_configurations +-- FOR EACH ROW +-- EXECUTE FUNCTION update_old_node_configurations(); \ No newline at end of file diff --git a/exporter/__init__.py b/exporter/__init__.py index 67f78e4..bf09bd1 100644 --- a/exporter/__init__.py +++ b/exporter/__init__.py @@ -1 +1 @@ -from .processor_base import MessageProcessor +from exporter.processor.processor_base import MessageProcessor diff --git a/exporter/metric/__init__.py b/exporter/metric/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/exporter/metric/node_configuration_metrics.py b/exporter/metric/node_configuration_metrics.py new file mode 100644 index 0000000..001ee03 --- /dev/null +++ b/exporter/metric/node_configuration_metrics.py @@ -0,0 +1,212 @@ +import os + +from psycopg_pool import ConnectionPool + +from exporter.db_handler import DBHandler + + +class Singleton(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class NodeConfigurationMetrics(metaclass=Singleton): + def __init__(self, connection_pool: ConnectionPool = None): + self.db = DBHandler(connection_pool) + self.report = os.getenv('REPORT_NODE_CONFIGURATIONS', True) + + def process_environment_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations (node_id, + environment_update_interval, + environment_update_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + environment_update_interval = NOW() - node_configurations.environment_update_last_timestamp, + environment_update_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_device_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations (node_id, + device_update_interval, + device_update_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + device_update_interval = NOW() - node_configurations.device_update_last_timestamp, + device_update_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_power_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations (node_id, + power_update_interval, + power_update_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + power_update_interval = NOW() - node_configurations.power_update_last_timestamp, + power_update_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def map_broadcast_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations (node_id, + map_broadcast_interval, + map_broadcast_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + map_broadcast_interval = NOW() - node_configurations.map_broadcast_last_timestamp, + map_broadcast_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_air_quality_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations (node_id, + air_quality_update_interval, + air_quality_update_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + air_quality_update_interval = NOW() - node_configurations.air_quality_update_last_timestamp, + air_quality_update_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_range_test_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations ( + node_id, + range_test_interval, + range_test_packets_total, + range_test_first_packet_timestamp, + range_test_last_packet_timestamp + ) VALUES (%s, %s, NOW(), NOW(), NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + range_test_interval = NOW() - node_configurations.range_test_last_packet_timestamp, + range_test_packets_total = CASE + WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour' + THEN 1 + ELSE node_configurations.range_test_packets_total + 1 + END, + range_test_first_packet_timestamp = CASE + WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour' + THEN NOW() + ELSE node_configurations.range_test_first_packet_timestamp + END, + range_test_last_packet_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_pax_counter_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations ( + node_id, + pax_counter_interval, + pax_counter_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + pax_counter_interval = NOW() - node_configurations.pax_counter_last_timestamp, + pax_counter_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_neighbor_info_update(self, node_id: str): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations ( + node_id, + neighbor_info_interval, + neighbor_info_last_timestamp + ) VALUES (%s, %s, NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + neighbor_info_interval = NOW() - node_configurations.neighbor_info_last_timestamp, + neighbor_info_last_timestamp = NOW() + """, (node_id, '0 seconds')) + conn.commit() + + self.db.execute_db_operation(db_operation) + + def process_mqtt_update(self, node_id: str, mqtt_encryption_enabled=None, mqtt_json_enabled=None, + mqtt_configured_root_topic=None): + if not self.report: + return + + def db_operation(cur, conn): + cur.execute(""" + INSERT INTO node_configurations ( + node_id, + mqtt_encryption_enabled, + mqtt_json_enabled, + mqtt_configured_root_topic, + mqtt_info_last_timestamp + ) VALUES (%s, COALESCE(%s, FALSE), COALESCE(%s, FALSE), COALESCE(%s, ''), NOW()) + ON CONFLICT(node_id) + DO UPDATE SET + mqtt_encryption_enabled = COALESCE(EXCLUDED.mqtt_encryption_enabled, node_configurations.mqtt_encryption_enabled), + mqtt_json_enabled = COALESCE(EXCLUDED.mqtt_json_enabled, node_configurations.mqtt_json_enabled), + mqtt_configured_root_topic = COALESCE(EXCLUDED.mqtt_configured_root_topic, node_configurations.mqtt_configured_root_topic), + mqtt_info_last_timestamp = NOW() + """, (node_id, mqtt_encryption_enabled, mqtt_json_enabled, mqtt_configured_root_topic)) + conn.commit() + + self.db.execute_db_operation(db_operation) diff --git a/exporter/registry.py b/exporter/metric/node_metrics.py similarity index 99% rename from exporter/registry.py rename to exporter/metric/node_metrics.py index db9538d..7a5c9f0 100644 --- a/exporter/registry.py +++ b/exporter/metric/node_metrics.py @@ -4,12 +4,12 @@ from exporter.client_details import ClientDetails from exporter.db_handler import DBHandler -class _Metrics: +class Metrics: _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: - cls._instance = super(_Metrics, cls).__new__(cls) + cls._instance = super(Metrics, cls).__new__(cls) return cls._instance def __init__(self, registry: CollectorRegistry, db: DBHandler): diff --git a/exporter/processor/__init__.py b/exporter/processor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/exporter/processor_base.py b/exporter/processor/processor_base.py similarity index 89% rename from exporter/processor_base.py rename to exporter/processor/processor_base.py index a775aeb..65358e1 100644 --- a/exporter/processor_base.py +++ b/exporter/processor/processor_base.py @@ -1,9 +1,13 @@ import base64 +import json import os import sys from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope + +from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics try: from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel @@ -16,7 +20,7 @@ from prometheus_client import CollectorRegistry, Counter, Gauge from psycopg_pool import ConnectionPool from exporter.client_details import ClientDetails -from exporter.processors import ProcessorRegistry +from exporter.processor.processors import ProcessorRegistry class MessageProcessor: @@ -141,6 +145,32 @@ class MessageProcessor: registry=self.registry ) + @staticmethod + def process_json_mqtt(message): + topic = message.topic + json_packet = json.loads(message.payload) + if json_packet['sender'][0] == '!': + gateway_node_id = str(int(json_packet['sender'][1:], 16)) + NodeConfigurationMetrics().process_mqtt_update( + node_id=gateway_node_id, + mqtt_encryption_enabled=json_packet.get('encrypted', False), + mqtt_configured_root_topic=topic + ) + + @staticmethod + def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket): + is_encrypted = False + if getattr(mesh_packet, 'encrypted'): + is_encrypted = True + if getattr(service_envelope, 'gateway_id'): + if service_envelope.gateway_id[0] == '!': + gateway_node_id = str(int(service_envelope.gateway_id[1:], 16)) + NodeConfigurationMetrics().process_mqtt_update( + node_id=gateway_node_id, + mqtt_encryption_enabled=is_encrypted, + mqtt_configured_root_topic=topic + ) + def process(self, mesh_packet: MeshPacket): try: if getattr(mesh_packet, 'encrypted'): diff --git a/exporter/processors.py b/exporter/processor/processors.py similarity index 96% rename from exporter/processors.py rename to exporter/processor/processors.py index 66f94c1..bb2c7bf 100644 --- a/exporter/processors.py +++ b/exporter/processor/processors.py @@ -6,6 +6,7 @@ import psycopg import unishox2 from exporter.db_handler import DBHandler +from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics try: from meshtastic.admin_pb2 import AdminMessage @@ -32,13 +33,13 @@ from prometheus_client import CollectorRegistry from psycopg_pool import ConnectionPool from exporter.client_details import ClientDetails -from exporter.registry import _Metrics +from exporter.metric.node_metrics import Metrics class Processor(ABC): def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool): self.db_pool = db_pool - self.metrics = _Metrics(registry, DBHandler(db_pool)) + self.metrics = Metrics(registry, DBHandler(db_pool)) @abstractmethod def process(self, payload: bytes, client_details: ClientDetails): @@ -257,6 +258,7 @@ class IpTunnelAppProcessor(Processor): class PaxCounterAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received PAXCOUNTER_APP packet") + NodeConfigurationMetrics().process_pax_counter_update(client_details.node_id) paxcounter = Paxcount() try: paxcounter.ParseFromString(payload) @@ -288,6 +290,7 @@ class StoreForwardAppProcessor(Processor): class RangeTestAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received RANGE_TEST_APP packet") + NodeConfigurationMetrics().process_range_test_update(client_details.node_id) pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 @@ -306,6 +309,7 @@ class TelemetryAppProcessor(Processor): return if telemetry.HasField('device_metrics'): + NodeConfigurationMetrics().process_device_update(client_details.node_id) device_metrics: DeviceMetrics = telemetry.device_metrics self.metrics.battery_level_gauge.labels( **client_details.to_dict() @@ -328,6 +332,7 @@ class TelemetryAppProcessor(Processor): ).inc(getattr(device_metrics, 'uptime_seconds', 0)) if telemetry.HasField('environment_metrics'): + NodeConfigurationMetrics().process_environment_update(client_details.node_id) environment_metrics: EnvironmentMetrics = telemetry.environment_metrics self.metrics.temperature_gauge.labels( **client_details.to_dict() @@ -382,6 +387,7 @@ class TelemetryAppProcessor(Processor): ).set(getattr(environment_metrics, 'weight', 0)) if telemetry.HasField('air_quality_metrics'): + NodeConfigurationMetrics().process_air_quality_update(client_details.node_id) air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics self.metrics.pm10_standard_gauge.labels( **client_details.to_dict() @@ -432,6 +438,7 @@ class TelemetryAppProcessor(Processor): ).set(getattr(air_quality_metrics, 'particles_100um', 0)) if telemetry.HasField('power_metrics'): + NodeConfigurationMetrics().process_power_update(client_details.node_id) power_metrics: PowerMetrics = telemetry.power_metrics self.metrics.ch1_voltage_gauge.labels( **client_details.to_dict() @@ -493,6 +500,7 @@ class TraceRouteAppProcessor(Processor): class NeighborInfoAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received NEIGHBORINFO_APP packet") + NodeConfigurationMetrics().process_neighbor_info_update(client_details.node_id) neighbor_info = NeighborInfo() try: neighbor_info.ParseFromString(payload) @@ -547,6 +555,7 @@ class AtakPluginProcessor(Processor): class MapReportAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received MAP_REPORT_APP packet") + NodeConfigurationMetrics().map_broadcast_update(client_details.node_id) map_report = MapReport() try: map_report.ParseFromString(payload) diff --git a/main.py b/main.py index d090783..17164bc 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ import paho.mqtt.client as mqtt from dotenv import load_dotenv from constants import callback_api_version_map, protocol_map +from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics try: from meshtastic.mesh_pb2 import MeshPacket @@ -38,9 +39,10 @@ def handle_connect(client, userdata, flags, reason_code, properties): def update_node_status(node_number, status): with connection_pool.connection() as conn: with conn.cursor() as cur: - cur.execute("INSERT INTO node_details (node_id, mqtt_status) VALUES (%s, %s)" + cur.execute("INSERT INTO node_details (node_id, mqtt_status, short_name, long_name) VALUES (%s, %s, %s, %s)" "ON CONFLICT(node_id)" - "DO UPDATE SET mqtt_status = %s", (node_number, status, status)) + "DO UPDATE SET mqtt_status = %s", + (node_number, status, status, 'Unknown (MQTT)', 'Unknown (MQTT)')) conn.commit() @@ -48,6 +50,7 @@ def handle_message(client, userdata, message): current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"Received message on topic '{message.topic}' at {current_timestamp}") if '/json/' in message.topic: + processor.process_json_mqtt(message) # Ignore JSON messages as there are also protobuf messages sent on other topic # Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448 return @@ -78,7 +81,7 @@ def handle_message(client, userdata, message): cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING", (str(packet.id),)) conn.commit() - + processor.process_mqtt(message.topic, envelope, packet) processor.process(packet) except Exception as e: logging.error(f"Failed to handle message: {e}") @@ -89,14 +92,15 @@ if __name__ == "__main__": load_dotenv() # We have to load_dotenv before we can import MessageProcessor to allow filtering of message types - from exporter.processor_base import MessageProcessor + from exporter.processor.processor_base import MessageProcessor # Setup a connection pool connection_pool = ConnectionPool( os.getenv('DATABASE_URL'), - min_size=1, - max_size=10 + max_size=100 ) + # Configure node configuration metrics + node_conf_metrics = NodeConfigurationMetrics(connection_pool) # Configure Prometheus exporter registry = CollectorRegistry()