Merge pull request #28 from tcivie/26-doesnt-connect-to-meshtastic-public-mqtt
Handle MQTT JSON messages and other invalid messages + Add support for defining MQTT connection version
This commit is contained in:
commit
b3d81b2706
8
.env
8
.env
|
@ -16,6 +16,14 @@ MQTT_KEEPALIVE=60
|
|||
MQTT_TOPIC='msh/israel/#'
|
||||
MQTT_IS_TLS=false
|
||||
|
||||
# MQTT protocol version (default: MQTTv5) the public MQTT server supports MQTTv311
|
||||
# Options: MQTTv311, MQTTv31, MQTTv5
|
||||
MQTT_PROTOCOL=MQTTv311
|
||||
|
||||
# MQTT callback API version (default: VERSION2) the public MQTT server supports VERSION2
|
||||
# Options: VERSION1, VERSION2
|
||||
MQTT_CALLBACK_API_VERSION=VERSION2
|
||||
|
||||
# Exporter configuration
|
||||
## Hide source data in the exporter (default: false)
|
||||
MESH_HIDE_SOURCE_DATA=false
|
||||
|
|
13
constants.py
Normal file
13
constants.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
from paho.mqtt.enums import CallbackAPIVersion
|
||||
|
||||
protocol_map = {
|
||||
'MQTTv31': mqtt.MQTTv31,
|
||||
'MQTTv311': mqtt.MQTTv311,
|
||||
'MQTTv5': mqtt.MQTTv5
|
||||
}
|
||||
|
||||
callback_api_version_map = {
|
||||
'VERSION1': CallbackAPIVersion.VERSION1,
|
||||
'VERSION2': CallbackAPIVersion.VERSION2
|
||||
}
|
|
@ -129,43 +129,52 @@ class MessageProcessor:
|
|||
)
|
||||
|
||||
def process(self, mesh_packet: MeshPacket):
|
||||
if getattr(mesh_packet, 'encrypted'):
|
||||
key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii'))
|
||||
nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little")
|
||||
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little")
|
||||
try:
|
||||
if getattr(mesh_packet, 'encrypted'):
|
||||
key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii'))
|
||||
nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little")
|
||||
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little")
|
||||
|
||||
# Put both parts into a single byte array.
|
||||
nonce = nonce_packet_id + nonce_from_node
|
||||
# Put both parts into a single byte array.
|
||||
nonce = nonce_packet_id + nonce_from_node
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize()
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize()
|
||||
|
||||
data = Data()
|
||||
data.ParseFromString(decrypted_bytes)
|
||||
mesh_packet.decoded.CopyFrom(data)
|
||||
port_num = int(mesh_packet.decoded.portnum)
|
||||
payload = mesh_packet.decoded.payload
|
||||
data = Data()
|
||||
try:
|
||||
data.ParseFromString(decrypted_bytes)
|
||||
except Exception as e:
|
||||
print(f"Failed to decrypt message: {e}")
|
||||
return
|
||||
mesh_packet.decoded.CopyFrom(data)
|
||||
port_num = int(mesh_packet.decoded.portnum)
|
||||
payload = mesh_packet.decoded.payload
|
||||
|
||||
source_node_id = getattr(mesh_packet, 'from')
|
||||
source_client_details = self._get_client_details(source_node_id)
|
||||
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
|
||||
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
source_node_id = getattr(mesh_packet, 'from')
|
||||
source_client_details = self._get_client_details(source_node_id)
|
||||
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
|
||||
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
destination_node_id = getattr(mesh_packet, 'to')
|
||||
destination_client_details = self._get_client_details(destination_node_id)
|
||||
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
|
||||
destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
destination_node_id = getattr(mesh_packet, 'to')
|
||||
destination_client_details = self._get_client_details(destination_node_id)
|
||||
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
|
||||
destination_client_details = ClientDetails(node_id=destination_client_details.node_id,
|
||||
short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
|
||||
return None # Ignore this packet
|
||||
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
|
||||
return None # Ignore this packet
|
||||
|
||||
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
||||
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
||||
|
||||
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
||||
processor.process(payload, client_details=source_client_details)
|
||||
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
||||
processor.process(payload, client_details=source_client_details)
|
||||
except Exception as e:
|
||||
print(f"Failed to process message: {e}")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def get_port_name_from_portnum(port_num):
|
||||
|
|
|
@ -92,8 +92,11 @@ class RemoteHardwareAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received REMOTE_HARDWARE_APP packet")
|
||||
hardware_message = HardwareMessage()
|
||||
hardware_message.ParseFromString(payload)
|
||||
pass
|
||||
try:
|
||||
hardware_message.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse REMOTE_HARDWARE_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.POSITION_APP)
|
||||
|
@ -101,7 +104,11 @@ class PositionAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received POSITION_APP packet")
|
||||
position = Position()
|
||||
position.ParseFromString(payload)
|
||||
try:
|
||||
position.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse POSITION_APP packet: {e}")
|
||||
return
|
||||
self.metrics.device_latitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.latitude_i)
|
||||
|
@ -122,7 +129,11 @@ class NodeInfoAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NODEINFO_APP packet")
|
||||
user = User()
|
||||
user.ParseFromString(payload)
|
||||
try:
|
||||
user.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse NODEINFO_APP packet: {e}")
|
||||
return
|
||||
|
||||
def db_operation(cur, conn):
|
||||
# First, try to select the existing record
|
||||
|
@ -176,7 +187,11 @@ class RoutingAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ROUTING_APP packet")
|
||||
routing = Routing()
|
||||
routing.ParseFromString(payload)
|
||||
try:
|
||||
routing.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse ROUTING_APP packet: {e}")
|
||||
return
|
||||
self.metrics.route_discovery_response_counter.labels(
|
||||
**client_details.to_dict(),
|
||||
response_type=self.get_error_name_from_routing(routing.error_reason)
|
||||
|
@ -195,8 +210,11 @@ class AdminAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ADMIN_APP packet")
|
||||
admin_message = AdminMessage()
|
||||
admin_message.ParseFromString(payload)
|
||||
pass
|
||||
try:
|
||||
admin_message.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse ADMIN_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP)
|
||||
|
@ -212,8 +230,11 @@ class WaypointAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received WAYPOINT_APP packet")
|
||||
waypoint = Waypoint()
|
||||
waypoint.ParseFromString(payload)
|
||||
pass
|
||||
try:
|
||||
waypoint.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse WAYPOINT_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.AUDIO_APP)
|
||||
|
@ -249,8 +270,11 @@ class PaxCounterAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received PAXCOUNTER_APP packet")
|
||||
paxcounter = Paxcount()
|
||||
paxcounter.ParseFromString(payload)
|
||||
pass
|
||||
try:
|
||||
paxcounter.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse PAXCOUNTER_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
|
||||
|
@ -265,8 +289,11 @@ class StoreForwardAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received STORE_FORWARD_APP packet")
|
||||
store_and_forward = StoreAndForward()
|
||||
store_and_forward.ParseFromString(payload)
|
||||
pass
|
||||
try:
|
||||
store_and_forward.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse STORE_FORWARD_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP)
|
||||
|
@ -284,7 +311,11 @@ class TelemetryAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TELEMETRY_APP packet")
|
||||
telemetry = Telemetry()
|
||||
telemetry.ParseFromString(payload)
|
||||
try:
|
||||
telemetry.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse TELEMETRY_APP packet: {e}")
|
||||
return
|
||||
|
||||
if telemetry.HasField('device_metrics'):
|
||||
device_metrics: DeviceMetrics = telemetry.device_metrics
|
||||
|
@ -458,7 +489,11 @@ class TraceRouteAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TRACEROUTE_APP packet")
|
||||
traceroute = RouteDiscovery()
|
||||
traceroute.ParseFromString(payload)
|
||||
try:
|
||||
traceroute.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse TRACEROUTE_APP packet: {e}")
|
||||
return
|
||||
if traceroute.route:
|
||||
route = traceroute.route
|
||||
self.metrics.route_discovery_gauge.labels(
|
||||
|
@ -471,7 +506,11 @@ class NeighborInfoAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NEIGHBORINFO_APP packet")
|
||||
neighbor_info = NeighborInfo()
|
||||
neighbor_info.ParseFromString(payload)
|
||||
try:
|
||||
neighbor_info.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}")
|
||||
return
|
||||
self.update_node_graph(neighbor_info, client_details)
|
||||
self.update_node_neighbors(neighbor_info, client_details)
|
||||
|
||||
|
@ -537,8 +576,11 @@ class MapReportAppProcessor(Processor):
|
|||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received MAP_REPORT_APP packet")
|
||||
map_report = MapReport()
|
||||
map_report.ParseFromString(payload)
|
||||
pass # Nothing interesting here
|
||||
try:
|
||||
map_report.ParseFromString(payload)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse MAP_REPORT_APP packet: {e}")
|
||||
return
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP)
|
||||
|
|
56
main.py
56
main.py
|
@ -5,6 +5,8 @@ from datetime import datetime
|
|||
import paho.mqtt.client as mqtt
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from constants import callback_api_version_map, protocol_map
|
||||
|
||||
try:
|
||||
from meshtastic.mesh_pb2 import MeshPacket
|
||||
from meshtastic.mqtt_pb2 import ServiceEnvelope
|
||||
|
@ -12,7 +14,6 @@ except ImportError:
|
|||
from meshtastic.protobuf.mesh_pb2 import MeshPacket
|
||||
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
|
||||
|
||||
from paho.mqtt.enums import CallbackAPIVersion
|
||||
from prometheus_client import CollectorRegistry, start_http_server
|
||||
from psycopg_pool import ConnectionPool
|
||||
|
||||
|
@ -47,29 +48,42 @@ def update_node_status(node_number, status):
|
|||
def handle_message(client, userdata, message):
|
||||
current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
print(f"Received message on topic '{message.topic}' at {current_timestamp}")
|
||||
if '/stat/' in message.topic:
|
||||
user_id = message.topic.split('/')[-1] # Hexadecimal user ID
|
||||
if user_id[0] == '!':
|
||||
node_number = str(int(user_id[1:], 16))
|
||||
update_node_status(node_number, message.payload.decode('utf-8'))
|
||||
if '/json/' in message.topic:
|
||||
# Ignore JSON messages as there are also protobuf messages sent on other topic
|
||||
# Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448
|
||||
return
|
||||
|
||||
if '/stat/' in message.topic or '/tele/' in message.topic:
|
||||
try:
|
||||
user_id = message.topic.split('/')[-1] # Hexadecimal user ID
|
||||
if user_id[0] == '!':
|
||||
node_number = str(int(user_id[1:], 16))
|
||||
update_node_status(node_number, message.payload.decode('utf-8'))
|
||||
return
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to handle user MQTT stat: {e}")
|
||||
return
|
||||
|
||||
envelope = ServiceEnvelope()
|
||||
envelope.ParseFromString(message.payload)
|
||||
packet: MeshPacket = envelope.packet
|
||||
try:
|
||||
envelope.ParseFromString(message.payload)
|
||||
packet: MeshPacket = envelope.packet
|
||||
|
||||
with connection_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),))
|
||||
if cur.fetchone() is not None:
|
||||
logging.debug(f"Packet {packet.id} already processed")
|
||||
return
|
||||
with connection_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),))
|
||||
if cur.fetchone() is not None:
|
||||
logging.debug(f"Packet {packet.id} already processed")
|
||||
return
|
||||
|
||||
cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
|
||||
(str(packet.id),))
|
||||
conn.commit()
|
||||
cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
|
||||
(str(packet.id),))
|
||||
conn.commit()
|
||||
|
||||
processor.process(packet)
|
||||
processor.process(packet)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to handle message: {e}")
|
||||
return
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -87,9 +101,11 @@ if __name__ == "__main__":
|
|||
start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 8000)), registry=registry)
|
||||
|
||||
# Create an MQTT client
|
||||
mqtt_protocol = os.getenv('MQTT_PROTOCOL', 'MQTTv5')
|
||||
mqtt_callback_api_version = os.getenv('MQTT_CALLBACK_API_VERSION', 'VERSION2')
|
||||
mqtt_client = mqtt.Client(
|
||||
callback_api_version=CallbackAPIVersion.VERSION2,
|
||||
protocol=mqtt.MQTTv5
|
||||
callback_api_version=callback_api_version_map.get(mqtt_callback_api_version, mqtt.CallbackAPIVersion.VERSION2),
|
||||
protocol=protocol_map.get(mqtt_protocol, mqtt.MQTTv5)
|
||||
)
|
||||
mqtt_client.on_connect = handle_connect
|
||||
mqtt_client.on_message = handle_message
|
||||
|
|
Loading…
Reference in a new issue