Added exception handling to avoid restarting and bein stuck in a loop + Ignoring JSON messages

This commit is contained in:
Gleb Tcivie 2024-07-08 22:25:49 +03:00
parent 0d4547e57d
commit e1b5ca2526
3 changed files with 128 additions and 65 deletions

View file

@ -129,43 +129,52 @@ class MessageProcessor:
) )
def process(self, mesh_packet: MeshPacket): def process(self, mesh_packet: MeshPacket):
if getattr(mesh_packet, 'encrypted'): try:
key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii')) if getattr(mesh_packet, 'encrypted'):
nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little") key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii'))
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little") nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little")
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little")
# Put both parts into a single byte array. # Put both parts into a single byte array.
nonce = nonce_packet_id + nonce_from_node nonce = nonce_packet_id + nonce_from_node
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor() decryptor = cipher.decryptor()
decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize() decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize()
data = Data() data = Data()
data.ParseFromString(decrypted_bytes) try:
mesh_packet.decoded.CopyFrom(data) data.ParseFromString(decrypted_bytes)
port_num = int(mesh_packet.decoded.portnum) except Exception as e:
payload = mesh_packet.decoded.payload print(f"Failed to decrypt message: {e}")
return
mesh_packet.decoded.CopyFrom(data)
port_num = int(mesh_packet.decoded.portnum)
payload = mesh_packet.decoded.payload
source_node_id = getattr(mesh_packet, 'from') source_node_id = getattr(mesh_packet, 'from')
source_client_details = self._get_client_details(source_node_id) source_client_details = self._get_client_details(source_node_id)
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
long_name='Hidden') long_name='Hidden')
destination_node_id = getattr(mesh_packet, 'to') destination_node_id = getattr(mesh_packet, 'to')
destination_client_details = self._get_client_details(destination_node_id) destination_client_details = self._get_client_details(destination_node_id)
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden', destination_client_details = ClientDetails(node_id=destination_client_details.node_id,
long_name='Hidden') short_name='Hidden',
long_name='Hidden')
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
return None # Ignore this packet return None # Ignore this packet
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool) processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
processor.process(payload, client_details=source_client_details) processor.process(payload, client_details=source_client_details)
except Exception as e:
print(f"Failed to process message: {e}")
return
@staticmethod @staticmethod
def get_port_name_from_portnum(port_num): def get_port_name_from_portnum(port_num):

View file

