From e1b5ca252660dc0e2d090743e44896e28a7b0454 Mon Sep 17 00:00:00 2001 From: Gleb Tcivie Date: Mon, 8 Jul 2024 22:25:49 +0300 Subject: [PATCH] Added exception handling to avoid restarting and bein stuck in a loop + Ignoring JSON messages --- exporter/processor_base.py | 67 ++++++++++++++++++-------------- exporter/processors.py | 78 +++++++++++++++++++++++++++++--------- main.py | 48 ++++++++++++++--------- 3 files changed, 128 insertions(+), 65 deletions(-) diff --git a/exporter/processor_base.py b/exporter/processor_base.py index ce2b307..8d9939f 100644 --- a/exporter/processor_base.py +++ b/exporter/processor_base.py @@ -129,43 +129,52 @@ class MessageProcessor: ) def process(self, mesh_packet: MeshPacket): - if getattr(mesh_packet, 'encrypted'): - key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii')) - nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little") - nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little") + try: + if getattr(mesh_packet, 'encrypted'): + key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii')) + nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little") + nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little") - # Put both parts into a single byte array. - nonce = nonce_packet_id + nonce_from_node + # Put both parts into a single byte array. + nonce = nonce_packet_id + nonce_from_node - cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) - decryptor = cipher.decryptor() - decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize() + cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) + decryptor = cipher.decryptor() + decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize() - data = Data() - data.ParseFromString(decrypted_bytes) - mesh_packet.decoded.CopyFrom(data) - port_num = int(mesh_packet.decoded.portnum) - payload = mesh_packet.decoded.payload + data = Data() + try: + data.ParseFromString(decrypted_bytes) + except Exception as e: + print(f"Failed to decrypt message: {e}") + return + mesh_packet.decoded.CopyFrom(data) + port_num = int(mesh_packet.decoded.portnum) + payload = mesh_packet.decoded.payload - source_node_id = getattr(mesh_packet, 'from') - source_client_details = self._get_client_details(source_node_id) - if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': - source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', - long_name='Hidden') + source_node_id = getattr(mesh_packet, 'from') + source_client_details = self._get_client_details(source_node_id) + if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': + source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', + long_name='Hidden') - destination_node_id = getattr(mesh_packet, 'to') - destination_client_details = self._get_client_details(destination_node_id) - if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': - destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden', - long_name='Hidden') + destination_node_id = getattr(mesh_packet, 'to') + destination_client_details = self._get_client_details(destination_node_id) + if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': + destination_client_details = ClientDetails(node_id=destination_client_details.node_id, + short_name='Hidden', + long_name='Hidden') - if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports - return None # Ignore this packet + if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports + return None # Ignore this packet - self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) + self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) - processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool) - processor.process(payload, client_details=source_client_details) + processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool) + processor.process(payload, client_details=source_client_details) + except Exception as e: + print(f"Failed to process message: {e}") + return @staticmethod def get_port_name_from_portnum(port_num): diff --git a/exporter/processors.py b/exporter/processors.py index 1eb0c60..652c377 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -92,8 +92,11 @@ class RemoteHardwareAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received REMOTE_HARDWARE_APP packet") hardware_message = HardwareMessage() - hardware_message.ParseFromString(payload) - pass + try: + hardware_message.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse REMOTE_HARDWARE_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.POSITION_APP) @@ -101,7 +104,11 @@ class PositionAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received POSITION_APP packet") position = Position() - position.ParseFromString(payload) + try: + position.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse POSITION_APP packet: {e}") + return self.metrics.device_latitude_gauge.labels( **client_details.to_dict() ).set(position.latitude_i) @@ -122,7 +129,11 @@ class NodeInfoAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received NODEINFO_APP packet") user = User() - user.ParseFromString(payload) + try: + user.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse NODEINFO_APP packet: {e}") + return def db_operation(cur, conn): # First, try to select the existing record @@ -176,7 +187,11 @@ class RoutingAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ROUTING_APP packet") routing = Routing() - routing.ParseFromString(payload) + try: + routing.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse ROUTING_APP packet: {e}") + return self.metrics.route_discovery_response_counter.labels( **client_details.to_dict(), response_type=self.get_error_name_from_routing(routing.error_reason) @@ -195,8 +210,11 @@ class AdminAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received ADMIN_APP packet") admin_message = AdminMessage() - admin_message.ParseFromString(payload) - pass + try: + admin_message.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse ADMIN_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) @@ -212,8 +230,11 @@ class WaypointAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received WAYPOINT_APP packet") waypoint = Waypoint() - waypoint.ParseFromString(payload) - pass + try: + waypoint.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse WAYPOINT_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.AUDIO_APP) @@ -249,8 +270,11 @@ class PaxCounterAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received PAXCOUNTER_APP packet") paxcounter = Paxcount() - paxcounter.ParseFromString(payload) - pass + try: + paxcounter.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse PAXCOUNTER_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.SERIAL_APP) @@ -265,8 +289,11 @@ class StoreForwardAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received STORE_FORWARD_APP packet") store_and_forward = StoreAndForward() - store_and_forward.ParseFromString(payload) - pass + try: + store_and_forward.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse STORE_FORWARD_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) @@ -284,7 +311,11 @@ class TelemetryAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TELEMETRY_APP packet") telemetry = Telemetry() - telemetry.ParseFromString(payload) + try: + telemetry.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse TELEMETRY_APP packet: {e}") + return if telemetry.HasField('device_metrics'): device_metrics: DeviceMetrics = telemetry.device_metrics @@ -458,7 +489,11 @@ class TraceRouteAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TRACEROUTE_APP packet") traceroute = RouteDiscovery() - traceroute.ParseFromString(payload) + try: + traceroute.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse TRACEROUTE_APP packet: {e}") + return if traceroute.route: route = traceroute.route self.metrics.route_discovery_gauge.labels( @@ -471,7 +506,11 @@ class NeighborInfoAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received NEIGHBORINFO_APP packet") neighbor_info = NeighborInfo() - neighbor_info.ParseFromString(payload) + try: + neighbor_info.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}") + return self.update_node_graph(neighbor_info, client_details) self.update_node_neighbors(neighbor_info, client_details) @@ -537,8 +576,11 @@ class MapReportAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received MAP_REPORT_APP packet") map_report = MapReport() - map_report.ParseFromString(payload) - pass # Nothing interesting here + try: + map_report.ParseFromString(payload) + except Exception as e: + logger.error(f"Failed to parse MAP_REPORT_APP packet: {e}") + return @ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) diff --git a/main.py b/main.py index 5de3295..8b84d08 100644 --- a/main.py +++ b/main.py @@ -14,7 +14,6 @@ except ImportError: from meshtastic.protobuf.mesh_pb2 import MeshPacket from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope -from paho.mqtt.enums import CallbackAPIVersion from prometheus_client import CollectorRegistry, start_http_server from psycopg_pool import ConnectionPool @@ -49,29 +48,42 @@ def update_node_status(node_number, status): def handle_message(client, userdata, message): current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"Received message on topic '{message.topic}' at {current_timestamp}") - if '/stat/' in message.topic: - user_id = message.topic.split('/')[-1] # Hexadecimal user ID - if user_id[0] == '!': - node_number = str(int(user_id[1:], 16)) - update_node_status(node_number, message.payload.decode('utf-8')) + if '/json/' in message.topic: + # Ignore JSON messages as there are also protobuf messages sent on other topic + # Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448 return + if '/stat/' in message.topic or '/tele/' in message.topic: + try: + user_id = message.topic.split('/')[-1] # Hexadecimal user ID + if user_id[0] == '!': + node_number = str(int(user_id[1:], 16)) + update_node_status(node_number, message.payload.decode('utf-8')) + return + except Exception as e: + logging.error(f"Failed to handle user MQTT stat: {e}") + return + envelope = ServiceEnvelope() - envelope.ParseFromString(message.payload) - packet: MeshPacket = envelope.packet + try: + envelope.ParseFromString(message.payload) + packet: MeshPacket = envelope.packet - with connection_pool.connection() as conn: - with conn.cursor() as cur: - cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),)) - if cur.fetchone() is not None: - logging.debug(f"Packet {packet.id} already processed") - return + with connection_pool.connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),)) + if cur.fetchone() is not None: + logging.debug(f"Packet {packet.id} already processed") + return - cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING", - (str(packet.id),)) - conn.commit() + cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING", + (str(packet.id),)) + conn.commit() - processor.process(packet) + processor.process(packet) + except Exception as e: + logging.error(f"Failed to handle message: {e}") + return if __name__ == "__main__":