diff --git a/.env b/.env index aea9f4f..4d4e224 100644 --- a/.env +++ b/.env @@ -26,5 +26,5 @@ MESH_HIDE_SOURCE_DATA=false MESH_HIDE_DESTINATION_DATA=false ## Filtered ports in the exporter (default: 1, can be a comma-separated list of ports) FILTERED_PORTS=0 -## Hide message content in the TEXT_MESSAGE_APP packets (default: true) +## Hide message content in the TEXT_MESSAGE_APP packets (default: true) (Currently we only log message length, if we hide then all messages would have the same length) HIDE_MESSAGE=true \ No newline at end of file diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml new file mode 100644 index 0000000..5391cb6 --- /dev/null +++ b/.idea/dataSources.xml @@ -0,0 +1,12 @@ + + + + + redis + true + jdbc.RedisDriver + jdbc:redis://localhost:6379/0 + $ProjectFileDir$ + + + \ No newline at end of file diff --git a/exporter/processors.py b/exporter/processors.py index 679f5e5..9b3d5fe 100644 --- a/exporter/processors.py +++ b/exporter/processors.py @@ -1,27 +1,120 @@ +import json import os import redis from meshtastic.config_pb2 import Config from meshtastic.mesh_pb2 import MeshPacket, HardwareModel -from prometheus_client import CollectorRegistry, Counter +from meshtastic.portnums_pb2 import PortNum +from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge from exporter.registry import ProcessorRegistry, ClientDetails class MessageProcessor: def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): + self.rx_rssi_gauge = None + self.channel_counter = None + self.packet_id_counter = None + self.hop_start_gauge = None + self.via_mqtt_counter = None + self.want_ack_counter = None + self.hop_limit_counter = None + self.rx_snr_gauge = None + self.rx_time_histogram = None + self.total_packets_counter = None + self.destination_message_type_counter = None + self.source_message_type_counter = None self.registry = registry self.redis_client = redis_client - self.counter = Counter('mesh_packets', 'Number of mesh packets processed', - [ - 'source_id', 'source_short_name', 'source_long_name', - 'destination_id', 'destination_short_name', 'destination_long_name', - 'portnum', - 'rx_time', 'rx_snr', 'hop_limit', 'want_ack', 'via_mqtt', 'hop_start' - ], - registry=self.registry) + self.init_metrics() self.processor_registry = ProcessorRegistry() + def init_metrics(self): + # Source-related counters + self.source_message_type_counter = Counter( + 'mesh_packet_source_types', + 'Types of mesh packets processed by source', + ['source_id', 'portnum'], + registry=self.registry + ) + # Destination-related counters + self.destination_message_type_counter = Counter( + 'mesh_packet_destination_types', + 'Types of mesh packets processed by destination', + ['destination_id', 'portnum'], + registry=self.registry + ) + # Counters for the total number of packets + self.total_packets_counter = Counter( + 'mesh_packet_total', + 'Total number of mesh packets processed', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Histogram for the rx_time (time in seconds) + self.rx_time_histogram = Histogram( + 'mesh_packet_rx_time', + 'Receive time of mesh packets (seconds since 1970)', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Gauge for the rx_snr (signal-to-noise ratio) + self.rx_snr_gauge = Gauge( + 'mesh_packet_rx_snr', + 'Receive SNR of mesh packets', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Counter for hop_limit + self.hop_limit_counter = Counter( + 'mesh_packet_hop_limit', + 'Hop limit of mesh packets', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Counter for want_ack (occurrences of want_ack set to true) + self.want_ack_counter = Counter( + 'mesh_packet_want_ack', + 'Occurrences of want ACK for mesh packets', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Counter for via_mqtt (occurrences of via_mqtt set to true) + self.via_mqtt_counter = Counter( + 'mesh_packet_via_mqtt', + 'Occurrences of mesh packets sent via MQTT', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Gauge for hop_start + self.hop_start_gauge = Gauge( + 'mesh_packet_hop_start', + 'Hop start of mesh packets', + ['source_id', 'destination_id'], + registry=self.registry + ) + # Counter for unique packet IDs + self.packet_id_counter = Counter( + 'mesh_packet_ids', + 'Unique IDs for mesh packets', + ['source_id', 'destination_id', 'packet_id'], + registry=self.registry + ) + # Counter for the channel used + self.channel_counter = Counter( + 'mesh_packet_channel', + 'Channel used for mesh packets', + ['source_id', 'destination_id', 'channel'], + registry=self.registry + ) + # Gauge for the rx_rssi (received signal strength indicator) + self.rx_rssi_gauge = Gauge( + 'mesh_packet_rx_rssi', + 'Receive RSSI of mesh packets', + ['source_id', 'destination_id'], + registry=self.registry + ) + def process(self, mesh_packet: MeshPacket): port_num = int(mesh_packet.decoded.portnum) payload = mesh_packet.decoded.payload @@ -39,35 +132,102 @@ class MessageProcessor: if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports return None # Ignore this packet - self.counter.labels( - source_id=source_client_details.node_id, - source_short_name=source_client_details.short_name, - source_long_name=source_client_details.long_name, - - destination_id=destination_client_details.node_id, - destination_short_name=destination_client_details.short_name, - destination_long_name=destination_client_details.long_name, - - rx_time=mesh_packet.rx_time, - rx_snr=mesh_packet.rx_snr, - hop_limit=mesh_packet.hop_limit, - want_ack=mesh_packet.want_ack, - via_mqtt=mesh_packet.via_mqtt, - hop_start=mesh_packet.hop_start, - portnum=port_num - ).inc() + self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) processor.process(payload, client_details=source_client_details) + def get_port_name_from_portnum(self, port_num): + for name, value in PortNum.__dict__.items(): + if isinstance(value, int) and value == port_num: + return name + return 'UNKNOWN_PORT' + + def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details): + self.source_message_type_counter.labels( + source_id=source_client_details.node_id, + portnum=self.get_port_name_from_portnum(port_num) + ).inc() + + self.destination_message_type_counter.labels( + destination_id=destination_client_details.node_id, + portnum=self.get_port_name_from_portnum(port_num) + ).inc() + + # Increment the total packets counter + self.total_packets_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).inc() + + # Observe the rx_time in the histogram + self.rx_time_histogram.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).observe(mesh_packet.rx_time) + + # Set the rx_snr in the gauge + self.rx_snr_gauge.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).set(mesh_packet.rx_snr) + + # Increment the hop_limit counter + self.hop_limit_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).inc(mesh_packet.hop_limit) + + # Increment the want_ack counter if want_ack is true + if mesh_packet.want_ack: + self.want_ack_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).inc() + + # Increment the via_mqtt counter if via_mqtt is true + if mesh_packet.via_mqtt: + self.via_mqtt_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).inc() + + # Set the hop_start in the gauge + self.hop_start_gauge.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).set(mesh_packet.hop_start) + + # Increment the unique packet ID counter + self.packet_id_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id, + packet_id=mesh_packet.id + ).inc() + + # Increment the channel counter + self.channel_counter.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id, + channel=mesh_packet.channel + ).inc() + + # Set the rx_rssi in the gauge + self.rx_rssi_gauge.labels( + source_id=source_client_details.node_id, + destination_id=destination_client_details.node_id + ).set(mesh_packet.rx_rssi) + def _get_client_details(self, node_id: str) -> ClientDetails: - details = self.redis_client.hgetall(f"node:{node_id}") - if details: + user_details_json = self.redis_client.get(f"node:{node_id}") + if user_details_json is not None: + # Decode the JSON string to a Python dictionary + user_details = json.loads(user_details_json) return ClientDetails(node_id=node_id, - short_name=details.get('short_name', 'Unknown'), - long_name=details.get('long_name', 'Unknown'), - hardware_model=details.get('hardware_model', HardwareModel.UNSET), - role=details.get('role', Config.DeviceConfig.Role.ValueType.UNSET), + short_name=user_details.get('short_name', 'Unknown'), + long_name=user_details.get('long_name', 'Unknown'), + hardware_model=user_details.get('hardware_model', HardwareModel.UNSET), + role=user_details.get('role', Config.DeviceConfig.Role.ValueType), ) return ClientDetails(node_id=node_id) diff --git a/exporter/registry.py b/exporter/registry.py index 63341ba..d30b438 100644 --- a/exporter/registry.py +++ b/exporter/registry.py @@ -14,7 +14,7 @@ from meshtastic.portnums_pb2 import PortNum from meshtastic.remote_hardware_pb2 import HardwareMessage from meshtastic.storeforward_pb2 import StoreAndForward from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics -from prometheus_client import CollectorRegistry, Counter, Gauge +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram class _Metrics: @@ -34,72 +34,319 @@ class _Metrics: def _init_metrics(self): # TODO: Go over the metrics and rethink some of them to be more like the longtitute and # latitude - The values should represent something and we shouldn't just label stuff. Also, the labels should # be less used looked upon like keys for the data - self.message_counter = Counter( - 'text_message_app', - 'Text message app payload details', - ['client_id', 'short_name', 'long_name', 'message_content'], - registry=self._registry - ) - self.telemetry_counter_device_metrics = Counter( - 'telemetry_app_device_metrics', - 'Telemetry app payload details - Device metrics', - ['client_id', 'short_name', 'long_name' - 'battery_level', 'voltage', 'channel_utilization', 'air_until_tx', - 'uptime_seconds' - ], - registry=self._registry - ) - self.telemetry_counter_environment_metrics = Counter( - 'telemetry_app_environment_metrics', - 'Telemetry app payload details - Environment metrics', - ['client_id', 'short_name', 'long_name' - 'temperature', 'relative_humidity', 'barometric_pressure', 'gas_resistance', - 'voltage', 'current', 'iaq', 'distance', 'lux', 'white_lux', 'ir_lux', 'uv_lux', 'wind_direction', - 'wind_speed', 'weight' - ], - registry=self._registry - ) - self.telemetry_counter_air_quality_metrics = Counter( - 'telemetry_app_air_quality_metrics', - 'Telemetry app payload details - Air quality metrics', - ['client_id', 'short_name', 'long_name' - 'pm10_standard', 'pm25_standard', 'pm100_standard', 'pm10_environmental', - 'pm25_environmental', 'pm100_environmental', 'particles_03um', 'particles_05um', 'particles_10um', - 'particles_25um', 'particles_50um', 'particles_100um' - ], - registry=self._registry - ) - self.telemetry_counter_power_metrics = Counter( - 'telemetry_app_power_metrics', - 'Telemetry app payload details - Power metrics', - ['client_id', 'short_name', 'long_name' - 'ch1_voltage', 'ch1_current', 'ch2_voltage', 'ch2_current', 'ch3_voltage', - 'ch3_current' - ], + # Histogram for the length of messages + self._init_metrics_text_message() + self._init_metrics_telemetry_device() + self._init_metrics_telemetry_environment() + self._init_metrics_telemetry_air_quality() + self._init_metrics_telemetry_power() + self._init_metrics_position() + self._init_route_discovery_metrics() + + def _init_metrics_text_message(self): + self.message_length_histogram = Histogram( + 'text_message_app_length', + 'Length of text messages processed by the app', + ['client_id'], registry=self._registry ) + + def _init_metrics_position(self): self.device_latitude_gauge = Gauge( 'device_latitude', 'Device latitude', - ['client_id', 'short_name', 'long_name'], + ['client_id'], registry=self._registry ) self.device_longitude_gauge = Gauge( 'device_longitude', 'Device longitude', - ['client_id', 'short_name', 'long_name'], + ['client_id'], registry=self._registry ) self.device_altitude_gauge = Gauge( 'device_altitude', 'Device altitude', - ['client_id', 'short_name', 'long_name'], + ['client_id'], registry=self._registry ) self.device_position_precision_gauge = Gauge( 'device_position_precision', 'Device position precision', - ['client_id', 'short_name', 'long_name'], + ['client_id'], + registry=self._registry + ) + + def _init_metrics_telemetry_power(self): + self.ch1_voltage_gauge = Gauge( + 'telemetry_app_ch1_voltage', + 'Voltage measured by the device on channel 1', + ['client_id'], + registry=self._registry + ) + + self.ch1_current_gauge = Gauge( + 'telemetry_app_ch1_current', + 'Current measured by the device on channel 1', + ['client_id'], + registry=self._registry + ) + + self.ch2_voltage_gauge = Gauge( + 'telemetry_app_ch2_voltage', + 'Voltage measured by the device on channel 2', + ['client_id'], + registry=self._registry + ) + + self.ch2_current_gauge = Gauge( + 'telemetry_app_ch2_current', + 'Current measured by the device on channel 2', + ['client_id'], + registry=self._registry + ) + + self.ch3_voltage_gauge = Gauge( + 'telemetry_app_ch3_voltage', + 'Voltage measured by the device on channel 3', + ['client_id'], + registry=self._registry + ) + + self.ch3_current_gauge = Gauge( + 'telemetry_app_ch3_current', + 'Current measured by the device on channel 3', + ['client_id'], + registry=self._registry + ) + + def _init_metrics_telemetry_air_quality(self): + self.pm10_standard_gauge = Gauge( + 'telemetry_app_pm10_standard', + 'Concentration Units Standard PM1.0', + ['client_id'], + registry=self._registry + ) + + self.pm25_standard_gauge = Gauge( + 'telemetry_app_pm25_standard', + 'Concentration Units Standard PM2.5', + ['client_id'], + registry=self._registry + ) + + self.pm100_standard_gauge = Gauge( + 'telemetry_app_pm100_standard', + 'Concentration Units Standard PM10.0', + ['client_id'], + registry=self._registry + ) + + self.pm10_environmental_gauge = Gauge( + 'telemetry_app_pm10_environmental', + 'Concentration Units Environmental PM1.0', + ['client_id'], + registry=self._registry + ) + + self.pm25_environmental_gauge = Gauge( + 'telemetry_app_pm25_environmental', + 'Concentration Units Environmental PM2.5', + ['client_id'], + registry=self._registry + ) + + self.pm100_environmental_gauge = Gauge( + 'telemetry_app_pm100_environmental', + 'Concentration Units Environmental PM10.0', + ['client_id'], + registry=self._registry + ) + + self.particles_03um_gauge = Gauge( + 'telemetry_app_particles_03um', + '0.3um Particle Count', + ['client_id'], + registry=self._registry + ) + + self.particles_05um_gauge = Gauge( + 'telemetry_app_particles_05um', + '0.5um Particle Count', + ['client_id'], + registry=self._registry + ) + + self.particles_10um_gauge = Gauge( + 'telemetry_app_particles_10um', + '1.0um Particle Count', + ['client_id'], + registry=self._registry + ) + + self.particles_25um_gauge = Gauge( + 'telemetry_app_particles_25um', + '2.5um Particle Count', + ['client_id'], + registry=self._registry + ) + + self.particles_50um_gauge = Gauge( + 'telemetry_app_particles_50um', + '5.0um Particle Count', + ['client_id'], + registry=self._registry + ) + + self.particles_100um_gauge = Gauge( + 'telemetry_app_particles_100um', + '10.0um Particle Count', + ['client_id'], + registry=self._registry + ) + + def _init_metrics_telemetry_environment(self): + # Define gauges for environment metrics + self.temperature_gauge = Gauge( + 'telemetry_app_temperature', + 'Temperature measured by the device', + ['client_id'], + registry=self._registry + ) + + self.relative_humidity_gauge = Gauge( + 'telemetry_app_relative_humidity', + 'Relative humidity percent measured by the device', + ['client_id'], + registry=self._registry + ) + + self.barometric_pressure_gauge = Gauge( + 'telemetry_app_barometric_pressure', + 'Barometric pressure in hPA measured by the device', + ['client_id'], + registry=self._registry + ) + + self.gas_resistance_gauge = Gauge( + 'telemetry_app_gas_resistance', + 'Gas resistance in MOhm measured by the device', + ['client_id'], + registry=self._registry + ) + + self.iaq_gauge = Gauge( + 'telemetry_app_iaq', + 'IAQ value measured by the device (0-500)', + ['client_id'], + registry=self._registry + ) + + self.distance_gauge = Gauge( + 'telemetry_app_distance', + 'Distance measured by the device in mm', + ['client_id'], + registry=self._registry + ) + + self.lux_gauge = Gauge( + 'telemetry_app_lux', + 'Ambient light measured by the device in Lux', + ['client_id'], + registry=self._registry + ) + + self.white_lux_gauge = Gauge( + 'telemetry_app_white_lux', + 'White light measured by the device in Lux', + ['client_id'], + registry=self._registry + ) + + self.ir_lux_gauge = Gauge( + 'telemetry_app_ir_lux', + 'Infrared light measured by the device in Lux', + ['client_id'], + registry=self._registry + ) + + self.uv_lux_gauge = Gauge( + 'telemetry_app_uv_lux', + 'Ultraviolet light measured by the device in Lux', + ['client_id'], + registry=self._registry + ) + + self.wind_direction_gauge = Gauge( + 'telemetry_app_wind_direction', + 'Wind direction in degrees measured by the device', + ['client_id'], + registry=self._registry + ) + + self.wind_speed_gauge = Gauge( + 'telemetry_app_wind_speed', + 'Wind speed in m/s measured by the device', + ['client_id'], + registry=self._registry + ) + + self.weight_gauge = Gauge( + 'telemetry_app_weight', + 'Weight in KG measured by the device', + ['client_id'], + registry=self._registry + ) + + def _init_metrics_telemetry_device(self): + self.battery_level_gauge = Gauge( + 'telemetry_app_battery_level', + 'Battery level of the device (0-100, >100 means powered)', + ['client_id'], + registry=self._registry + ) + + self.voltage_gauge = Gauge( + 'telemetry_app_voltage', + 'Voltage measured by the device', + ['client_id'], + registry=self._registry + ) + + # Define gauges for channel utilization and air utilization for TX + self.channel_utilization_gauge = Gauge( + 'telemetry_app_channel_utilization', + 'Utilization for the current channel, including well-formed TX, RX, and noise', + ['client_id'], + registry=self._registry + ) + + self.air_util_tx_gauge = Gauge( + 'telemetry_app_air_util_tx', + 'Percent of airtime for transmission used within the last hour', + ['client_id'], + registry=self._registry + ) + + # Define a counter for uptime in seconds + self.uptime_seconds_counter = Counter( + 'telemetry_app_uptime_seconds', + 'How long the device has been running since the last reboot (in seconds)', + ['client_id'], + registry=self._registry + ) + + def _init_route_discovery_metrics(self): + self.route_discovery_counter = Counter( + 'route_length', + 'Number of nodes in the route', + ['client_id'], + registry=self._registry + ) + self.route_discovery_response_counter = Counter( + 'route_response', + 'Number of responses to route discovery', + ['client_id', 'response_type'], registry=self._registry ) @@ -113,6 +360,27 @@ class ClientDetails: self.hardware_model: HardwareModel = hardware_model self.role: Config.DeviceConfig.Role = role + def get_role_name_from_role(self): + for name, value in Config.DeviceConfig.Role.__dict__.items(): + if isinstance(value, int) and value == self.role: + return name + return 'UNKNOWN_ROLE' + + def get_hardware_model_name_from_code(self): + for name, value in HardwareModel.__dict__.items(): + if isinstance(value, int) and value == self.hardware_model: + return name + return 'UNKNOWN_HARDWARE_MODEL' + + def to_dict(self): + return { + 'node_id': self.node_id, + 'short_name': self.short_name, + 'long_name': self.long_name, + 'hardware_model': self.get_hardware_model_name_from_code(), + 'role': self.get_role_name_from_role() + } + class Processor(ABC): def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): @@ -155,14 +423,12 @@ class TextMessageAppProcessor(Processor): def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TEXT_MESSAGE_APP packet") message = payload.decode('utf-8') - if os.getenv('HIDE_MESSAGE', 'true') == 'true': + if os.getenv('HIDE_MESSAGE', 'true') == 'true': # Currently there is no use for the message content, + # but later we could store it in redis or something message = 'Hidden' - self.metrics.message_counter.labels( - client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name, - message_content=message - ).inc() + self.metrics.message_length_histogram.labels( + client_id=client_details.node_id + ).observe(len(message)) @ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP) @@ -182,24 +448,16 @@ class PositionAppProcessor(Processor): position.ParseFromString(payload) self.metrics.device_latitude_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name ).set(position.latitude_i) self.metrics.device_longitude_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name ).set(position.longitude_i) self.metrics.device_altitude_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name ).set(position.altitude) self.metrics.device_position_precision_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name - ).set(position.position_precision) + ).set(position.precision_bits) pass @@ -209,14 +467,11 @@ class NodeInfoAppProcessor(Processor): logger.debug("Received NODEINFO_APP packet") user = User() user.ParseFromString(payload) - user_details = { - 'short_name': user.short_name, - 'long_name': user.long_name, - 'id': user.id, - 'hardware_model': user.hardware_model, - 'role': user.role, - } - user_details_json = json.dumps(user_details) + client_details.short_name = user.short_name + client_details.long_name = user.long_name + client_details.hardware_model = user.hw_model + client_details.role = user.role + user_details_json = json.dumps(client_details.to_dict()) self.redis_client.set(f"node:{client_details.node_id}", user_details_json) pass @@ -227,7 +482,16 @@ class RoutingAppProcessor(Processor): logger.debug("Received ROUTING_APP packet") routing = Routing() routing.ParseFromString(payload) - pass + self.metrics.route_discovery_response_counter.labels( + client_id=client_details.node_id, + response_type=self.get_error_name_from_routing(routing.error_reason) + ).inc() + + def get_error_name_from_routing(self, error_code): + for name, value in Routing.Error.__dict__.items(): + if isinstance(value, int) and value == error_code: + return name + return 'UNKNOWN_ERROR' @ProcessorRegistry.register_processor(PortNum.ADMIN_APP) @@ -318,76 +582,165 @@ class RangeTestAppProcessor(Processor): @ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) class TelemetryAppProcessor(Processor): + def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): + super().__init__(registry, redis_client) + def process(self, payload: bytes, client_details: ClientDetails): logger.debug("Received TELEMETRY_APP packet") telemetry = Telemetry() telemetry.ParseFromString(payload) + if telemetry.HasField('device_metrics'): device_metrics: DeviceMetrics = telemetry.device_metrics - self.metrics.telemetry_counter_device_metrics.labels( + self.metrics.battery_level_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name, - battery_level=device_metrics.battery_level, - voltage=device_metrics.voltage, - channel_utilization=device_metrics.channel_utilization, - air_until_tx=device_metrics.air_until_tx, - uptime_seconds=device_metrics.uptime_seconds - ).inc() + ).set(getattr(device_metrics, 'battery_level', 0)) + + self.metrics.voltage_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(device_metrics, 'voltage', 0)) + + self.metrics.channel_utilization_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(device_metrics, 'channel_utilization', 0)) + + self.metrics.air_util_tx_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(device_metrics, 'air_util_tx', 0)) + + self.metrics.uptime_seconds_counter.labels( + client_id=client_details.node_id, + ).inc(getattr(device_metrics, 'uptime_seconds', 0)) + if telemetry.HasField('environment_metrics'): environment_metrics: EnvironmentMetrics = telemetry.environment_metrics - self.metrics.telemetry_counter_environment_metrics.labels( + self.metrics.temperature_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name, - temperature=environment_metrics.temperature, - relative_humidity=environment_metrics.relative_humidity, - barometric_pressure=environment_metrics.barometric_pressure, - gas_resistance=environment_metrics.gas_resistance, - voltage=environment_metrics.voltage, - current=environment_metrics.current, - iaq=environment_metrics.iaq, - distance=environment_metrics.distance, - lux=environment_metrics.lux, - white_lux=environment_metrics.white_lux, - ir_lux=environment_metrics.ir_lux, - uv_lux=environment_metrics.uv_lux, - wind_direction=environment_metrics.wind_direction, - wind_speed=environment_metrics.wind_speed, - weight=environment_metrics.weight - ).inc() + ).set(getattr(environment_metrics, 'temperature', 0)) + + self.metrics.relative_humidity_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'relative_humidity', 0)) + + self.metrics.barometric_pressure_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'barometric_pressure', 0)) + + self.metrics.gas_resistance_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'gas_resistance', 0)) + + self.metrics.iaq_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'iaq', 0)) + + self.metrics.distance_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'distance', 0)) + + self.metrics.lux_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'lux', 0)) + + self.metrics.white_lux_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'white_lux', 0)) + + self.metrics.ir_lux_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'ir_lux', 0)) + + self.metrics.uv_lux_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'uv_lux', 0)) + + self.metrics.wind_direction_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'wind_direction', 0)) + + self.metrics.wind_speed_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'wind_speed', 0)) + + self.metrics.weight_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(environment_metrics, 'weight', 0)) + if telemetry.HasField('air_quality_metrics'): air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics - self.metrics.telemetry_counter_air_quality_metrics.labels( + self.metrics.pm10_standard_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name, - pm10_standard=air_quality_metrics.pm10_standard, - pm25_standard=air_quality_metrics.pm25_standard, - pm100_standard=air_quality_metrics.pm100_standard, - pm10_environmental=air_quality_metrics.pm10_environmental, - pm25_environmental=air_quality_metrics.pm25_environmental, - pm100_environmental=air_quality_metrics.pm100_environmental, - particles_03um=air_quality_metrics.particles_03um, - particles_05um=air_quality_metrics.particles_05um, - particles_10um=air_quality_metrics.particles_10um, - particles_25um=air_quality_metrics.particles_25um, - particles_50um=air_quality_metrics.particles_50um, - particles_100um=air_quality_metrics.particles_100um - ).inc() + ).set(getattr(air_quality_metrics, 'pm10_standard', 0)) + + self.metrics.pm25_standard_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'pm25_standard', 0)) + + self.metrics.pm100_standard_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'pm100_standard', 0)) + + self.metrics.pm10_environmental_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'pm10_environmental', 0)) + + self.metrics.pm25_environmental_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'pm25_environmental', 0)) + + self.metrics.pm100_environmental_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'pm100_environmental', 0)) + + self.metrics.particles_03um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_03um', 0)) + + self.metrics.particles_05um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_05um', 0)) + + self.metrics.particles_10um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_10um', 0)) + + self.metrics.particles_25um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_25um', 0)) + + self.metrics.particles_50um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_50um', 0)) + + self.metrics.particles_100um_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(air_quality_metrics, 'particles_100um', 0)) + if telemetry.HasField('power_metrics'): power_metrics: PowerMetrics = telemetry.power_metrics - self.metrics.telemetry_counter_power_metrics.labels( + self.metrics.ch1_voltage_gauge.labels( client_id=client_details.node_id, - short_name=client_details.short_name, - long_name=client_details.long_name, - ch1_voltage=power_metrics.ch1_voltage, - ch1_current=power_metrics.ch1_current, - ch2_voltage=power_metrics.ch2_voltage, - ch2_current=power_metrics.ch2_current, - ch3_voltage=power_metrics.ch3_voltage, - ch3_current=power_metrics.ch3_current - ).inc() + ).set(getattr(power_metrics, 'ch1_voltage', 0)) + + self.metrics.ch1_current_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(power_metrics, 'ch1_current', 0)) + + self.metrics.ch2_voltage_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(power_metrics, 'ch2_voltage', 0)) + + self.metrics.ch2_current_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(power_metrics, 'ch2_current', 0)) + + self.metrics.ch3_voltage_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(power_metrics, 'ch3_voltage', 0)) + + self.metrics.ch3_current_gauge.labels( + client_id=client_details.node_id, + ).set(getattr(power_metrics, 'ch3_current', 0)) @ProcessorRegistry.register_processor(PortNum.ZPS_APP) @@ -410,7 +763,11 @@ class TraceRouteAppProcessor(Processor): logger.debug("Received TRACEROUTE_APP packet") traceroute = RouteDiscovery() traceroute.ParseFromString(payload) - pass + if traceroute.route: + route = traceroute.route + self.metrics.route_discovery_counter.labels( + client_id=client_details.node_id + ).inc(len(route)) @ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP) @@ -435,7 +792,7 @@ class MapReportAppProcessor(Processor): logger.debug("Received MAP_REPORT_APP packet") map_report = MapReport() map_report.ParseFromString(payload) - pass + pass # Nothing interesting here @ProcessorRegistry.register_processor(PortNum.PRIVATE_APP) diff --git a/main.py b/main.py index b6b94db..8fb4b92 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ import logging import os +from datetime import datetime import paho.mqtt.client as mqtt import redis @@ -18,7 +19,8 @@ def handle_connect(client, userdata, flags, reason_code, properties): def handle_message(client, userdata, message): - print(f"Received message on topic '{message.topic}'") + current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"Received message on topic '{message.topic}' at {current_timestamp}") envelope = ServiceEnvelope() envelope.ParseFromString(message.payload)