From 0b7874df10412b310826f8657ab6358731d8464f Mon Sep 17 00:00:00 2001 From: Gleb Tcivie Date: Mon, 24 Jun 2024 18:05:47 +0300 Subject: [PATCH] added support for TEXT_MESSAGE_APP and .env files --- .env | 26 ++++++++ .idea/meshtastic-metrics-exporter.iml | 3 + Dockerfile | 8 +++ exporter/processors.py | 58 ++++++++++++++---- exporter/registry.py | 87 +++++++++++++++++---------- main.py | 26 ++++---- requirements.txt | 5 ++ 7 files changed, 157 insertions(+), 56 deletions(-) create mode 100644 Dockerfile create mode 100644 requirements.txt diff --git a/.env b/.env index e69de29..4845ffc 100644 --- a/.env +++ b/.env @@ -0,0 +1,26 @@ +# Description: Environment variables for the application + +# Redis connection details +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Prometheus connection details +PROMETHEUS_PUSHGATEWAY=http://localhost:9091 +PROMETHEUS_JOB=example + +# MQTT connection details +MQTT_HOST=localhost +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +MQTT_KEEPALIVE=60 +MQTT_TOPIC='msh/israel/#' +MQTT_IS_TLS=false + +# Exporter configuration +MESH_HIDE_SOURCE_DATA=false# Hide source data in the exporter (default: false) +MESH_HIDE_DESTINATION_DATA=false# Hide destination data in the exporter (default: false) +FILTERED_PORTS=1# Filtered ports in the exporter (default: 1, can be a comma-separated list of ports) +HIDE_MESSAGE=true# Hide message content in the TEXT_MESSAGE_APP packets (default: true) \ No newline at end of file diff --git a/.idea/meshtastic-metrics-exporter.iml b/.idea/meshtastic-metrics-exporter.iml index 2c80e12..ba8dfea 100644 --- a/.idea/meshtastic-metrics-exporter.iml +++ b/.idea/meshtastic-metrics-exporter.iml @@ -7,4 +7,7 @@ + + \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2ee4941 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.12-slim +LABEL author="Gleb Tcivie" + +WORKDIR /app +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY main.py . +CMD ["python3", "-u", "main.py"] \ No newline at end of file diff --git a/exporter/processors.py b/exporter/processors.py index 44637c4..9aaa5b5 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -1,3 +1,5 @@ +import os + import redis from meshtastic.mesh_pb2 import MeshPacket from prometheus_client import CollectorRegistry, Counter @@ -10,32 +12,64 @@ class MessageProcessor: self.registry = registry self.redis_client = redis_client self.counter = Counter('mesh_packets', 'Number of mesh packets processed', - ['source_id', 'source_short_name', 'source_long_name', 'portnum'], + [ + 'source_id', 'source_short_name', 'source_long_name', + 'destination_id', 'destination_short_name', 'destination_long_name', + 'portnum', + 'rx_time', 'rx_snr', 'hop_limit', 'want_ack', 'via_mqtt', 'hop_start' + ], registry=self.registry) def process(self, mesh_packet: MeshPacket): - port_num = mesh_packet.decoded.portnum + port_num = int(mesh_packet.decoded.portnum) payload = mesh_packet.decoded.payload - processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) - client_details = self._get_client_details(mesh_packet) + source_client_details = self._get_client_details(mesh_packet['from']) + if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': + source_client_details = { + 'id': source_client_details['id'], + 'short_name': 'Hidden', + 'long_name': 'Hidden', + } + destination_client_details = self._get_client_details(mesh_packet['to']) + if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': + destination_client_details = { + 'id': destination_client_details['id'], + 'short_name': 'Hidden', + 'long_name': 'Hidden', + } + + if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports + return None # Ignore this packet + self.counter.labels( - source_id=client_details['id'], - source_short_name=client_details['short_name'], - source_long_name=client_details['long_name'], + source_id=source_client_details['id'], + source_short_name=source_client_details['short_name'], + source_long_name=source_client_details['long_name'], + + destination_id=destination_client_details['id'], + destination_short_name=destination_client_details['short_name'], + destination_long_name=destination_client_details['long_name'], + + rx_time=mesh_packet.rx_time, + rx_snr=mesh_packet.rx_snr, + hop_limit=mesh_packet.hop_limit, + want_ack=mesh_packet.want_ack, + via_mqtt=mesh_packet.via_mqtt, + hop_start=mesh_packet.hop_start, portnum=port_num ).inc() - processor.process_packet(payload) - def _get_client_details(self, mesh_packet: MeshPacket): - from_id = mesh_packet['from'] + processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) + processor.process(payload) - details = self.redis_client.hgetall(f"node:{from_id}") + def _get_client_details(self, id: str): + details = self.redis_client.hgetall(f"node:{id}") if details: return details return { - 'id': from_id, + 'id': id, 'short_name': 'Unknown', 'long_name': 'Unknown', } diff --git a/exporter/registry.py b/exporter/registry.py index b2e4231..b1ea9c4 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -1,3 +1,4 @@ +import os from abc import ABC, abstractmethod from venv import logger @@ -11,7 +12,14 @@ from meshtastic.portnums_pb2 import PortNum from meshtastic.remote_hardware_pb2 import HardwareMessage from meshtastic.storeforward_pb2 import StoreAndForward from meshtastic.telemetry_pb2 import Telemetry -from prometheus_client import CollectorRegistry +from prometheus_client import CollectorRegistry, Counter + + +class ClientDetails: + def __init__(self, id, short_name, long_name): + self.id = id + self.short_name = short_name + self.long_name = long_name class Processor(ABC): @@ -20,7 +28,7 @@ class Processor(ABC): self.redis_client = redis_client @abstractmethod - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): pass @@ -42,21 +50,38 @@ class ProcessorRegistry: @ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP) class UnknownAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received UNKNOWN_APP packet") return None -@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) +@ProcessorRegistry.register_processor('TEXT_MESSAGE_APP') class TextMessageAppProcessor(Processor): - def process(self, payload): + def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): + super().__init__(registry, redis_client) + self.message_counter = Counter( + 'text_message_app', + 'Text message app payload details', + ['client_id', 'short_name', 'long_name', 'message_content'], + registry=self.registry + ) + + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TEXT_MESSAGE_APP packet") - pass + message = payload.decode('utf-8') + if os.getenv('HIDE_MESSAGE', 'true') == 'true': + message = 'Hidden' + self.message_counter.labels( + client_id=client_details.id, + short_name=client_details.short_name, + long_name=client_details.long_name, + message_content=message + ).inc() @ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP) class RemoteHardwareAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received REMOTE_HARDWARE_APP packet") hardware_message = HardwareMessage() hardware_message.ParseFromString(payload) @@ -65,7 +90,7 @@ class RemoteHardwareAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.POSITION_APP) class PositionAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received POSITION_APP packet") position = Position() position.ParseFromString(payload) @@ -74,7 +99,7 @@ class PositionAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.NODEINFO_APP) class NodeInfoAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received NODEINFO_APP packet") user = User() user.ParseFromString(payload) @@ -83,7 +108,7 @@ class NodeInfoAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.ROUTING_APP) class RoutingAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ROUTING_APP packet") routing = Routing() routing.ParseFromString(payload) @@ -92,7 +117,7 @@ class RoutingAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.ADMIN_APP) class AdminAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ADMIN_APP packet") admin_message = AdminMessage() admin_message.ParseFromString(payload) @@ -101,7 +126,7 @@ class AdminAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) class TextMessageCompressedAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet") decompressed_payload = unishox2.decompress(payload, len(payload)) pass @@ -109,7 +134,7 @@ class TextMessageCompressedAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP) class WaypointAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received WAYPOINT_APP packet") waypoint = Waypoint() waypoint.ParseFromString(payload) @@ -118,35 +143,35 @@ class WaypointAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.AUDIO_APP) class AudioAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received AUDIO_APP packet") pass # NOTE: Audio packet. should probably be processed @ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP) class DetectionSensorAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received DETECTION_SENSOR_APP packet") pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 @ProcessorRegistry.register_processor(PortNum.REPLY_APP) class ReplyAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received REPLY_APP packet") pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing. @ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP) class IpTunnelAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received IP_TUNNEL_APP packet") pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on. @ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP) class PaxCounterAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received PAXCOUNTER_APP packet") paxcounter = Paxcount() paxcounter.ParseFromString(payload) @@ -155,14 +180,14 @@ class PaxCounterAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.SERIAL_APP) class SerialAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received SERIAL_APP packet") pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network. @ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP) class StoreForwardAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received STORE_FORWARD_APP packet") store_and_forward = StoreAndForward() store_and_forward.ParseFromString(payload) @@ -171,14 +196,14 @@ class StoreForwardAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) class RangeTestAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received RANGE_TEST_APP packet") pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9 @ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) class TelemetryAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TELEMETRY_APP packet") telemetry = Telemetry() telemetry.ParseFromString(payload) @@ -187,21 +212,21 @@ class TelemetryAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.ZPS_APP) class ZpsAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ZPS_APP packet") pass # NOTE: Experimental tools for estimating node position without a GPS @ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP) class SimulatorAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received SIMULATOR_APP packet") pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip. @ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP) class TraceRouteAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TRACEROUTE_APP packet") traceroute = RouteDiscovery() traceroute.ParseFromString(payload) @@ -210,7 +235,7 @@ class TraceRouteAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP) class NeighborInfoAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received NEIGHBORINFO_APP packet") neighbor_info = NeighborInfo() neighbor_info.ParseFromString(payload) @@ -219,14 +244,14 @@ class NeighborInfoAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN) class AtakPluginProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ATAK_PLUGIN packet") pass # NOTE: ATAK Plugin @ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP) class MapReportAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received MAP_REPORT_APP packet") map_report = MapReport() map_report.ParseFromString(payload) @@ -235,20 +260,20 @@ class MapReportAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) class PrivateAppProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received PRIVATE_APP packet") pass # NOTE: Private application portnum @ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER) class AtakForwarderProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ATAK_FORWARDER packet") pass # NOTE: ATAK Forwarder @ProcessorRegistry.register_processor(PortNum.MAX) class MaxProcessor(Processor): - def process(self, payload): + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received MAX packet") pass # NOTE: Maximum portnum value diff --git a/main.py b/main.py index 4e4be23..a941ed1 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,7 @@ from exporter.processors import MessageProcessor def handle_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") - client.subscribe(os.getenv('mqtt_topic', 'msh/israel/#')) + client.subscribe(os.getenv('MQTT_TOPIC', 'msh/israel/#')) def handle_message(client, userdata, message): @@ -34,17 +34,17 @@ if __name__ == "__main__": load_dotenv() # Create Redis client redis_client = redis.Redis( - host=os.getenv('redis_host'), - port=int(os.getenv('redis_port')), - db=int(os.getenv('redis_db', 0)), - password=os.getenv('redis_password', None), + host=os.getenv('REDIS_HOST'), + port=int(os.getenv('REDIS_PORT')), + db=int(os.getenv('REDIS_DB', 0)), + password=os.getenv('REDIS_PASSWORD', None), ) # Configure Prometheus exporter registry = CollectorRegistry() push_to_gateway( - os.getenv('prometheus_pushgateway'), - job=os.getenv('prometheus_job'), + os.getenv('PROMETHEUS_PUSHGATEWAY'), + job=os.getenv('PROMETHEUS_JOB'), registry=registry, ) @@ -54,17 +54,17 @@ if __name__ == "__main__": mqtt_client.on_connect = handle_connect mqtt_client.on_message = handle_message - if bool(os.getenv('mqtt_is_tls', False)): + if bool(os.getenv('MQTT_IS_TLS', False)): tls_context = mqtt.ssl.create_default_context() mqtt_client.tls_set_context(tls_context) - if os.getenv('mqtt_username', None) and os.getenv('mqtt_password', None): - mqtt_client.username_pw_set(os.getenv('mqtt_username'), os.getenv('mqtt_password')) + if os.getenv('MQTT_USERNAME', None) and os.getenv('MQTT_PASSWORD', None): + mqtt_client.username_pw_set(os.getenv('MQTT_USERNAME'), os.getenv('MQTT_PASSWORD')) mqtt_client.connect( - os.getenv('mqtt_host'), - int(os.getenv('mqtt_port')), - keepalive=int(os.getenv('mqtt_keepalive', 60)), + os.getenv('MQTT_HOST'), + int(os.getenv('MQTT_PORT')), + keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)), ) # Configure the Processor and the Exporter processor = MessageProcessor(registry, redis_client) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4fba887 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +paho-mqtt~=2.1.0 +redis~=5.0.6 +python-dotenv~=1.0.1 +meshtastic~=2.3.11 +prometheus_client~=0.20.0 \ No newline at end of file