From af29d71a7d7be35bc1f8f4e929323279262ff869 Mon Sep 17 00:00:00 2001 From: Gleb Tcivie Date: Mon, 1 Jul 2024 13:57:26 +0300 Subject: [PATCH] Reformatted the code. client_details.py -> Now handles the client details Class processor_base.py -> All the base classes for the processors processors.py -> All available processors registry.py -> Registry mechanism for the processors --- .idea/sqldialects.xml | 2 +- exporter/__init__.py | 2 +- exporter/client_details.py | 37 ++ exporter/processor_base.py | 271 ++++++++++++++ exporter/processors.py | 735 ++++++++++++++++++++++++------------- exporter/registry.py | 528 +------------------------- main.py | 2 +- 7 files changed, 794 insertions(+), 783 deletions(-) create mode 100644 exporter/client_details.py create mode 100644 exporter/processor_base.py diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml index 0201e4b..f19a67b 100644 --- a/.idea/sqldialects.xml +++ b/.idea/sqldialects.xml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/exporter/__init__.py b/exporter/__init__.py index e602c54..67f78e4 100644 --- a/exporter/__init__.py +++ b/exporter/__init__.py @@ -1 +1 @@ -from .processors import MessageProcessor +from .processor_base import MessageProcessor diff --git a/exporter/client_details.py b/exporter/client_details.py new file mode 100644 index 0000000..5078848 --- /dev/null +++ b/exporter/client_details.py @@ -0,0 +1,37 @@ +from meshtastic.config_pb2 import Config +from meshtastic.mesh_pb2 import HardwareModel + + +class ClientDetails: + def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET, + role=None): + self.node_id = node_id + self.short_name = short_name + self.long_name = long_name + self.hardware_model: HardwareModel = hardware_model + self.role: Config.DeviceConfig.Role = role + + def to_dict(self): + return { + 'node_id': self.node_id, + 'short_name': self.short_name, + 'long_name': self.long_name, + 'hardware_model': self.get_hardware_model_name_from_code(self.hardware_model), + 'role': self.get_role_name_from_role(self.role) + } + + @staticmethod + def get_role_name_from_role(role): + descriptor = Config.DeviceConfig.Role.DESCRIPTOR + for enum_value in descriptor.values: + if enum_value.number == role: + return enum_value.name + return 'UNKNOWN_ROLE' + + @staticmethod + def get_hardware_model_name_from_code(hardware_model): + descriptor = HardwareModel.DESCRIPTOR + for enum_value in descriptor.values: + if enum_value.number == hardware_model: + return enum_value.name + return 'UNKNOWN_HARDWARE_MODEL' diff --git a/exporter/processor_base.py b/exporter/processor_base.py new file mode 100644 index 0000000..ec9d2ef --- /dev/null +++ b/exporter/processor_base.py @@ -0,0 +1,271 @@ +import base64 +import os + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel +from meshtastic.portnums_pb2 import PortNum +from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge +from psycopg_pool import ConnectionPool + +from exporter.client_details import ClientDetails +from exporter.processors import ProcessorRegistry + + +class MessageProcessor: + def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool): + self.rx_rssi_gauge = None + self.channel_counter = None + self.packet_id_counter = None + self.hop_start_gauge = None + self.via_mqtt_counter = None + self.want_ack_counter = None + self.hop_limit_counter = None + self.rx_snr_gauge = None + self.rx_time_histogram = None + self.total_packets_counter = None + self.destination_message_type_counter = None + self.source_message_type_counter = None + self.registry = registry + self.db_pool = db_pool + self.init_metrics() + self.processor_registry = ProcessorRegistry() + + def init_metrics(self): + common_labels = [ + 'source_id', 'source_short_name', 'source_long_name', 'source_hardware_model', 'source_role', + 'destination_id', 'destination_short_name', 'destination_long_name', 'destination_hardware_model', + 'destination_role' + ] + + self.source_message_type_counter = Counter( + 'mesh_packet_source_types', + 'Types of mesh packets processed by source', + common_labels + ['portnum'], + registry=self.registry + ) + # Destination-related counters + self.destination_message_type_counter = Counter( + 'mesh_packet_destination_types', + 'Types of mesh packets processed by destination', + common_labels + ['portnum'], + registry=self.registry + ) + # Counters for the total number of packets + self.total_packets_counter = Counter( + 'mesh_packet_total', + 'Total number of mesh packets processed', + common_labels, + registry=self.registry + ) + # Histogram for the rx_time (time in seconds) + self.rx_time_histogram = Histogram( + 'mesh_packet_rx_time', + 'Receive time of mesh packets (seconds since 1970)', + common_labels, + registry=self.registry + ) + # Gauge for the rx_snr (signal-to-noise ratio) + self.rx_snr_gauge = Gauge( + 'mesh_packet_rx_snr', + 'Receive SNR of mesh packets', + common_labels, + registry=self.registry + ) + # Counter for hop_limit + self.hop_limit_counter = Counter( + 'mesh_packet_hop_limit', + 'Hop limit of mesh packets', + common_labels, + registry=self.registry + ) + # Counter for want_ack (occurrences of want_ack set to true) + self.want_ack_counter = Counter( + 'mesh_packet_want_ack', + 'Occurrences of want ACK for mesh packets', + common_labels, + registry=self.registry + ) + # Counter for via_mqtt (occurrences of via_mqtt set to true) + self.via_mqtt_counter = Counter( + 'mesh_packet_via_mqtt', + 'Occurrences of mesh packets sent via MQTT', + common_labels, + registry=self.registry + ) + # Gauge for hop_start + self.hop_start_gauge = Gauge( + 'mesh_packet_hop_start', + 'Hop start of mesh packets', + common_labels, + registry=self.registry + ) + # Counter for unique packet IDs + self.packet_id_counter = Counter( + 'mesh_packet_ids', + 'Unique IDs for mesh packets', + common_labels + ['packet_id'], + registry=self.registry + ) + # Counter for the channel used + self.channel_counter = Counter( + 'mesh_packet_channel', + 'Channel used for mesh packets', + common_labels + ['channel'], + registry=self.registry + ) + # Gauge for the rx_rssi (received signal strength indicator) + self.rx_rssi_gauge = Gauge( + 'mesh_packet_rx_rssi', + 'Receive RSSI of mesh packets', + common_labels, + registry=self.registry + ) + + def process(self, mesh_packet: MeshPacket): + if getattr(mesh_packet, 'encrypted'): + key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii')) + nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little") + nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little") + + # Put both parts into a single byte array. + nonce = nonce_packet_id + nonce_from_node + + cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) + decryptor = cipher.decryptor() + decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize() + + data = Data() + data.ParseFromString(decrypted_bytes) + mesh_packet.decoded.CopyFrom(data) + port_num = int(mesh_packet.decoded.portnum) + payload = mesh_packet.decoded.payload + + source_node_id = getattr(mesh_packet, 'from') + source_client_details = self._get_client_details(source_node_id) + if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': + source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', + long_name='Hidden') + + destination_node_id = getattr(mesh_packet, 'to') + destination_client_details = self._get_client_details(destination_node_id) + if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': + destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden', + long_name='Hidden') + + if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports + return None # Ignore this packet + + self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) + + processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool) + processor.process(payload, client_details=source_client_details) + + @staticmethod + def get_port_name_from_portnum(port_num): + descriptor = PortNum.DESCRIPTOR + for enum_value in descriptor.values: + if enum_value.number == port_num: + return enum_value.name + return 'UNKNOWN_PORT' + + def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details): + common_labels = { + 'source_id': source_client_details.node_id, + 'source_short_name': source_client_details.short_name, + 'source_long_name': source_client_details.long_name, + 'source_hardware_model': source_client_details.hardware_model, + 'source_role': source_client_details.role, + 'destination_id': destination_client_details.node_id, + 'destination_short_name': destination_client_details.short_name, + 'destination_long_name': destination_client_details.long_name, + 'destination_hardware_model': destination_client_details.hardware_model, + 'destination_role': destination_client_details.role, + } + + self.source_message_type_counter.labels( + **common_labels, + portnum=self.get_port_name_from_portnum(port_num) + ).inc() + + self.destination_message_type_counter.labels( + **common_labels, + portnum=self.get_port_name_from_portnum(port_num) + ).inc() + + self.total_packets_counter.labels( + **common_labels + ).inc() + + self.rx_time_histogram.labels( + **common_labels + ).observe(mesh_packet.rx_time) + + self.rx_snr_gauge.labels( + **common_labels + ).set(mesh_packet.rx_snr) + + self.hop_limit_counter.labels( + **common_labels + ).inc(mesh_packet.hop_limit) + + if mesh_packet.want_ack: + self.want_ack_counter.labels( + **common_labels + ).inc() + + if mesh_packet.via_mqtt: + self.via_mqtt_counter.labels( + **common_labels + ).inc() + + self.hop_start_gauge.labels( + **common_labels + ).set(mesh_packet.hop_start) + + self.packet_id_counter.labels( + **common_labels, + packet_id=mesh_packet.id + ).inc() + + # Increment the channel counter + self.channel_counter.labels( + **common_labels, + channel=mesh_packet.channel + ).inc() + + # Set the rx_rssi in the gauge + self.rx_rssi_gauge.labels( + **common_labels + ).set(mesh_packet.rx_rssi) + + def _get_client_details(self, node_id: int) -> ClientDetails: + node_id_str = str(node_id) # Convert the integer to a string + with self.db_pool.connection() as conn: + with conn.cursor() as cur: + # First, try to select the existing record + cur.execute(""" + SELECT node_id, short_name, long_name, hardware_model, role + FROM client_details + WHERE node_id = %s; + """, (node_id_str,)) + result = cur.fetchone() + + if not result: + # If the client is not found, insert a new record + cur.execute(""" + INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) + VALUES (%s, %s, %s, %s, %s) + RETURNING node_id, short_name, long_name, hardware_model, role; + """, (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None)) + conn.commit() + result = cur.fetchone() + + # At this point, we should always have a result, either from SELECT or INSERT + return ClientDetails( + node_id=result[0], + short_name=result[1], + long_name=result[2], + hardware_model=result[3], + role=result[4] + ) diff --git a/exporter/processors.py b/exporter/processors.py index 17bb379..c33fc4c 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -1,269 +1,498 @@ -import base64 import os +from abc import ABC, abstractmethod +from venv import logger -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel +import psycopg +import unishox2 +from meshtastic.admin_pb2 import AdminMessage +from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo +from meshtastic.mqtt_pb2 import MapReport +from meshtastic.paxcount_pb2 import Paxcount from meshtastic.portnums_pb2 import PortNum -from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge +from meshtastic.remote_hardware_pb2 import HardwareMessage +from meshtastic.storeforward_pb2 import StoreAndForward +from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics +from prometheus_client import CollectorRegistry from psycopg_pool import ConnectionPool -from exporter.registry import ProcessorRegistry, ClientDetails +from exporter.client_details import ClientDetails +from exporter.registry import _Metrics -class MessageProcessor: +class Processor(ABC): def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool): - self.rx_rssi_gauge = None - self.channel_counter = None - self.packet_id_counter = None - self.hop_start_gauge = None - self.via_mqtt_counter = None - self.want_ack_counter = None - self.hop_limit_counter = None - self.rx_snr_gauge = None - self.rx_time_histogram = None - self.total_packets_counter = None - self.destination_message_type_counter = None - self.source_message_type_counter = None - self.registry = registry self.db_pool = db_pool - self.init_metrics() - self.processor_registry = ProcessorRegistry() + self.metrics = _Metrics(registry) - def init_metrics(self): - common_labels = [ - 'source_id', 'source_short_name', 'source_long_name', 'source_hardware_model', 'source_role', - 'destination_id', 'destination_short_name', 'destination_long_name', 'destination_hardware_model', - 'destination_role' - ] + @abstractmethod + def process(self, payload: bytes, client_details: ClientDetails): + pass - self.source_message_type_counter = Counter( - 'mesh_packet_source_types', - 'Types of mesh packets processed by source', - common_labels + ['portnum'], - registry=self.registry - ) - # Destination-related counters - self.destination_message_type_counter = Counter( - 'mesh_packet_destination_types', - 'Types of mesh packets processed by destination', - common_labels + ['portnum'], - registry=self.registry - ) - # Counters for the total number of packets - self.total_packets_counter = Counter( - 'mesh_packet_total', - 'Total number of mesh packets processed', - common_labels, - registry=self.registry - ) - # Histogram for the rx_time (time in seconds) - self.rx_time_histogram = Histogram( - 'mesh_packet_rx_time', - 'Receive time of mesh packets (seconds since 1970)', - common_labels, - registry=self.registry - ) - # Gauge for the rx_snr (signal-to-noise ratio) - self.rx_snr_gauge = Gauge( - 'mesh_packet_rx_snr', - 'Receive SNR of mesh packets', - common_labels, - registry=self.registry - ) - # Counter for hop_limit - self.hop_limit_counter = Counter( - 'mesh_packet_hop_limit', - 'Hop limit of mesh packets', - common_labels, - registry=self.registry - ) - # Counter for want_ack (occurrences of want_ack set to true) - self.want_ack_counter = Counter( - 'mesh_packet_want_ack', - 'Occurrences of want ACK for mesh packets', - common_labels, - registry=self.registry - ) - # Counter for via_mqtt (occurrences of via_mqtt set to true) - self.via_mqtt_counter = Counter( - 'mesh_packet_via_mqtt', - 'Occurrences of mesh packets sent via MQTT', - common_labels, - registry=self.registry - ) - # Gauge for hop_start - self.hop_start_gauge = Gauge( - 'mesh_packet_hop_start', - 'Hop start of mesh packets', - common_labels, - registry=self.registry - ) - # Counter for unique packet IDs - self.packet_id_counter = Counter( - 'mesh_packet_ids', - 'Unique IDs for mesh packets', - common_labels + ['packet_id'], - registry=self.registry - ) - # Counter for the channel used - self.channel_counter = Counter( - 'mesh_packet_channel', - 'Channel used for mesh packets', - common_labels + ['channel'], - registry=self.registry - ) - # Gauge for the rx_rssi (received signal strength indicator) - self.rx_rssi_gauge = Gauge( - 'mesh_packet_rx_rssi', - 'Receive RSSI of mesh packets', - common_labels, - registry=self.registry - ) - - def process(self, mesh_packet: MeshPacket): - if getattr(mesh_packet, 'encrypted'): - key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii')) - nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little") - nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little") - - # Put both parts into a single byte array. - nonce = nonce_packet_id + nonce_from_node - - cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) - decryptor = cipher.decryptor() - decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize() - - data = Data() - data.ParseFromString(decrypted_bytes) - mesh_packet.decoded.CopyFrom(data) - port_num = int(mesh_packet.decoded.portnum) - payload = mesh_packet.decoded.payload - - source_node_id = getattr(mesh_packet, 'from') - source_client_details = self._get_client_details(source_node_id) - if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': - source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', - long_name='Hidden') - - destination_node_id = getattr(mesh_packet, 'to') - destination_client_details = self._get_client_details(destination_node_id) - if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': - destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden', - long_name='Hidden') - - if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports - return None # Ignore this packet - - self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) - - processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool) - processor.process(payload, client_details=source_client_details) - - def get_port_name_from_portnum(self, port_num): - descriptor = PortNum.DESCRIPTOR - for enum_value in descriptor.values: - if enum_value.number == port_num: - return enum_value.name - return 'UNKNOWN_PORT' - - def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details): - common_labels = { - 'source_id': source_client_details.node_id, - 'source_short_name': source_client_details.short_name, - 'source_long_name': source_client_details.long_name, - 'source_hardware_model': source_client_details.hardware_model, - 'source_role': source_client_details.role, - 'destination_id': destination_client_details.node_id, - 'destination_short_name': destination_client_details.short_name, - 'destination_long_name': destination_client_details.long_name, - 'destination_hardware_model': destination_client_details.hardware_model, - 'destination_role': destination_client_details.role, - } - - self.source_message_type_counter.labels( - **common_labels, - portnum=self.get_port_name_from_portnum(port_num) - ).inc() - - self.destination_message_type_counter.labels( - **common_labels, - portnum=self.get_port_name_from_portnum(port_num) - ).inc() - - self.total_packets_counter.labels( - **common_labels - ).inc() - - self.rx_time_histogram.labels( - **common_labels - ).observe(mesh_packet.rx_time) - - self.rx_snr_gauge.labels( - **common_labels - ).set(mesh_packet.rx_snr) - - self.hop_limit_counter.labels( - **common_labels - ).inc(mesh_packet.hop_limit) - - if mesh_packet.want_ack: - self.want_ack_counter.labels( - **common_labels - ).inc() - - if mesh_packet.via_mqtt: - self.via_mqtt_counter.labels( - **common_labels - ).inc() - - self.hop_start_gauge.labels( - **common_labels - ).set(mesh_packet.hop_start) - - self.packet_id_counter.labels( - **common_labels, - packet_id=mesh_packet.id - ).inc() - - # Increment the channel counter - self.channel_counter.labels( - **common_labels, - channel=mesh_packet.channel - ).inc() - - # Set the rx_rssi in the gauge - self.rx_rssi_gauge.labels( - **common_labels - ).set(mesh_packet.rx_rssi) - - def _get_client_details(self, node_id: int) -> ClientDetails: - node_id_str = str(node_id) # Convert the integer to a string + def execute_db_operation(self, operation): with self.db_pool.connection() as conn: with conn.cursor() as cur: - # First, try to select the existing record + return operation(cur, conn) + + +class ProcessorRegistry: + _registry = {} + + @classmethod + def register_processor(cls, port_num): + def inner_wrapper(wrapped_class): + cls._registry[port_num] = wrapped_class + return wrapped_class + + return inner_wrapper + + @classmethod + def get_processor(cls, port_num) -> type(Processor): + return cls._registry.get(port_num, UnknownAppProcessor) + + +######################################################################################################################## +# PROCESSORS # +######################################################################################################################## + +@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP) +class UnknownAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received UNKNOWN_APP packet") + return None + + +@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) +class TextMessageAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received TEXT_MESSAGE_APP packet") + message = payload.decode('utf-8') + if os.getenv('HIDE_MESSAGE', 'true') == 'true': + message = 'Hidden' + self.metrics.message_length_histogram.labels( + **client_details.to_dict() + ).observe(len(message)) + + +@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP) +class RemoteHardwareAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received REMOTE_HARDWARE_APP packet") + hardware_message = HardwareMessage() + hardware_message.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.POSITION_APP) +class PositionAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received POSITION_APP packet") + position = Position() + position.ParseFromString(payload) + self.metrics.device_latitude_gauge.labels( + **client_details.to_dict() + ).set(position.latitude_i) + self.metrics.device_longitude_gauge.labels( + **client_details.to_dict() + ).set(position.longitude_i) + self.metrics.device_altitude_gauge.labels( + **client_details.to_dict() + ).set(position.altitude) + self.metrics.device_position_precision_gauge.labels( + **client_details.to_dict() + ).set(position.precision_bits) + pass + + +@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP) +class NodeInfoAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received NODEINFO_APP packet") + user = User() + user.ParseFromString(payload) + + def db_operation(cur, conn): + # First, try to select the existing record + cur.execute(""" + SELECT short_name, long_name, hardware_model, role + FROM client_details + WHERE node_id = %s; + """, (client_details.node_id,)) + existing_record = cur.fetchone() + + if existing_record: + # If record exists, update only the fields that are provided in the new data + update_fields = [] + update_values = [] + if user.short_name: + update_fields.append("short_name = %s") + update_values.append(user.short_name) + if user.long_name: + update_fields.append("long_name = %s") + update_values.append(user.long_name) + if user.hw_model != HardwareModel.UNSET: + update_fields.append("hardware_model = %s") + update_values.append(ClientDetails.get_hardware_model_name_from_code(user.hw_model)) + if user.role is not None: + update_fields.append("role = %s") + update_values.append(ClientDetails.get_role_name_from_role(user.role)) + + if update_fields: + update_query = f""" + UPDATE client_details + SET {", ".join(update_fields)} + WHERE node_id = %s + """ + cur.execute(update_query, update_values + [client_details.node_id]) + else: + # If record doesn't exist, insert a new one cur.execute(""" - SELECT node_id, short_name, long_name, hardware_model, role - FROM client_details - WHERE node_id = %s; - """, (node_id_str,)) - result = cur.fetchone() + INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) + VALUES (%s, %s, %s, %s, %s) + """, (client_details.node_id, user.short_name, user.long_name, + ClientDetails.get_hardware_model_name_from_code(user.hw_model), + ClientDetails.get_role_name_from_role(user.role))) - if not result: - # If the client is not found, insert a new record - cur.execute(""" - INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) - VALUES (%s, %s, %s, %s, %s) - RETURNING node_id, short_name, long_name, hardware_model, role; - """, (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None)) - conn.commit() - result = cur.fetchone() + conn.commit() - # At this point, we should always have a result, either from SELECT or INSERT - return ClientDetails( - node_id=result[0], - short_name=result[1], - long_name=result[2], - hardware_model=result[3], - role=result[4] - ) + self.execute_db_operation(db_operation) + + +@ProcessorRegistry.register_processor(PortNum.ROUTING_APP) +class RoutingAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received ROUTING_APP packet") + routing = Routing() + routing.ParseFromString(payload) + self.metrics.route_discovery_response_counter.labels( + **client_details.to_dict(), + response_type=self.get_error_name_from_routing(routing.error_reason) + ).inc() + + @staticmethod + def get_error_name_from_routing(error_code): + for name, value in Routing.Error.__dict__.items(): + if isinstance(value, int) and value == error_code: + return name + return 'UNKNOWN_ERROR' + + +@ProcessorRegistry.register_processor(PortNum.ADMIN_APP) +class AdminAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received ADMIN_APP packet") + admin_message = AdminMessage() + admin_message.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) +class TextMessageCompressedAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet") + decompressed_payload = unishox2.decompress(payload, len(payload)) + pass + + +@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP) +class WaypointAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received WAYPOINT_APP packet") + waypoint = Waypoint() + waypoint.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.AUDIO_APP) +class AudioAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received AUDIO_APP packet") + pass # NOTE: Audio packet. should probably be processed + + +@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP) +class DetectionSensorAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received DETECTION_SENSOR_APP packet") + pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 + + +@ProcessorRegistry.register_processor(PortNum.REPLY_APP) +class ReplyAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received REPLY_APP packet") + pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing. + + +@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP) +class IpTunnelAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received IP_TUNNEL_APP packet") + pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on. + + +@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP) +class PaxCounterAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received PAXCOUNTER_APP packet") + paxcounter = Paxcount() + paxcounter.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.SERIAL_APP) +class SerialAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received SERIAL_APP packet") + pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network. + + +@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP) +class StoreForwardAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received STORE_FORWARD_APP packet") + store_and_forward = StoreAndForward() + store_and_forward.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) +class RangeTestAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received RANGE_TEST_APP packet") + pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 + + +@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) +class TelemetryAppProcessor(Processor): + def __init__(self, registry: CollectorRegistry, db_connection: psycopg.connection): + super().__init__(registry, db_connection) + + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received TELEMETRY_APP packet") + telemetry = Telemetry() + telemetry.ParseFromString(payload) + + if telemetry.HasField('device_metrics'): + device_metrics: DeviceMetrics = telemetry.device_metrics + self.metrics.battery_level_gauge.labels( + **client_details.to_dict() + ).set(getattr(device_metrics, 'battery_level', 0)) + + self.metrics.voltage_gauge.labels( + **client_details.to_dict() + ).set(getattr(device_metrics, 'voltage', 0)) + + self.metrics.channel_utilization_gauge.labels( + **client_details.to_dict() + ).set(getattr(device_metrics, 'channel_utilization', 0)) + + self.metrics.air_util_tx_gauge.labels( + **client_details.to_dict() + ).set(getattr(device_metrics, 'air_util_tx', 0)) + + self.metrics.uptime_seconds_counter.labels( + **client_details.to_dict() + ).inc(getattr(device_metrics, 'uptime_seconds', 0)) + + if telemetry.HasField('environment_metrics'): + environment_metrics: EnvironmentMetrics = telemetry.environment_metrics + self.metrics.temperature_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'temperature', 0)) + + self.metrics.relative_humidity_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'relative_humidity', 0)) + + self.metrics.barometric_pressure_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'barometric_pressure', 0)) + + self.metrics.gas_resistance_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'gas_resistance', 0)) + + self.metrics.iaq_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'iaq', 0)) + + self.metrics.distance_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'distance', 0)) + + self.metrics.lux_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'lux', 0)) + + self.metrics.white_lux_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'white_lux', 0)) + + self.metrics.ir_lux_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'ir_lux', 0)) + + self.metrics.uv_lux_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'uv_lux', 0)) + + self.metrics.wind_direction_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'wind_direction', 0)) + + self.metrics.wind_speed_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'wind_speed', 0)) + + self.metrics.weight_gauge.labels( + **client_details.to_dict() + ).set(getattr(environment_metrics, 'weight', 0)) + + if telemetry.HasField('air_quality_metrics'): + air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics + self.metrics.pm10_standard_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm10_standard', 0)) + + self.metrics.pm25_standard_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm25_standard', 0)) + + self.metrics.pm100_standard_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm100_standard', 0)) + + self.metrics.pm10_environmental_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm10_environmental', 0)) + + self.metrics.pm25_environmental_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm25_environmental', 0)) + + self.metrics.pm100_environmental_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'pm100_environmental', 0)) + + self.metrics.particles_03um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_03um', 0)) + + self.metrics.particles_05um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_05um', 0)) + + self.metrics.particles_10um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_10um', 0)) + + self.metrics.particles_25um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_25um', 0)) + + self.metrics.particles_50um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_50um', 0)) + + self.metrics.particles_100um_gauge.labels( + **client_details.to_dict() + ).set(getattr(air_quality_metrics, 'particles_100um', 0)) + + if telemetry.HasField('power_metrics'): + power_metrics: PowerMetrics = telemetry.power_metrics + self.metrics.ch1_voltage_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch1_voltage', 0)) + + self.metrics.ch1_current_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch1_current', 0)) + + self.metrics.ch2_voltage_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch2_voltage', 0)) + + self.metrics.ch2_current_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch2_current', 0)) + + self.metrics.ch3_voltage_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch3_voltage', 0)) + + self.metrics.ch3_current_gauge.labels( + **client_details.to_dict() + ).set(getattr(power_metrics, 'ch3_current', 0)) + + +@ProcessorRegistry.register_processor(PortNum.ZPS_APP) +class ZpsAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received ZPS_APP packet") + pass # NOTE: Experimental tools for estimating node position without a GPS + + +@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP) +class SimulatorAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received SIMULATOR_APP packet") + pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip. + + +@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP) +class TraceRouteAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received TRACEROUTE_APP packet") + traceroute = RouteDiscovery() + traceroute.ParseFromString(payload) + if traceroute.route: + route = traceroute.route + self.metrics.route_discovery_counter.labels( + **client_details.to_dict() + ).inc(len(route)) + + +@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP) +class NeighborInfoAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received NEIGHBORINFO_APP packet") + neighbor_info = NeighborInfo() + neighbor_info.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN) +class AtakPluginProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received ATAK_PLUGIN packet") + pass # NOTE: ATAK Plugin + + +@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP) +class MapReportAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received MAP_REPORT_APP packet") + map_report = MapReport() + map_report.ParseFromString(payload) + pass # Nothing interesting here + + +@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) +class PrivateAppProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received PRIVATE_APP packet") + pass # NOTE: Private application portnum + + +@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER) +class AtakForwarderProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received ATAK_FORWARDER packet") + pass # NOTE: ATAK Forwarder + + +@ProcessorRegistry.register_processor(PortNum.MAX) +class MaxProcessor(Processor): + def process(self, payload: bytes, client_details: ClientDetails): + logger.debug("Received MAX packet") + pass # NOTE: Maximum portnum value diff --git a/exporter/registry.py b/exporter/registry.py index e8edfb9..4d2519f 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -1,20 +1,4 @@ -import os -from abc import ABC, abstractmethod -from venv import logger - -import psycopg -import unishox2 -from meshtastic.admin_pb2 import AdminMessage -from meshtastic.config_pb2 import Config -from meshtastic.mesh_pb2 import Position, User, Routing, Waypoint, RouteDiscovery, NeighborInfo, HardwareModel -from meshtastic.mqtt_pb2 import MapReport -from meshtastic.paxcount_pb2 import Paxcount -from meshtastic.portnums_pb2 import PortNum -from meshtastic.remote_hardware_pb2 import HardwareMessage -from meshtastic.storeforward_pb2 import StoreAndForward -from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram -from psycopg_pool import ConnectionPool class _Metrics: @@ -37,10 +21,7 @@ class _Metrics: 'node_id', 'short_name', 'long_name', 'hardware_model', 'role' ] - def _init_metrics(self): # TODO: Go over the metrics and rethink some of them to be more like the longtitute and - # latitude - The values should represent something and we shouldn't just label stuff. Also, the labels should - # be less used looked upon like keys for the data - # Histogram for the length of messages + def _init_metrics(self): self._init_metrics_text_message() self._init_metrics_telemetry_device() self._init_metrics_telemetry_environment() @@ -357,510 +338,3 @@ class _Metrics: ) -def get_hardware_model_name_from_code(hardware_model): - descriptor = HardwareModel.DESCRIPTOR - for enum_value in descriptor.values: - if enum_value.number == hardware_model: - return enum_value.name - return 'UNKNOWN_HARDWARE_MODEL' - - -def get_role_name_from_role(role): - descriptor = Config.DeviceConfig.Role.DESCRIPTOR - for enum_value in descriptor.values: - if enum_value.number == role: - return enum_value.name - return 'UNKNOWN_ROLE' - - -class ClientDetails: - def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET, - role=None): - self.node_id = node_id - self.short_name = short_name - self.long_name = long_name - self.hardware_model: HardwareModel = hardware_model - self.role: Config.DeviceConfig.Role = role - - def to_dict(self): - return { - 'node_id': self.node_id, - 'short_name': self.short_name, - 'long_name': self.long_name, - 'hardware_model': get_hardware_model_name_from_code(self.hardware_model), - 'role': get_role_name_from_role(self.role) - } - - -class Processor(ABC): - def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool): - self.db_pool = db_pool - self.metrics = _Metrics(registry) - - @abstractmethod - def process(self, payload: bytes, client_details: ClientDetails): - pass - - def execute_db_operation(self, operation): - with self.db_pool.connection() as conn: - with conn.cursor() as cur: - return operation(cur, conn) - - -class ProcessorRegistry: - _registry = {} - - @classmethod - def register_processor(cls, port_num): - def inner_wrapper(wrapped_class): - cls._registry[port_num] = wrapped_class - return wrapped_class - - return inner_wrapper - - @classmethod - def get_processor(cls, port_num) -> type(Processor): - return cls._registry.get(port_num, UnknownAppProcessor) - - -@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP) -class UnknownAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received UNKNOWN_APP packet") - return None - - -@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) -class TextMessageAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received TEXT_MESSAGE_APP packet") - message = payload.decode('utf-8') - if os.getenv('HIDE_MESSAGE', 'true') == 'true': - message = 'Hidden' - self.metrics.message_length_histogram.labels( - **client_details.to_dict() - ).observe(len(message)) - - -@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP) -class RemoteHardwareAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received REMOTE_HARDWARE_APP packet") - hardware_message = HardwareMessage() - hardware_message.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.POSITION_APP) -class PositionAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received POSITION_APP packet") - position = Position() - position.ParseFromString(payload) - self.metrics.device_latitude_gauge.labels( - **client_details.to_dict() - ).set(position.latitude_i) - self.metrics.device_longitude_gauge.labels( - **client_details.to_dict() - ).set(position.longitude_i) - self.metrics.device_altitude_gauge.labels( - **client_details.to_dict() - ).set(position.altitude) - self.metrics.device_position_precision_gauge.labels( - **client_details.to_dict() - ).set(position.precision_bits) - pass - - -@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP) -class NodeInfoAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received NODEINFO_APP packet") - user = User() - user.ParseFromString(payload) - - def db_operation(cur, conn): - # First, try to select the existing record - cur.execute(""" - SELECT short_name, long_name, hardware_model, role - FROM client_details - WHERE node_id = %s; - """, (client_details.node_id,)) - existing_record = cur.fetchone() - - if existing_record: - # If record exists, update only the fields that are provided in the new data - update_fields = [] - update_values = [] - if user.short_name: - update_fields.append("short_name = %s") - update_values.append(user.short_name) - if user.long_name: - update_fields.append("long_name = %s") - update_values.append(user.long_name) - if user.hw_model != HardwareModel.UNSET: - update_fields.append("hardware_model = %s") - update_values.append(get_hardware_model_name_from_code(user.hw_model)) - if user.role is not None: - update_fields.append("role = %s") - update_values.append(get_role_name_from_role(user.role)) - - if update_fields: - update_query = f""" - UPDATE client_details - SET {", ".join(update_fields)} - WHERE node_id = %s - """ - cur.execute(update_query, update_values + [client_details.node_id]) - else: - # If record doesn't exist, insert a new one - cur.execute(""" - INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role) - VALUES (%s, %s, %s, %s, %s) - """, (client_details.node_id, user.short_name, user.long_name, - get_hardware_model_name_from_code(user.hw_model), get_role_name_from_role(user.role))) - - conn.commit() - - self.execute_db_operation(db_operation) - - -@ProcessorRegistry.register_processor(PortNum.ROUTING_APP) -class RoutingAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received ROUTING_APP packet") - routing = Routing() - routing.ParseFromString(payload) - self.metrics.route_discovery_response_counter.labels( - **client_details.to_dict(), - response_type=self.get_error_name_from_routing(routing.error_reason) - ).inc() - - @staticmethod - def get_error_name_from_routing(error_code): - for name, value in Routing.Error.__dict__.items(): - if isinstance(value, int) and value == error_code: - return name - return 'UNKNOWN_ERROR' - - -@ProcessorRegistry.register_processor(PortNum.ADMIN_APP) -class AdminAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received ADMIN_APP packet") - admin_message = AdminMessage() - admin_message.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) -class TextMessageCompressedAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet") - decompressed_payload = unishox2.decompress(payload, len(payload)) - pass - - -@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP) -class WaypointAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received WAYPOINT_APP packet") - waypoint = Waypoint() - waypoint.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.AUDIO_APP) -class AudioAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received AUDIO_APP packet") - pass # NOTE: Audio packet. should probably be processed - - -@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP) -class DetectionSensorAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received DETECTION_SENSOR_APP packet") - pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 - - -@ProcessorRegistry.register_processor(PortNum.REPLY_APP) -class ReplyAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received REPLY_APP packet") - pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing. - - -@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP) -class IpTunnelAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received IP_TUNNEL_APP packet") - pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on. - - -@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP) -class PaxCounterAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received PAXCOUNTER_APP packet") - paxcounter = Paxcount() - paxcounter.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.SERIAL_APP) -class SerialAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received SERIAL_APP packet") - pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network. - - -@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP) -class StoreForwardAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received STORE_FORWARD_APP packet") - store_and_forward = StoreAndForward() - store_and_forward.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) -class RangeTestAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received RANGE_TEST_APP packet") - pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 - - -@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) -class TelemetryAppProcessor(Processor): - def __init__(self, registry: CollectorRegistry, db_connection: psycopg.connection): - super().__init__(registry, db_connection) - - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received TELEMETRY_APP packet") - telemetry = Telemetry() - telemetry.ParseFromString(payload) - - if telemetry.HasField('device_metrics'): - device_metrics: DeviceMetrics = telemetry.device_metrics - self.metrics.battery_level_gauge.labels( - **client_details.to_dict() - ).set(getattr(device_metrics, 'battery_level', 0)) - - self.metrics.voltage_gauge.labels( - **client_details.to_dict() - ).set(getattr(device_metrics, 'voltage', 0)) - - self.metrics.channel_utilization_gauge.labels( - **client_details.to_dict() - ).set(getattr(device_metrics, 'channel_utilization', 0)) - - self.metrics.air_util_tx_gauge.labels( - **client_details.to_dict() - ).set(getattr(device_metrics, 'air_util_tx', 0)) - - self.metrics.uptime_seconds_counter.labels( - **client_details.to_dict() - ).inc(getattr(device_metrics, 'uptime_seconds', 0)) - - if telemetry.HasField('environment_metrics'): - environment_metrics: EnvironmentMetrics = telemetry.environment_metrics - self.metrics.temperature_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'temperature', 0)) - - self.metrics.relative_humidity_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'relative_humidity', 0)) - - self.metrics.barometric_pressure_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'barometric_pressure', 0)) - - self.metrics.gas_resistance_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'gas_resistance', 0)) - - self.metrics.iaq_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'iaq', 0)) - - self.metrics.distance_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'distance', 0)) - - self.metrics.lux_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'lux', 0)) - - self.metrics.white_lux_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'white_lux', 0)) - - self.metrics.ir_lux_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'ir_lux', 0)) - - self.metrics.uv_lux_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'uv_lux', 0)) - - self.metrics.wind_direction_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'wind_direction', 0)) - - self.metrics.wind_speed_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'wind_speed', 0)) - - self.metrics.weight_gauge.labels( - **client_details.to_dict() - ).set(getattr(environment_metrics, 'weight', 0)) - - if telemetry.HasField('air_quality_metrics'): - air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics - self.metrics.pm10_standard_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm10_standard', 0)) - - self.metrics.pm25_standard_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm25_standard', 0)) - - self.metrics.pm100_standard_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm100_standard', 0)) - - self.metrics.pm10_environmental_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm10_environmental', 0)) - - self.metrics.pm25_environmental_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm25_environmental', 0)) - - self.metrics.pm100_environmental_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'pm100_environmental', 0)) - - self.metrics.particles_03um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_03um', 0)) - - self.metrics.particles_05um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_05um', 0)) - - self.metrics.particles_10um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_10um', 0)) - - self.metrics.particles_25um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_25um', 0)) - - self.metrics.particles_50um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_50um', 0)) - - self.metrics.particles_100um_gauge.labels( - **client_details.to_dict() - ).set(getattr(air_quality_metrics, 'particles_100um', 0)) - - if telemetry.HasField('power_metrics'): - power_metrics: PowerMetrics = telemetry.power_metrics - self.metrics.ch1_voltage_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch1_voltage', 0)) - - self.metrics.ch1_current_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch1_current', 0)) - - self.metrics.ch2_voltage_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch2_voltage', 0)) - - self.metrics.ch2_current_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch2_current', 0)) - - self.metrics.ch3_voltage_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch3_voltage', 0)) - - self.metrics.ch3_current_gauge.labels( - **client_details.to_dict() - ).set(getattr(power_metrics, 'ch3_current', 0)) - - -@ProcessorRegistry.register_processor(PortNum.ZPS_APP) -class ZpsAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received ZPS_APP packet") - pass # NOTE: Experimental tools for estimating node position without a GPS - - -@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP) -class SimulatorAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received SIMULATOR_APP packet") - pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip. - - -@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP) -class TraceRouteAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received TRACEROUTE_APP packet") - traceroute = RouteDiscovery() - traceroute.ParseFromString(payload) - if traceroute.route: - route = traceroute.route - self.metrics.route_discovery_counter.labels( - **client_details.to_dict() - ).inc(len(route)) - - -@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP) -class NeighborInfoAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received NEIGHBORINFO_APP packet") - neighbor_info = NeighborInfo() - neighbor_info.ParseFromString(payload) - pass - - -@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN) -class AtakPluginProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received ATAK_PLUGIN packet") - pass # NOTE: ATAK Plugin - - -@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP) -class MapReportAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received MAP_REPORT_APP packet") - map_report = MapReport() - map_report.ParseFromString(payload) - pass # Nothing interesting here - - -@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) -class PrivateAppProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received PRIVATE_APP packet") - pass # NOTE: Private application portnum - - -@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER) -class AtakForwarderProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received ATAK_FORWARDER packet") - pass # NOTE: ATAK Forwarder - - -@ProcessorRegistry.register_processor(PortNum.MAX) -class MaxProcessor(Processor): - def process(self, payload: bytes, client_details: ClientDetails): - logger.debug("Received MAX packet") - pass # NOTE: Maximum portnum value diff --git a/main.py b/main.py index e5184a6..a932302 100644 --- a/main.py +++ b/main.py @@ -10,7 +10,7 @@ from paho.mqtt.enums import CallbackAPIVersion from prometheus_client import CollectorRegistry, start_http_server from psycopg_pool import ConnectionPool -from exporter.processors import MessageProcessor +from exporter.processor_base import MessageProcessor # Global connection pool connection_pool = None