diff --git a/.env b/.env index 4845ffc..aea9f4f 100644 --- a/.env +++ b/.env @@ -7,20 +7,24 @@ REDIS_DB=0 REDIS_PASSWORD= # Prometheus connection details -PROMETHEUS_PUSHGATEWAY=http://localhost:9091 +PROMETHEUS_COLLECTOR_PORT=9464 PROMETHEUS_JOB=example # MQTT connection details -MQTT_HOST=localhost +MQTT_HOST=172.232.220.244 MQTT_PORT=1883 -MQTT_USERNAME= -MQTT_PASSWORD= +MQTT_USERNAME=israeli +MQTT_PASSWORD=israeli123 MQTT_KEEPALIVE=60 MQTT_TOPIC='msh/israel/#' MQTT_IS_TLS=false # Exporter configuration -MESH_HIDE_SOURCE_DATA=false# Hide source data in the exporter (default: false) -MESH_HIDE_DESTINATION_DATA=false# Hide destination data in the exporter (default: false) -FILTERED_PORTS=1# Filtered ports in the exporter (default: 1, can be a comma-separated list of ports) -HIDE_MESSAGE=true# Hide message content in the TEXT_MESSAGE_APP packets (default: true) \ No newline at end of file +## Hide source data in the exporter (default: false) +MESH_HIDE_SOURCE_DATA=false +## Hide destination data in the exporter (default: false) +MESH_HIDE_DESTINATION_DATA=false +## Filtered ports in the exporter (default: 1, can be a comma-separated list of ports) +FILTERED_PORTS=0 +## Hide message content in the TEXT_MESSAGE_APP packets (default: true) +HIDE_MESSAGE=true \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 2ee4941..8fb30ee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,11 @@ -FROM python:3.12-slim +FROM python LABEL author="Gleb Tcivie" WORKDIR /app COPY requirements.txt . -RUN pip install -r requirements.txt +COPY .env . +RUN pip3 install -r requirements.txt + +COPY exporter/ exporter COPY main.py . CMD ["python3", "-u", "main.py"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ab8b052 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,54 @@ +version: "3" + +volumes: + prometheus_data: + grafana_data: + redis_data: + +services: + prometheus: + image: prom/prometheus:v2.51.2 + restart: unless-stopped + extra_hosts: + - "host.docker.internal:host-gateway" + networks: + - mesh-bridge + ports: + - "9090:9090" + volumes: + - prometheus_data:/prometheus + - ./prometheus.yml:/etc/prometheus/prometheus.yml + + grafana: + image: grafana/grafana-oss:10.4.2 + restart: unless-stopped + ports: + - "3000:3000" + networks: + - mesh-bridge + volumes: + - grafana_data:/var/lib/grafana + + exporter: + build: . + restart: unless-stopped + extra_hosts: + - "host.docker.internal:host-gateway" + env_file: + - .env + networks: + - mesh-bridge + + redis: + image: redis:7 + restart: unless-stopped + networks: + - mesh-bridge + ports: + - "6379:6379" + volumes: + - redis_data:/data + +networks: + mesh-bridge: + driver: bridge \ No newline at end of file diff --git a/exporter/processors.py b/exporter/processors.py index 9aaa5b5..f2d366e 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -4,7 +4,7 @@ import redis from meshtastic.mesh_pb2 import MeshPacket from prometheus_client import CollectorRegistry, Counter -from exporter.registry import ProcessorRegistry +from exporter.registry import ProcessorRegistry, ClientDetails class MessageProcessor: @@ -19,37 +19,33 @@ class MessageProcessor: 'rx_time', 'rx_snr', 'hop_limit', 'want_ack', 'via_mqtt', 'hop_start' ], registry=self.registry) + self.processor_registry = ProcessorRegistry() def process(self, mesh_packet: MeshPacket): port_num = int(mesh_packet.decoded.portnum) payload = mesh_packet.decoded.payload - source_client_details = self._get_client_details(mesh_packet['from']) + source_client_details = self._get_client_details(getattr(mesh_packet, 'from')) if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': - source_client_details = { - 'id': source_client_details['id'], - 'short_name': 'Hidden', - 'long_name': 'Hidden', - } - destination_client_details = self._get_client_details(mesh_packet['to']) + source_client_details = ClientDetails(node_id=source_client_details['id'], short_name='Hidden', + long_name='Hidden') + + destination_client_details = self._get_client_details(getattr(mesh_packet, 'to')) if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': - destination_client_details = { - 'id': destination_client_details['id'], - 'short_name': 'Hidden', - 'long_name': 'Hidden', - } + destination_client_details = ClientDetails(node_id=destination_client_details['id'], short_name='Hidden', + long_name='Hidden') if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports return None # Ignore this packet self.counter.labels( - source_id=source_client_details['id'], - source_short_name=source_client_details['short_name'], - source_long_name=source_client_details['long_name'], + source_id=source_client_details.node_id, + source_short_name=source_client_details.short_name, + source_long_name=source_client_details.long_name, - destination_id=destination_client_details['id'], - destination_short_name=destination_client_details['short_name'], - destination_long_name=destination_client_details['long_name'], + destination_id=destination_client_details.node_id, + destination_short_name=destination_client_details.short_name, + destination_long_name=destination_client_details.long_name, rx_time=mesh_packet.rx_time, rx_snr=mesh_packet.rx_snr, @@ -61,15 +57,11 @@ class MessageProcessor: ).inc() processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) - processor.process(payload) + processor.process(payload, client_details=source_client_details) - def _get_client_details(self, id: str): - details = self.redis_client.hgetall(f"node:{id}") + def _get_client_details(self, node_id: str) -> ClientDetails: + details = self.redis_client.hgetall(f"node:{node_id}") if details: - return details + return ClientDetails(node_id=node_id, short_name=details['short_name'], long_name=details['long_name']) - return { - 'id': id, - 'short_name': 'Unknown', - 'long_name': 'Unknown', - } + return ClientDetails(node_id=node_id, short_name='Unknown', long_name='Unknown') diff --git a/exporter/registry.py b/exporter/registry.py index b1ea9c4..dfdf874 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -16,8 +16,8 @@ from prometheus_client import CollectorRegistry, Counter class ClientDetails: - def __init__(self, id, short_name, long_name): - self.id = id + def __init__(self, node_id, short_name, long_name): + self.node_id = node_id self.short_name = short_name self.long_name = long_name @@ -36,16 +36,16 @@ class ProcessorRegistry: _registry = {} @classmethod - def register_processor(cls, portnum): + def register_processor(cls, port_num): def inner_wrapper(wrapped_class): - cls._registry[portnum] = wrapped_class() + cls._registry[port_num] = wrapped_class return wrapped_class return inner_wrapper @classmethod - def get_processor(cls, portnum): - return cls._registry.get(portnum, UnknownAppProcessor()) + def get_processor(cls, port_num) -> type(Processor): + return cls._registry.get(port_num, UnknownAppProcessor) @ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP) @@ -55,7 +55,7 @@ class UnknownAppProcessor(Processor): return None -@ProcessorRegistry.register_processor('TEXT_MESSAGE_APP') +@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) class TextMessageAppProcessor(Processor): def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): super().__init__(registry, redis_client) @@ -72,7 +72,7 @@ class TextMessageAppProcessor(Processor): if os.getenv('HIDE_MESSAGE', 'true') == 'true': message = 'Hidden' self.message_counter.labels( - client_id=client_details.id, + client_id=client_details.node_id, short_name=client_details.short_name, long_name=client_details.long_name, message_content=message diff --git a/main.py b/main.py index a941ed1..b6b94db 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,8 @@ import redis from dotenv import load_dotenv from meshtastic.mesh_pb2 import MeshPacket from meshtastic.mqtt_pb2 import ServiceEnvelope -from prometheus_client import push_to_gateway, CollectorRegistry +from paho.mqtt.enums import CallbackAPIVersion +from prometheus_client import CollectorRegistry, start_http_server from exporter.processors import MessageProcessor @@ -17,7 +18,7 @@ def handle_connect(client, userdata, flags, reason_code, properties): def handle_message(client, userdata, message): - print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'") + print(f"Received message on topic '{message.topic}'") envelope = ServiceEnvelope() envelope.ParseFromString(message.payload) @@ -33,39 +34,47 @@ def handle_message(client, userdata, message): if __name__ == "__main__": load_dotenv() # Create Redis client - redis_client = redis.Redis( - host=os.getenv('REDIS_HOST'), - port=int(os.getenv('REDIS_PORT')), - db=int(os.getenv('REDIS_DB', 0)), - password=os.getenv('REDIS_PASSWORD', None), - ) + try: + redis_client = redis.Redis( + host=os.getenv('REDIS_HOST'), + port=int(os.getenv('REDIS_PORT')), + db=int(os.getenv('REDIS_DB', 0)), + password=os.getenv('REDIS_PASSWORD', None), + ) + except Exception as e: + logging.error(f"Failed to connect to Redis: {e}") + exit(1) # Configure Prometheus exporter registry = CollectorRegistry() - push_to_gateway( - os.getenv('PROMETHEUS_PUSHGATEWAY'), - job=os.getenv('PROMETHEUS_JOB'), - registry=registry, - ) + start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 8000)), registry=registry) # Create an MQTT client - mqtt_client = mqtt.Client() + mqtt_client = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, + protocol=mqtt.MQTTv5 + ) mqtt_client.on_connect = handle_connect mqtt_client.on_message = handle_message - if bool(os.getenv('MQTT_IS_TLS', False)): + if os.getenv('MQTT_IS_TLS', 'false') == 'true': tls_context = mqtt.ssl.create_default_context() mqtt_client.tls_set_context(tls_context) if os.getenv('MQTT_USERNAME', None) and os.getenv('MQTT_PASSWORD', None): mqtt_client.username_pw_set(os.getenv('MQTT_USERNAME'), os.getenv('MQTT_PASSWORD')) - mqtt_client.connect( - os.getenv('MQTT_HOST'), - int(os.getenv('MQTT_PORT')), - keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)), - ) + try: + mqtt_client.connect( + os.getenv('MQTT_HOST'), + int(os.getenv('MQTT_PORT')), + keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)), + + ) + except Exception as e: + logging.error(f"Failed to connect to MQTT broker: {e}") + exit(1) # Configure the Processor and the Exporter processor = MessageProcessor(registry, redis_client) diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 0000000..afdc70c --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,8 @@ +global: + scrape_interval: 15s # How often to scrape targets by default + evaluation_interval: 15s # How often to evaluate rules by default + +scrape_configs: + - job_name: 'meshtastic' + static_configs: + - targets: [ 'host.docker.internal:9464' ] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4fba887..ff100c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ paho-mqtt~=2.1.0 redis~=5.0.6 python-dotenv~=1.0.1 meshtastic~=2.3.11 -prometheus_client~=0.20.0 \ No newline at end of file +prometheus_client~=0.20.0 +unishox2-py3~=1.0.0 \ No newline at end of file