diff --git a/exporter/processors.py b/exporter/processors.py index 1127e5d..44637c4 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -1,16 +1,41 @@ +import redis from meshtastic.mesh_pb2 import MeshPacket -from prometheus_client import CollectorRegistry +from prometheus_client import CollectorRegistry, Counter from exporter.registry import ProcessorRegistry class MessageProcessor: - def __init__(self, registry: CollectorRegistry): + def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): self.registry = registry + self.redis_client = redis_client + self.counter = Counter('mesh_packets', 'Number of mesh packets processed', + ['source_id', 'source_short_name', 'source_long_name', 'portnum'], + registry=self.registry) def process(self, mesh_packet: MeshPacket): port_num = mesh_packet.decoded.portnum payload = mesh_packet.decoded.payload - processor = ProcessorRegistry.get_processor(port_num)(self.registry) + processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) - processor.process(payload) + client_details = self._get_client_details(mesh_packet) + self.counter.labels( + source_id=client_details['id'], + source_short_name=client_details['short_name'], + source_long_name=client_details['long_name'], + portnum=port_num + ).inc() + processor.process_packet(payload) + + def _get_client_details(self, mesh_packet: MeshPacket): + from_id = mesh_packet['from'] + + details = self.redis_client.hgetall(f"node:{from_id}") + if details: + return details + + return { + 'id': from_id, + 'short_name': 'Unknown', + 'long_name': 'Unknown', + } diff --git a/exporter/registry.py b/exporter/registry.py index 857594b..b2e4231 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from venv import logger +import redis import unishox2 from meshtastic.admin_pb2 import AdminMessage from meshtastic.mesh_pb2 import Position, User, Routing, Waypoint, RouteDiscovery, NeighborInfo @@ -14,8 +15,9 @@ from prometheus_client import CollectorRegistry class Processor(ABC): - def __init__(self, registry: CollectorRegistry): + def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): self.registry = registry + self.redis_client = redis_client @abstractmethod def process(self, payload): diff --git a/main.py b/main.py index 912d5b7..4e4be23 100644 --- a/main.py +++ b/main.py @@ -67,6 +67,6 @@ if __name__ == "__main__": keepalive=int(os.getenv('mqtt_keepalive', 60)), ) # Configure the Processor and the Exporter - processor = MessageProcessor(registry) + processor = MessageProcessor(registry, redis_client) mqtt_client.loop_forever()