| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | import logging | 
					
						
							|  |  |  | import os | 
					
						
							| 
									
										
										
										
											2024-06-25 12:39:27 -07:00
										 |  |  | from datetime import datetime | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | import paho.mqtt.client as mqtt | 
					
						
							|  |  |  | from dotenv import load_dotenv | 
					
						
							| 
									
										
										
										
											2024-07-03 01:42:48 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-08 12:24:41 -07:00
										 |  |  | from constants import callback_api_version_map, protocol_map | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  | from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics | 
					
						
							| 
									
										
										
										
											2025-02-01 02:35:57 -08:00
										 |  |  | from exporter.metric_cleanup_job import MetricTrackingRegistry | 
					
						
							| 
									
										
										
										
											2024-07-08 12:24:41 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-03 01:42:48 -07:00
										 |  |  | try: | 
					
						
							|  |  |  |     from meshtastic.mesh_pb2 import MeshPacket | 
					
						
							|  |  |  |     from meshtastic.mqtt_pb2 import ServiceEnvelope | 
					
						
							|  |  |  | except ImportError: | 
					
						
							|  |  |  |     from meshtastic.protobuf.mesh_pb2 import MeshPacket | 
					
						
							|  |  |  |     from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-02-01 02:35:57 -08:00
										 |  |  | from prometheus_client import start_http_server | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  | from psycopg_pool import ConnectionPool | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  | connection_pool = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_connection(): | 
					
						
							|  |  |  |     return connection_pool.getconn() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def release_connection(conn): | 
					
						
							|  |  |  |     connection_pool.putconn(conn) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | def handle_connect(client, userdata, flags, reason_code, properties): | 
					
						
							|  |  |  |     print(f"Connected with result code {reason_code}") | 
					
						
							| 
									
										
										
										
											2024-08-08 23:18:15 -07:00
										 |  |  |     topics = os.getenv('MQTT_TOPIC', 'msh/israel/#').split(',') | 
					
						
							|  |  |  |     topics_tuples = [(topic, 0) for topic in topics] | 
					
						
							|  |  |  |     client.subscribe(topics_tuples) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-03 11:05:02 -07:00
										 |  |  | def update_node_status(node_number, status): | 
					
						
							|  |  |  |     with connection_pool.connection() as conn: | 
					
						
							|  |  |  |         with conn.cursor() as cur: | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |             cur.execute("INSERT INTO node_details (node_id, mqtt_status, short_name, long_name) VALUES (%s, %s, %s, %s)" | 
					
						
							| 
									
										
										
										
											2024-07-03 11:05:02 -07:00
										 |  |  |                         "ON CONFLICT(node_id)" | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |                         "DO UPDATE SET mqtt_status = %s", | 
					
						
							| 
									
										
										
										
											2024-08-09 10:29:50 -07:00
										 |  |  |                         (node_number, status, 'Unknown (MQTT)', 'Unknown (MQTT)', status)) | 
					
						
							| 
									
										
										
										
											2024-07-03 11:05:02 -07:00
										 |  |  |             conn.commit() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | def handle_message(client, userdata, message): | 
					
						
							| 
									
										
										
										
											2024-06-25 12:39:27 -07:00
										 |  |  |     current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | 
					
						
							|  |  |  |     print(f"Received message on topic '{message.topic}' at {current_timestamp}") | 
					
						
							| 
									
										
										
										
											2024-07-08 12:25:49 -07:00
										 |  |  |     if '/json/' in message.topic: | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |         processor.process_json_mqtt(message) | 
					
						
							| 
									
										
										
										
											2024-07-08 12:25:49 -07:00
										 |  |  |         # Ignore JSON messages as there are also protobuf messages sent on other topic | 
					
						
							|  |  |  |         # Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448 | 
					
						
							| 
									
										
										
										
											2024-06-27 13:54:10 -07:00
										 |  |  |         return | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-08 12:25:49 -07:00
										 |  |  |     if '/stat/' in message.topic or '/tele/' in message.topic: | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             user_id = message.topic.split('/')[-1]  # Hexadecimal user ID | 
					
						
							|  |  |  |             if user_id[0] == '!': | 
					
						
							|  |  |  |                 node_number = str(int(user_id[1:], 16)) | 
					
						
							|  |  |  |                 update_node_status(node_number, message.payload.decode('utf-8')) | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         except Exception as e: | 
					
						
							|  |  |  |             logging.error(f"Failed to handle user MQTT stat: {e}") | 
					
						
							|  |  |  |             return | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-08 12:25:49 -07:00
										 |  |  |     envelope = ServiceEnvelope() | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         envelope.ParseFromString(message.payload) | 
					
						
							|  |  |  |         packet: MeshPacket = envelope.packet | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         with connection_pool.connection() as conn: | 
					
						
							|  |  |  |             with conn.cursor() as cur: | 
					
						
							|  |  |  |                 cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),)) | 
					
						
							|  |  |  |                 if cur.fetchone() is not None: | 
					
						
							|  |  |  |                     logging.debug(f"Packet {packet.id} already processed") | 
					
						
							|  |  |  |                     return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING", | 
					
						
							|  |  |  |                             (str(packet.id),)) | 
					
						
							|  |  |  |                 conn.commit() | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |         processor.process_mqtt(message.topic, envelope, packet) | 
					
						
							| 
									
										
										
										
											2024-07-08 12:25:49 -07:00
										 |  |  |         processor.process(packet) | 
					
						
							|  |  |  |     except Exception as e: | 
					
						
							|  |  |  |         logging.error(f"Failed to handle message: {e}") | 
					
						
							|  |  |  |         return | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     load_dotenv() | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-26 02:02:17 -07:00
										 |  |  |     # We have to load_dotenv before we can import MessageProcessor to allow filtering of message types | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |     from exporter.processor.processor_base import MessageProcessor | 
					
						
							| 
									
										
										
										
											2024-07-26 02:02:17 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  |     # Setup a connection pool | 
					
						
							|  |  |  |     connection_pool = ConnectionPool( | 
					
						
							|  |  |  |         os.getenv('DATABASE_URL'), | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |         max_size=100 | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-08-09 03:20:31 -07:00
										 |  |  |     # Configure node configuration metrics | 
					
						
							|  |  |  |     node_conf_metrics = NodeConfigurationMetrics(connection_pool) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # Configure Prometheus exporter | 
					
						
							| 
									
										
										
										
											2025-02-01 02:35:57 -08:00
										 |  |  |     registry = MetricTrackingRegistry() | 
					
						
							| 
									
										
										
										
											2024-10-22 11:32:01 -07:00
										 |  |  |     start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 9464)), registry=registry) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # Create an MQTT client | 
					
						
							| 
									
										
										
										
											2024-07-08 12:24:41 -07:00
										 |  |  |     mqtt_protocol = os.getenv('MQTT_PROTOCOL', 'MQTTv5') | 
					
						
							|  |  |  |     mqtt_callback_api_version = os.getenv('MQTT_CALLBACK_API_VERSION', 'VERSION2') | 
					
						
							| 
									
										
										
										
											2024-06-24 11:44:10 -07:00
										 |  |  |     mqtt_client = mqtt.Client( | 
					
						
							| 
									
										
										
										
											2024-07-08 12:24:41 -07:00
										 |  |  |         callback_api_version=callback_api_version_map.get(mqtt_callback_api_version, mqtt.CallbackAPIVersion.VERSION2), | 
					
						
							|  |  |  |         protocol=protocol_map.get(mqtt_protocol, mqtt.MQTTv5) | 
					
						
							| 
									
										
										
										
											2024-06-24 11:44:10 -07:00
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  |     mqtt_client.on_connect = handle_connect | 
					
						
							|  |  |  |     mqtt_client.on_message = handle_message | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-24 11:44:10 -07:00
										 |  |  |     if os.getenv('MQTT_IS_TLS', 'false') == 'true': | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  |         tls_context = mqtt.ssl.create_default_context() | 
					
						
							|  |  |  |         mqtt_client.tls_set_context(tls_context) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-24 08:05:47 -07:00
										 |  |  |     if os.getenv('MQTT_USERNAME', None) and os.getenv('MQTT_PASSWORD', None): | 
					
						
							|  |  |  |         mqtt_client.username_pw_set(os.getenv('MQTT_USERNAME'), os.getenv('MQTT_PASSWORD')) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-24 11:44:10 -07:00
										 |  |  |     try: | 
					
						
							|  |  |  |         mqtt_client.connect( | 
					
						
							|  |  |  |             os.getenv('MQTT_HOST'), | 
					
						
							|  |  |  |             int(os.getenv('MQTT_PORT')), | 
					
						
							|  |  |  |             keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |     except Exception as e: | 
					
						
							|  |  |  |         logging.error(f"Failed to connect to MQTT broker: {e}") | 
					
						
							|  |  |  |         exit(1) | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  |     # Configure the Processor and the Exporter | 
					
						
							| 
									
										
										
										
											2024-06-28 09:49:43 -07:00
										 |  |  |     processor = MessageProcessor(registry, connection_pool) | 
					
						
							| 
									
										
										
										
											2024-06-23 12:15:31 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     mqtt_client.loop_forever() |