@ -92,8 +92,11 @@ class RemoteHardwareAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received REMOTE_HARDWARE_APP packet") logger.debug("Received REMOTE_HARDWARE_APP packet")
hardware_message = HardwareMessage() hardware_message = HardwareMessage()
hardware_message.ParseFromString(payload) try:
pass hardware_message.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse REMOTE_HARDWARE_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.POSITION_APP) @ProcessorRegistry.register_processor(PortNum.POSITION_APP)
@ -101,7 +104,11 @@ class PositionAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received POSITION_APP packet") logger.debug("Received POSITION_APP packet")
position = Position() position = Position()
position.ParseFromString(payload) try:
position.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse POSITION_APP packet: {e}")
return
self.metrics.device_latitude_gauge.labels( self.metrics.device_latitude_gauge.labels(
**client_details.to_dict() **client_details.to_dict()
).set(position.latitude_i) ).set(position.latitude_i)
@ -122,7 +129,11 @@ class NodeInfoAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received NODEINFO_APP packet") logger.debug("Received NODEINFO_APP packet")
user = User() user = User()
user.ParseFromString(payload) try:
user.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse NODEINFO_APP packet: {e}")
return
def db_operation(cur, conn): def db_operation(cur, conn):
# First, try to select the existing record # First, try to select the existing record
@ -176,7 +187,11 @@ class RoutingAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received ROUTING_APP packet") logger.debug("Received ROUTING_APP packet")
routing = Routing() routing = Routing()
routing.ParseFromString(payload) try:
routing.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse ROUTING_APP packet: {e}")
return
self.metrics.route_discovery_response_counter.labels( self.metrics.route_discovery_response_counter.labels(
**client_details.to_dict(), **client_details.to_dict(),
response_type=self.get_error_name_from_routing(routing.error_reason) response_type=self.get_error_name_from_routing(routing.error_reason)
@ -195,8 +210,11 @@ class AdminAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received ADMIN_APP packet") logger.debug("Received ADMIN_APP packet")
admin_message = AdminMessage() admin_message = AdminMessage()
admin_message.ParseFromString(payload) try:
pass admin_message.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse ADMIN_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP) @ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP)
@ -212,8 +230,11 @@ class WaypointAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received WAYPOINT_APP packet") logger.debug("Received WAYPOINT_APP packet")
waypoint = Waypoint() waypoint = Waypoint()
waypoint.ParseFromString(payload) try:
pass waypoint.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse WAYPOINT_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.AUDIO_APP) @ProcessorRegistry.register_processor(PortNum.AUDIO_APP)
@ -249,8 +270,11 @@ class PaxCounterAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received PAXCOUNTER_APP packet") logger.debug("Received PAXCOUNTER_APP packet")
paxcounter = Paxcount() paxcounter = Paxcount()
paxcounter.ParseFromString(payload) try:
pass paxcounter.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse PAXCOUNTER_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.SERIAL_APP) @ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
@ -265,8 +289,11 @@ class StoreForwardAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received STORE_FORWARD_APP packet") logger.debug("Received STORE_FORWARD_APP packet")
store_and_forward = StoreAndForward() store_and_forward = StoreAndForward()
store_and_forward.ParseFromString(payload) try:
pass store_and_forward.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse STORE_FORWARD_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP) @ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP)
@ -284,7 +311,11 @@ class TelemetryAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received TELEMETRY_APP packet") logger.debug("Received TELEMETRY_APP packet")
telemetry = Telemetry() telemetry = Telemetry()
telemetry.ParseFromString(payload) try:
telemetry.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse TELEMETRY_APP packet: {e}")
return
if telemetry.HasField('device_metrics'): if telemetry.HasField('device_metrics'):
device_metrics: DeviceMetrics = telemetry.device_metrics device_metrics: DeviceMetrics = telemetry.device_metrics
@ -458,7 +489,11 @@ class TraceRouteAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received TRACEROUTE_APP packet") logger.debug("Received TRACEROUTE_APP packet")
traceroute = RouteDiscovery() traceroute = RouteDiscovery()
traceroute.ParseFromString(payload) try:
traceroute.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse TRACEROUTE_APP packet: {e}")
return
if traceroute.route: if traceroute.route:
route = traceroute.route route = traceroute.route
self.metrics.route_discovery_gauge.labels( self.metrics.route_discovery_gauge.labels(
@ -471,7 +506,11 @@ class NeighborInfoAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received NEIGHBORINFO_APP packet") logger.debug("Received NEIGHBORINFO_APP packet")
neighbor_info = NeighborInfo() neighbor_info = NeighborInfo()
neighbor_info.ParseFromString(payload) try:
neighbor_info.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}")
return
self.update_node_graph(neighbor_info, client_details) self.update_node_graph(neighbor_info, client_details)
self.update_node_neighbors(neighbor_info, client_details) self.update_node_neighbors(neighbor_info, client_details)
@ -537,8 +576,11 @@ class MapReportAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received MAP_REPORT_APP packet") logger.debug("Received MAP_REPORT_APP packet")
map_report = MapReport() map_report = MapReport()
map_report.ParseFromString(payload) try:
pass # Nothing interesting here map_report.ParseFromString(payload)
except Exception as e:
logger.error(f"Failed to parse MAP_REPORT_APP packet: {e}")
return
@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) @ProcessorRegistry.register_processor(PortNum.PRIVATE_APP)

48
main.py
View file

@ -14,7 +14,6 @@ except ImportError:
from meshtastic.protobuf.mesh_pb2 import MeshPacket from meshtastic.protobuf.mesh_pb2 import MeshPacket
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
from paho.mqtt.enums import CallbackAPIVersion
from prometheus_client import CollectorRegistry, start_http_server from prometheus_client import CollectorRegistry, start_http_server
from psycopg_pool import ConnectionPool from psycopg_pool import ConnectionPool
@ -49,29 +48,42 @@ def update_node_status(node_number, status):
def handle_message(client, userdata, message): def handle_message(client, userdata, message):
current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Received message on topic '{message.topic}' at {current_timestamp}") print(f"Received message on topic '{message.topic}' at {current_timestamp}")
if '/stat/' in message.topic: if '/json/' in message.topic:
user_id = message.topic.split('/')[-1] # Hexadecimal user ID # Ignore JSON messages as there are also protobuf messages sent on other topic
if user_id[0] == '!': # Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448
node_number = str(int(user_id[1:], 16))
update_node_status(node_number, message.payload.decode('utf-8'))
return return
if '/stat/' in message.topic or '/tele/' in message.topic:
try:
user_id = message.topic.split('/')[-1] # Hexadecimal user ID
if user_id[0] == '!':
node_number = str(int(user_id[1:], 16))
update_node_status(node_number, message.payload.decode('utf-8'))
return
except Exception as e:
logging.error(f"Failed to handle user MQTT stat: {e}")
return
envelope = ServiceEnvelope() envelope = ServiceEnvelope()
envelope.ParseFromString(message.payload) try:
packet: MeshPacket = envelope.packet envelope.ParseFromString(message.payload)
packet: MeshPacket = envelope.packet
with connection_pool.connection() as conn: with connection_pool.connection() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),)) cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),))
if cur.fetchone() is not None: if cur.fetchone() is not None:
logging.debug(f"Packet {packet.id} already processed") logging.debug(f"Packet {packet.id} already processed")
return return
cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING", cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
(str(packet.id),)) (str(packet.id),))
conn.commit() conn.commit()
processor.process(packet) processor.process(packet)
except Exception as e:
logging.error(f"Failed to handle message: {e}")
return
if __name__ == "__main__": if __name__ == "__main__":