commit 07ddcdf5d1f6f66305a05e91cd9510a6a367673d Author: Gleb Tcivie Date: Sun Jun 23 22:15:31 2024 +0300 Bare bones are ready diff --git a/.env b/.env new file mode 100644 index 0000000..e69de29 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..893ac4e --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,29 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/meshtastic-metrics-exporter.iml b/.idea/meshtastic-metrics-exporter.iml new file mode 100644 index 0000000..2c80e12 --- /dev/null +++ b/.idea/meshtastic-metrics-exporter.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..e61441b --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..7f27e0a --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/exporter/__init__.py b/exporter/__init__.py new file mode 100644 index 0000000..e602c54 --- /dev/null +++ b/exporter/__init__.py @@ -0,0 +1 @@ +from .processors import MessageProcessor diff --git a/exporter/processors.py b/exporter/processors.py new file mode 100644 index 0000000..1127e5d --- /dev/null +++ b/exporter/processors.py @@ -0,0 +1,16 @@ +from meshtastic.mesh_pb2 import MeshPacket +from prometheus_client import CollectorRegistry + +from exporter.registry import ProcessorRegistry + + +class MessageProcessor: + def __init__(self, registry: CollectorRegistry): + self.registry = registry + + def process(self, mesh_packet: MeshPacket): + port_num = mesh_packet.decoded.portnum + payload = mesh_packet.decoded.payload + processor = ProcessorRegistry.get_processor(port_num)(self.registry) + + processor.process(payload) diff --git a/exporter/registry.py b/exporter/registry.py new file mode 100644 index 0000000..857594b --- /dev/null +++ b/exporter/registry.py @@ -0,0 +1,252 @@ +from abc import ABC, abstractmethod +from venv import logger + +import unishox2 +from meshtastic.admin_pb2 import AdminMessage +from meshtastic.mesh_pb2 import Position, User, Routing, Waypoint, RouteDiscovery, NeighborInfo +from meshtastic.mqtt_pb2 import MapReport +from meshtastic.paxcount_pb2 import Paxcount +from meshtastic.portnums_pb2 import PortNum +from meshtastic.remote_hardware_pb2 import HardwareMessage +from meshtastic.storeforward_pb2 import StoreAndForward +from meshtastic.telemetry_pb2 import Telemetry +from prometheus_client import CollectorRegistry + + +class Processor(ABC): + def __init__(self, registry: CollectorRegistry): + self.registry = registry + + @abstractmethod + def process(self, payload): + pass + + +class ProcessorRegistry: + _registry = {} + + @classmethod + def register_processor(cls, portnum): + def inner_wrapper(wrapped_class): + cls._registry[portnum] = wrapped_class() + return wrapped_class + + return inner_wrapper + + @classmethod + def get_processor(cls, portnum): + return cls._registry.get(portnum, UnknownAppProcessor()) + + +@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP) +class UnknownAppProcessor(Processor): + def process(self, payload): + logger.debug("Received UNKNOWN_APP packet") + return None + + +@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) +class TextMessageAppProcessor(Processor): + def process(self, payload): + logger.debug("Received TEXT_MESSAGE_APP packet") + pass + + +@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP) +class RemoteHardwareAppProcessor(Processor): + def process(self, payload): + logger.debug("Received REMOTE_HARDWARE_APP packet") + hardware_message = HardwareMessage() + hardware_message.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.POSITION_APP) +class PositionAppProcessor(Processor): + def process(self, payload): + logger.debug("Received POSITION_APP packet") + position = Position() + position.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP) +class NodeInfoAppProcessor(Processor): + def process(self, payload): + logger.debug("Received NODEINFO_APP packet") + user = User() + user.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.ROUTING_APP) +class RoutingAppProcessor(Processor): + def process(self, payload): + logger.debug("Received ROUTING_APP packet") + routing = Routing() + routing.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.ADMIN_APP) +class AdminAppProcessor(Processor): + def process(self, payload): + logger.debug("Received ADMIN_APP packet") + admin_message = AdminMessage() + admin_message.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) +class TextMessageCompressedAppProcessor(Processor): + def process(self, payload): + logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet") + decompressed_payload = unishox2.decompress(payload, len(payload)) + pass + + +@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP) +class WaypointAppProcessor(Processor): + def process(self, payload): + logger.debug("Received WAYPOINT_APP packet") + waypoint = Waypoint() + waypoint.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.AUDIO_APP) +class AudioAppProcessor(Processor): + def process(self, payload): + logger.debug("Received AUDIO_APP packet") + pass # NOTE: Audio packet. should probably be processed + + +@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP) +class DetectionSensorAppProcessor(Processor): + def process(self, payload): + logger.debug("Received DETECTION_SENSOR_APP packet") + pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 + + +@ProcessorRegistry.register_processor(PortNum.REPLY_APP) +class ReplyAppProcessor(Processor): + def process(self, payload): + logger.debug("Received REPLY_APP packet") + pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing. + + +@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP) +class IpTunnelAppProcessor(Processor): + def process(self, payload): + logger.debug("Received IP_TUNNEL_APP packet") + pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on. + + +@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP) +class PaxCounterAppProcessor(Processor): + def process(self, payload): + logger.debug("Received PAXCOUNTER_APP packet") + paxcounter = Paxcount() + paxcounter.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.SERIAL_APP) +class SerialAppProcessor(Processor): + def process(self, payload): + logger.debug("Received SERIAL_APP packet") + pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network. + + +@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP) +class StoreForwardAppProcessor(Processor): + def process(self, payload): + logger.debug("Received STORE_FORWARD_APP packet") + store_and_forward = StoreAndForward() + store_and_forward.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) +class RangeTestAppProcessor(Processor): + def process(self, payload): + logger.debug("Received RANGE_TEST_APP packet") + pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 + + +@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) +class TelemetryAppProcessor(Processor): + def process(self, payload): + logger.debug("Received TELEMETRY_APP packet") + telemetry = Telemetry() + telemetry.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.ZPS_APP) +class ZpsAppProcessor(Processor): + def process(self, payload): + logger.debug("Received ZPS_APP packet") + pass # NOTE: Experimental tools for estimating node position without a GPS + + +@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP) +class SimulatorAppProcessor(Processor): + def process(self, payload): + logger.debug("Received SIMULATOR_APP packet") + pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip. + + +@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP) +class TraceRouteAppProcessor(Processor): + def process(self, payload): + logger.debug("Received TRACEROUTE_APP packet") + traceroute = RouteDiscovery() + traceroute.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP) +class NeighborInfoAppProcessor(Processor): + def process(self, payload): + logger.debug("Received NEIGHBORINFO_APP packet") + neighbor_info = NeighborInfo() + neighbor_info.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN) +class AtakPluginProcessor(Processor): + def process(self, payload): + logger.debug("Received ATAK_PLUGIN packet") + pass # NOTE: ATAK Plugin + + +@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP) +class MapReportAppProcessor(Processor): + def process(self, payload): + logger.debug("Received MAP_REPORT_APP packet") + map_report = MapReport() + map_report.ParseFromString(payload) + pass + + +@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) +class PrivateAppProcessor(Processor): + def process(self, payload): + logger.debug("Received PRIVATE_APP packet") + pass # NOTE: Private application portnum + + +@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER) +class AtakForwarderProcessor(Processor): + def process(self, payload): + logger.debug("Received ATAK_FORWARDER packet") + pass # NOTE: ATAK Forwarder + + +@ProcessorRegistry.register_processor(PortNum.MAX) +class MaxProcessor(Processor): + def process(self, payload): + logger.debug("Received MAX packet") + pass # NOTE: Maximum portnum value diff --git a/main.py b/main.py new file mode 100644 index 0000000..912d5b7 --- /dev/null +++ b/main.py @@ -0,0 +1,72 @@ +import logging +import os + +import paho.mqtt.client as mqtt +import redis +from dotenv import load_dotenv +from meshtastic.mesh_pb2 import MeshPacket +from meshtastic.mqtt_pb2 import ServiceEnvelope +from prometheus_client import push_to_gateway, CollectorRegistry + +from exporter.processors import MessageProcessor + + +def handle_connect(client, userdata, flags, reason_code, properties): + print(f"Connected with result code {reason_code}") + client.subscribe(os.getenv('mqtt_topic', 'msh/israel/#')) + + +def handle_message(client, userdata, message): + print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'") + envelope = ServiceEnvelope() + envelope.ParseFromString(message.payload) + + packet: MeshPacket = envelope.packet + if redis_client.set(str(packet.id), 1, nx=True, ex=os.getenv('redis_expiration', 60), get=True) is not None: + logging.debug(f"Packet {packet.id} already processed") + return + + # Process the packet + processor.process(packet) + + +if __name__ == "__main__": + load_dotenv() + # Create Redis client + redis_client = redis.Redis( + host=os.getenv('redis_host'), + port=int(os.getenv('redis_port')), + db=int(os.getenv('redis_db', 0)), + password=os.getenv('redis_password', None), + ) + + # Configure Prometheus exporter + registry = CollectorRegistry() + push_to_gateway( + os.getenv('prometheus_pushgateway'), + job=os.getenv('prometheus_job'), + registry=registry, + ) + + # Create an MQTT client + mqtt_client = mqtt.Client() + + mqtt_client.on_connect = handle_connect + mqtt_client.on_message = handle_message + + if bool(os.getenv('mqtt_is_tls', False)): + tls_context = mqtt.ssl.create_default_context() + mqtt_client.tls_set_context(tls_context) + + if os.getenv('mqtt_username', None) and os.getenv('mqtt_password', None): + mqtt_client.username_pw_set(os.getenv('mqtt_username'), os.getenv('mqtt_password')) + + mqtt_client.connect( + os.getenv('mqtt_host'), + int(os.getenv('mqtt_port')), + keepalive=int(os.getenv('mqtt_keepalive', 60)), + ) + # Configure the Processor and the Exporter + processor = MessageProcessor(registry) + + mqtt_client.loop_forever()