Reformatted the code.
client_details.py -> Now handles the client details Class processor_base.py -> All the base classes for the processors processors.py -> All available processors registry.py -> Registry mechanism for the processors
This commit is contained in:
parent
adb10fd422
commit
af29d71a7d
|
@ -2,7 +2,7 @@
|
|||
<project version="4">
|
||||
<component name="SqlDialectMappings">
|
||||
<file url="file://$PROJECT_DIR$/docker/postgres/init.sql" dialect="PostgreSQL" />
|
||||
<file url="file://$PROJECT_DIR$/exporter/processors.py" dialect="PostgreSQL" />
|
||||
<file url="file://$PROJECT_DIR$/exporter/processor_base.py" dialect="PostgreSQL" />
|
||||
<file url="PROJECT" dialect="PostgreSQL" />
|
||||
</component>
|
||||
</project>
|
|
@ -1 +1 @@
|
|||
from .processors import MessageProcessor
|
||||
from .processor_base import MessageProcessor
|
||||
|
|
37
exporter/client_details.py
Normal file
37
exporter/client_details.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
from meshtastic.config_pb2 import Config
|
||||
from meshtastic.mesh_pb2 import HardwareModel
|
||||
|
||||
|
||||
class ClientDetails:
|
||||
def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET,
|
||||
role=None):
|
||||
self.node_id = node_id
|
||||
self.short_name = short_name
|
||||
self.long_name = long_name
|
||||
self.hardware_model: HardwareModel = hardware_model
|
||||
self.role: Config.DeviceConfig.Role = role
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'node_id': self.node_id,
|
||||
'short_name': self.short_name,
|
||||
'long_name': self.long_name,
|
||||
'hardware_model': self.get_hardware_model_name_from_code(self.hardware_model),
|
||||
'role': self.get_role_name_from_role(self.role)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_role_name_from_role(role):
|
||||
descriptor = Config.DeviceConfig.Role.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == role:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_ROLE'
|
||||
|
||||
@staticmethod
|
||||
def get_hardware_model_name_from_code(hardware_model):
|
||||
descriptor = HardwareModel.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == hardware_model:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_HARDWARE_MODEL'
|
271
exporter/processor_base.py
Normal file
271
exporter/processor_base.py
Normal file
|
@ -0,0 +1,271 @@
|
|||
import base64
|
||||
import os
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
|
||||
from meshtastic.portnums_pb2 import PortNum
|
||||
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
|
||||
from psycopg_pool import ConnectionPool
|
||||
|
||||
from exporter.client_details import ClientDetails
|
||||
from exporter.processors import ProcessorRegistry
|
||||
|
||||
|
||||
class MessageProcessor:
|
||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||
self.rx_rssi_gauge = None
|
||||
self.channel_counter = None
|
||||
self.packet_id_counter = None
|
||||
self.hop_start_gauge = None
|
||||
self.via_mqtt_counter = None
|
||||
self.want_ack_counter = None
|
||||
self.hop_limit_counter = None
|
||||
self.rx_snr_gauge = None
|
||||
self.rx_time_histogram = None
|
||||
self.total_packets_counter = None
|
||||
self.destination_message_type_counter = None
|
||||
self.source_message_type_counter = None
|
||||
self.registry = registry
|
||||
self.db_pool = db_pool
|
||||
self.init_metrics()
|
||||
self.processor_registry = ProcessorRegistry()
|
||||
|
||||
def init_metrics(self):
|
||||
common_labels = [
|
||||
'source_id', 'source_short_name', 'source_long_name', 'source_hardware_model', 'source_role',
|
||||
'destination_id', 'destination_short_name', 'destination_long_name', 'destination_hardware_model',
|
||||
'destination_role'
|
||||
]
|
||||
|
||||
self.source_message_type_counter = Counter(
|
||||
'mesh_packet_source_types',
|
||||
'Types of mesh packets processed by source',
|
||||
common_labels + ['portnum'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Destination-related counters
|
||||
self.destination_message_type_counter = Counter(
|
||||
'mesh_packet_destination_types',
|
||||
'Types of mesh packets processed by destination',
|
||||
common_labels + ['portnum'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Counters for the total number of packets
|
||||
self.total_packets_counter = Counter(
|
||||
'mesh_packet_total',
|
||||
'Total number of mesh packets processed',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Histogram for the rx_time (time in seconds)
|
||||
self.rx_time_histogram = Histogram(
|
||||
'mesh_packet_rx_time',
|
||||
'Receive time of mesh packets (seconds since 1970)',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for the rx_snr (signal-to-noise ratio)
|
||||
self.rx_snr_gauge = Gauge(
|
||||
'mesh_packet_rx_snr',
|
||||
'Receive SNR of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for hop_limit
|
||||
self.hop_limit_counter = Counter(
|
||||
'mesh_packet_hop_limit',
|
||||
'Hop limit of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for want_ack (occurrences of want_ack set to true)
|
||||
self.want_ack_counter = Counter(
|
||||
'mesh_packet_want_ack',
|
||||
'Occurrences of want ACK for mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for via_mqtt (occurrences of via_mqtt set to true)
|
||||
self.via_mqtt_counter = Counter(
|
||||
'mesh_packet_via_mqtt',
|
||||
'Occurrences of mesh packets sent via MQTT',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for hop_start
|
||||
self.hop_start_gauge = Gauge(
|
||||
'mesh_packet_hop_start',
|
||||
'Hop start of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for unique packet IDs
|
||||
self.packet_id_counter = Counter(
|
||||
'mesh_packet_ids',
|
||||
'Unique IDs for mesh packets',
|
||||
common_labels + ['packet_id'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for the channel used
|
||||
self.channel_counter = Counter(
|
||||
'mesh_packet_channel',
|
||||
'Channel used for mesh packets',
|
||||
common_labels + ['channel'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for the rx_rssi (received signal strength indicator)
|
||||
self.rx_rssi_gauge = Gauge(
|
||||
'mesh_packet_rx_rssi',
|
||||
'Receive RSSI of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
|
||||
def process(self, mesh_packet: MeshPacket):
|
||||
if getattr(mesh_packet, 'encrypted'):
|
||||
key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii'))
|
||||
nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little")
|
||||
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little")
|
||||
|
||||
# Put both parts into a single byte array.
|
||||
nonce = nonce_packet_id + nonce_from_node
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize()
|
||||
|
||||
data = Data()
|
||||
data.ParseFromString(decrypted_bytes)
|
||||
mesh_packet.decoded.CopyFrom(data)
|
||||
port_num = int(mesh_packet.decoded.portnum)
|
||||
payload = mesh_packet.decoded.payload
|
||||
|
||||
source_node_id = getattr(mesh_packet, 'from')
|
||||
source_client_details = self._get_client_details(source_node_id)
|
||||
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
|
||||
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
destination_node_id = getattr(mesh_packet, 'to')
|
||||
destination_client_details = self._get_client_details(destination_node_id)
|
||||
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
|
||||
destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
|
||||
return None # Ignore this packet
|
||||
|
||||
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
||||
|
||||
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
||||
processor.process(payload, client_details=source_client_details)
|
||||
|
||||
@staticmethod
|
||||
def get_port_name_from_portnum(port_num):
|
||||
descriptor = PortNum.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == port_num:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_PORT'
|
||||
|
||||
def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details):
|
||||
common_labels = {
|
||||
'source_id': source_client_details.node_id,
|
||||
'source_short_name': source_client_details.short_name,
|
||||
'source_long_name': source_client_details.long_name,
|
||||
'source_hardware_model': source_client_details.hardware_model,
|
||||
'source_role': source_client_details.role,
|
||||
'destination_id': destination_client_details.node_id,
|
||||
'destination_short_name': destination_client_details.short_name,
|
||||
'destination_long_name': destination_client_details.long_name,
|
||||
'destination_hardware_model': destination_client_details.hardware_model,
|
||||
'destination_role': destination_client_details.role,
|
||||
}
|
||||
|
||||
self.source_message_type_counter.labels(
|
||||
**common_labels,
|
||||
portnum=self.get_port_name_from_portnum(port_num)
|
||||
).inc()
|
||||
|
||||
self.destination_message_type_counter.labels(
|
||||
**common_labels,
|
||||
portnum=self.get_port_name_from_portnum(port_num)
|
||||
).inc()
|
||||
|
||||
self.total_packets_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
self.rx_time_histogram.labels(
|
||||
**common_labels
|
||||
).observe(mesh_packet.rx_time)
|
||||
|
||||
self.rx_snr_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.rx_snr)
|
||||
|
||||
self.hop_limit_counter.labels(
|
||||
**common_labels
|
||||
).inc(mesh_packet.hop_limit)
|
||||
|
||||
if mesh_packet.want_ack:
|
||||
self.want_ack_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
if mesh_packet.via_mqtt:
|
||||
self.via_mqtt_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
self.hop_start_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.hop_start)
|
||||
|
||||
self.packet_id_counter.labels(
|
||||
**common_labels,
|
||||
packet_id=mesh_packet.id
|
||||
).inc()
|
||||
|
||||
# Increment the channel counter
|
||||
self.channel_counter.labels(
|
||||
**common_labels,
|
||||
channel=mesh_packet.channel
|
||||
).inc()
|
||||
|
||||
# Set the rx_rssi in the gauge
|
||||
self.rx_rssi_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.rx_rssi)
|
||||
|
||||
def _get_client_details(self, node_id: int) -> ClientDetails:
|
||||
node_id_str = str(node_id) # Convert the integer to a string
|
||||
with self.db_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# First, try to select the existing record
|
||||
cur.execute("""
|
||||
SELECT node_id, short_name, long_name, hardware_model, role
|
||||
FROM client_details
|
||||
WHERE node_id = %s;
|
||||
""", (node_id_str,))
|
||||
result = cur.fetchone()
|
||||
|
||||
if not result:
|
||||
# If the client is not found, insert a new record
|
||||
cur.execute("""
|
||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
RETURNING node_id, short_name, long_name, hardware_model, role;
|
||||
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
|
||||
conn.commit()
|
||||
result = cur.fetchone()
|
||||
|
||||
# At this point, we should always have a result, either from SELECT or INSERT
|
||||
return ClientDetails(
|
||||
node_id=result[0],
|
||||
short_name=result[1],
|
||||
long_name=result[2],
|
||||
hardware_model=result[3],
|
||||
role=result[4]
|
||||
)
|
|
@ -1,269 +1,498 @@
|
|||
import base64
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from venv import logger
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
|
||||
import psycopg
|
||||
import unishox2
|
||||
from meshtastic.admin_pb2 import AdminMessage
|
||||
from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo
|
||||
from meshtastic.mqtt_pb2 import MapReport
|
||||
from meshtastic.paxcount_pb2 import Paxcount
|
||||
from meshtastic.portnums_pb2 import PortNum
|
||||
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
|
||||
from meshtastic.remote_hardware_pb2 import HardwareMessage
|
||||
from meshtastic.storeforward_pb2 import StoreAndForward
|
||||
from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics
|
||||
from prometheus_client import CollectorRegistry
|
||||
from psycopg_pool import ConnectionPool
|
||||
|
||||
from exporter.registry import ProcessorRegistry, ClientDetails
|
||||
from exporter.client_details import ClientDetails
|
||||
from exporter.registry import _Metrics
|
||||
|
||||
|
||||
class MessageProcessor:
|
||||
class Processor(ABC):
|
||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||
self.rx_rssi_gauge = None
|
||||
self.channel_counter = None
|
||||
self.packet_id_counter = None
|
||||
self.hop_start_gauge = None
|
||||
self.via_mqtt_counter = None
|
||||
self.want_ack_counter = None
|
||||
self.hop_limit_counter = None
|
||||
self.rx_snr_gauge = None
|
||||
self.rx_time_histogram = None
|
||||
self.total_packets_counter = None
|
||||
self.destination_message_type_counter = None
|
||||
self.source_message_type_counter = None
|
||||
self.registry = registry
|
||||
self.db_pool = db_pool
|
||||
self.init_metrics()
|
||||
self.processor_registry = ProcessorRegistry()
|
||||
self.metrics = _Metrics(registry)
|
||||
|
||||
def init_metrics(self):
|
||||
common_labels = [
|
||||
'source_id', 'source_short_name', 'source_long_name', 'source_hardware_model', 'source_role',
|
||||
'destination_id', 'destination_short_name', 'destination_long_name', 'destination_hardware_model',
|
||||
'destination_role'
|
||||
]
|
||||
@abstractmethod
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
pass
|
||||
|
||||
self.source_message_type_counter = Counter(
|
||||
'mesh_packet_source_types',
|
||||
'Types of mesh packets processed by source',
|
||||
common_labels + ['portnum'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Destination-related counters
|
||||
self.destination_message_type_counter = Counter(
|
||||
'mesh_packet_destination_types',
|
||||
'Types of mesh packets processed by destination',
|
||||
common_labels + ['portnum'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Counters for the total number of packets
|
||||
self.total_packets_counter = Counter(
|
||||
'mesh_packet_total',
|
||||
'Total number of mesh packets processed',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Histogram for the rx_time (time in seconds)
|
||||
self.rx_time_histogram = Histogram(
|
||||
'mesh_packet_rx_time',
|
||||
'Receive time of mesh packets (seconds since 1970)',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for the rx_snr (signal-to-noise ratio)
|
||||
self.rx_snr_gauge = Gauge(
|
||||
'mesh_packet_rx_snr',
|
||||
'Receive SNR of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for hop_limit
|
||||
self.hop_limit_counter = Counter(
|
||||
'mesh_packet_hop_limit',
|
||||
'Hop limit of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for want_ack (occurrences of want_ack set to true)
|
||||
self.want_ack_counter = Counter(
|
||||
'mesh_packet_want_ack',
|
||||
'Occurrences of want ACK for mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for via_mqtt (occurrences of via_mqtt set to true)
|
||||
self.via_mqtt_counter = Counter(
|
||||
'mesh_packet_via_mqtt',
|
||||
'Occurrences of mesh packets sent via MQTT',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for hop_start
|
||||
self.hop_start_gauge = Gauge(
|
||||
'mesh_packet_hop_start',
|
||||
'Hop start of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for unique packet IDs
|
||||
self.packet_id_counter = Counter(
|
||||
'mesh_packet_ids',
|
||||
'Unique IDs for mesh packets',
|
||||
common_labels + ['packet_id'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Counter for the channel used
|
||||
self.channel_counter = Counter(
|
||||
'mesh_packet_channel',
|
||||
'Channel used for mesh packets',
|
||||
common_labels + ['channel'],
|
||||
registry=self.registry
|
||||
)
|
||||
# Gauge for the rx_rssi (received signal strength indicator)
|
||||
self.rx_rssi_gauge = Gauge(
|
||||
'mesh_packet_rx_rssi',
|
||||
'Receive RSSI of mesh packets',
|
||||
common_labels,
|
||||
registry=self.registry
|
||||
)
|
||||
|
||||
def process(self, mesh_packet: MeshPacket):
|
||||
if getattr(mesh_packet, 'encrypted'):
|
||||
key_bytes = base64.b64decode(os.getenv('MQTT_SERVER_KEY', '1PG7OiApB1nwvP+rz05pAQ==').encode('ascii'))
|
||||
nonce_packet_id = getattr(mesh_packet, "id").to_bytes(8, "little")
|
||||
nonce_from_node = getattr(mesh_packet, "from").to_bytes(8, "little")
|
||||
|
||||
# Put both parts into a single byte array.
|
||||
nonce = nonce_packet_id + nonce_from_node
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_bytes = decryptor.update(getattr(mesh_packet, "encrypted")) + decryptor.finalize()
|
||||
|
||||
data = Data()
|
||||
data.ParseFromString(decrypted_bytes)
|
||||
mesh_packet.decoded.CopyFrom(data)
|
||||
port_num = int(mesh_packet.decoded.portnum)
|
||||
payload = mesh_packet.decoded.payload
|
||||
|
||||
source_node_id = getattr(mesh_packet, 'from')
|
||||
source_client_details = self._get_client_details(source_node_id)
|
||||
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
|
||||
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
destination_node_id = getattr(mesh_packet, 'to')
|
||||
destination_client_details = self._get_client_details(destination_node_id)
|
||||
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
|
||||
destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden',
|
||||
long_name='Hidden')
|
||||
|
||||
if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')): # Filter out ports
|
||||
return None # Ignore this packet
|
||||
|
||||
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
|
||||
|
||||
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
|
||||
processor.process(payload, client_details=source_client_details)
|
||||
|
||||
def get_port_name_from_portnum(self, port_num):
|
||||
descriptor = PortNum.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == port_num:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_PORT'
|
||||
|
||||
def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details):
|
||||
common_labels = {
|
||||
'source_id': source_client_details.node_id,
|
||||
'source_short_name': source_client_details.short_name,
|
||||
'source_long_name': source_client_details.long_name,
|
||||
'source_hardware_model': source_client_details.hardware_model,
|
||||
'source_role': source_client_details.role,
|
||||
'destination_id': destination_client_details.node_id,
|
||||
'destination_short_name': destination_client_details.short_name,
|
||||
'destination_long_name': destination_client_details.long_name,
|
||||
'destination_hardware_model': destination_client_details.hardware_model,
|
||||
'destination_role': destination_client_details.role,
|
||||
}
|
||||
|
||||
self.source_message_type_counter.labels(
|
||||
**common_labels,
|
||||
portnum=self.get_port_name_from_portnum(port_num)
|
||||
).inc()
|
||||
|
||||
self.destination_message_type_counter.labels(
|
||||
**common_labels,
|
||||
portnum=self.get_port_name_from_portnum(port_num)
|
||||
).inc()
|
||||
|
||||
self.total_packets_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
self.rx_time_histogram.labels(
|
||||
**common_labels
|
||||
).observe(mesh_packet.rx_time)
|
||||
|
||||
self.rx_snr_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.rx_snr)
|
||||
|
||||
self.hop_limit_counter.labels(
|
||||
**common_labels
|
||||
).inc(mesh_packet.hop_limit)
|
||||
|
||||
if mesh_packet.want_ack:
|
||||
self.want_ack_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
if mesh_packet.via_mqtt:
|
||||
self.via_mqtt_counter.labels(
|
||||
**common_labels
|
||||
).inc()
|
||||
|
||||
self.hop_start_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.hop_start)
|
||||
|
||||
self.packet_id_counter.labels(
|
||||
**common_labels,
|
||||
packet_id=mesh_packet.id
|
||||
).inc()
|
||||
|
||||
# Increment the channel counter
|
||||
self.channel_counter.labels(
|
||||
**common_labels,
|
||||
channel=mesh_packet.channel
|
||||
).inc()
|
||||
|
||||
# Set the rx_rssi in the gauge
|
||||
self.rx_rssi_gauge.labels(
|
||||
**common_labels
|
||||
).set(mesh_packet.rx_rssi)
|
||||
|
||||
def _get_client_details(self, node_id: int) -> ClientDetails:
|
||||
node_id_str = str(node_id) # Convert the integer to a string
|
||||
def execute_db_operation(self, operation):
|
||||
with self.db_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return operation(cur, conn)
|
||||
|
||||
|
||||
class ProcessorRegistry:
|
||||
_registry = {}
|
||||
|
||||
@classmethod
|
||||
def register_processor(cls, port_num):
|
||||
def inner_wrapper(wrapped_class):
|
||||
cls._registry[port_num] = wrapped_class
|
||||
return wrapped_class
|
||||
|
||||
return inner_wrapper
|
||||
|
||||
@classmethod
|
||||
def get_processor(cls, port_num) -> type(Processor):
|
||||
return cls._registry.get(port_num, UnknownAppProcessor)
|
||||
|
||||
|
||||
########################################################################################################################
|
||||
# PROCESSORS #
|
||||
########################################################################################################################
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
|
||||
class UnknownAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received UNKNOWN_APP packet")
|
||||
return None
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP)
|
||||
class TextMessageAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TEXT_MESSAGE_APP packet")
|
||||
message = payload.decode('utf-8')
|
||||
if os.getenv('HIDE_MESSAGE', 'true') == 'true':
|
||||
message = 'Hidden'
|
||||
self.metrics.message_length_histogram.labels(
|
||||
**client_details.to_dict()
|
||||
).observe(len(message))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
|
||||
class RemoteHardwareAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received REMOTE_HARDWARE_APP packet")
|
||||
hardware_message = HardwareMessage()
|
||||
hardware_message.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.POSITION_APP)
|
||||
class PositionAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received POSITION_APP packet")
|
||||
position = Position()
|
||||
position.ParseFromString(payload)
|
||||
self.metrics.device_latitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.latitude_i)
|
||||
self.metrics.device_longitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.longitude_i)
|
||||
self.metrics.device_altitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.altitude)
|
||||
self.metrics.device_position_precision_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.precision_bits)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP)
|
||||
class NodeInfoAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NODEINFO_APP packet")
|
||||
user = User()
|
||||
user.ParseFromString(payload)
|
||||
|
||||
def db_operation(cur, conn):
|
||||
# First, try to select the existing record
|
||||
cur.execute("""
|
||||
SELECT node_id, short_name, long_name, hardware_model, role
|
||||
SELECT short_name, long_name, hardware_model, role
|
||||
FROM client_details
|
||||
WHERE node_id = %s;
|
||||
""", (node_id_str,))
|
||||
result = cur.fetchone()
|
||||
""", (client_details.node_id,))
|
||||
existing_record = cur.fetchone()
|
||||
|
||||
if not result:
|
||||
# If the client is not found, insert a new record
|
||||
if existing_record:
|
||||
# If record exists, update only the fields that are provided in the new data
|
||||
update_fields = []
|
||||
update_values = []
|
||||
if user.short_name:
|
||||
update_fields.append("short_name = %s")
|
||||
update_values.append(user.short_name)
|
||||
if user.long_name:
|
||||
update_fields.append("long_name = %s")
|
||||
update_values.append(user.long_name)
|
||||
if user.hw_model != HardwareModel.UNSET:
|
||||
update_fields.append("hardware_model = %s")
|
||||
update_values.append(ClientDetails.get_hardware_model_name_from_code(user.hw_model))
|
||||
if user.role is not None:
|
||||
update_fields.append("role = %s")
|
||||
update_values.append(ClientDetails.get_role_name_from_role(user.role))
|
||||
|
||||
if update_fields:
|
||||
update_query = f"""
|
||||
UPDATE client_details
|
||||
SET {", ".join(update_fields)}
|
||||
WHERE node_id = %s
|
||||
"""
|
||||
cur.execute(update_query, update_values + [client_details.node_id])
|
||||
else:
|
||||
# If record doesn't exist, insert a new one
|
||||
cur.execute("""
|
||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
RETURNING node_id, short_name, long_name, hardware_model, role;
|
||||
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
|
||||
conn.commit()
|
||||
result = cur.fetchone()
|
||||
""", (client_details.node_id, user.short_name, user.long_name,
|
||||
ClientDetails.get_hardware_model_name_from_code(user.hw_model),
|
||||
ClientDetails.get_role_name_from_role(user.role)))
|
||||
|
||||
# At this point, we should always have a result, either from SELECT or INSERT
|
||||
return ClientDetails(
|
||||
node_id=result[0],
|
||||
short_name=result[1],
|
||||
long_name=result[2],
|
||||
hardware_model=result[3],
|
||||
role=result[4]
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
self.execute_db_operation(db_operation)
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
|
||||
class RoutingAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ROUTING_APP packet")
|
||||
routing = Routing()
|
||||
routing.ParseFromString(payload)
|
||||
self.metrics.route_discovery_response_counter.labels(
|
||||
**client_details.to_dict(),
|
||||
response_type=self.get_error_name_from_routing(routing.error_reason)
|
||||
).inc()
|
||||
|
||||
@staticmethod
|
||||
def get_error_name_from_routing(error_code):
|
||||
for name, value in Routing.Error.__dict__.items():
|
||||
if isinstance(value, int) and value == error_code:
|
||||
return name
|
||||
return 'UNKNOWN_ERROR'
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ADMIN_APP)
|
||||
class AdminAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ADMIN_APP packet")
|
||||
admin_message = AdminMessage()
|
||||
admin_message.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP)
|
||||
class TextMessageCompressedAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet")
|
||||
decompressed_payload = unishox2.decompress(payload, len(payload))
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP)
|
||||
class WaypointAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received WAYPOINT_APP packet")
|
||||
waypoint = Waypoint()
|
||||
waypoint.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.AUDIO_APP)
|
||||
class AudioAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received AUDIO_APP packet")
|
||||
pass # NOTE: Audio packet. should probably be processed
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP)
|
||||
class DetectionSensorAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received DETECTION_SENSOR_APP packet")
|
||||
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.REPLY_APP)
|
||||
class ReplyAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received REPLY_APP packet")
|
||||
pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP)
|
||||
class IpTunnelAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received IP_TUNNEL_APP packet")
|
||||
pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP)
|
||||
class PaxCounterAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received PAXCOUNTER_APP packet")
|
||||
paxcounter = Paxcount()
|
||||
paxcounter.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
|
||||
class SerialAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received SERIAL_APP packet")
|
||||
pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP)
|
||||
class StoreForwardAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received STORE_FORWARD_APP packet")
|
||||
store_and_forward = StoreAndForward()
|
||||
store_and_forward.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP)
|
||||
class RangeTestAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received RANGE_TEST_APP packet")
|
||||
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP)
|
||||
class TelemetryAppProcessor(Processor):
|
||||
def __init__(self, registry: CollectorRegistry, db_connection: psycopg.connection):
|
||||
super().__init__(registry, db_connection)
|
||||
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TELEMETRY_APP packet")
|
||||
telemetry = Telemetry()
|
||||
telemetry.ParseFromString(payload)
|
||||
|
||||
if telemetry.HasField('device_metrics'):
|
||||
device_metrics: DeviceMetrics = telemetry.device_metrics
|
||||
self.metrics.battery_level_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'battery_level', 0))
|
||||
|
||||
self.metrics.voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'voltage', 0))
|
||||
|
||||
self.metrics.channel_utilization_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'channel_utilization', 0))
|
||||
|
||||
self.metrics.air_util_tx_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'air_util_tx', 0))
|
||||
|
||||
self.metrics.uptime_seconds_counter.labels(
|
||||
**client_details.to_dict()
|
||||
).inc(getattr(device_metrics, 'uptime_seconds', 0))
|
||||
|
||||
if telemetry.HasField('environment_metrics'):
|
||||
environment_metrics: EnvironmentMetrics = telemetry.environment_metrics
|
||||
self.metrics.temperature_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'temperature', 0))
|
||||
|
||||
self.metrics.relative_humidity_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'relative_humidity', 0))
|
||||
|
||||
self.metrics.barometric_pressure_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'barometric_pressure', 0))
|
||||
|
||||
self.metrics.gas_resistance_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'gas_resistance', 0))
|
||||
|
||||
self.metrics.iaq_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'iaq', 0))
|
||||
|
||||
self.metrics.distance_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'distance', 0))
|
||||
|
||||
self.metrics.lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'lux', 0))
|
||||
|
||||
self.metrics.white_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'white_lux', 0))
|
||||
|
||||
self.metrics.ir_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'ir_lux', 0))
|
||||
|
||||
self.metrics.uv_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'uv_lux', 0))
|
||||
|
||||
self.metrics.wind_direction_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'wind_direction', 0))
|
||||
|
||||
self.metrics.wind_speed_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'wind_speed', 0))
|
||||
|
||||
self.metrics.weight_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'weight', 0))
|
||||
|
||||
if telemetry.HasField('air_quality_metrics'):
|
||||
air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics
|
||||
self.metrics.pm10_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm10_standard', 0))
|
||||
|
||||
self.metrics.pm25_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm25_standard', 0))
|
||||
|
||||
self.metrics.pm100_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm100_standard', 0))
|
||||
|
||||
self.metrics.pm10_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm10_environmental', 0))
|
||||
|
||||
self.metrics.pm25_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm25_environmental', 0))
|
||||
|
||||
self.metrics.pm100_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm100_environmental', 0))
|
||||
|
||||
self.metrics.particles_03um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_03um', 0))
|
||||
|
||||
self.metrics.particles_05um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_05um', 0))
|
||||
|
||||
self.metrics.particles_10um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_10um', 0))
|
||||
|
||||
self.metrics.particles_25um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_25um', 0))
|
||||
|
||||
self.metrics.particles_50um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_50um', 0))
|
||||
|
||||
self.metrics.particles_100um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_100um', 0))
|
||||
|
||||
if telemetry.HasField('power_metrics'):
|
||||
power_metrics: PowerMetrics = telemetry.power_metrics
|
||||
self.metrics.ch1_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch1_voltage', 0))
|
||||
|
||||
self.metrics.ch1_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch1_current', 0))
|
||||
|
||||
self.metrics.ch2_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch2_voltage', 0))
|
||||
|
||||
self.metrics.ch2_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch2_current', 0))
|
||||
|
||||
self.metrics.ch3_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch3_voltage', 0))
|
||||
|
||||
self.metrics.ch3_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch3_current', 0))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ZPS_APP)
|
||||
class ZpsAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ZPS_APP packet")
|
||||
pass # NOTE: Experimental tools for estimating node position without a GPS
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP)
|
||||
class SimulatorAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received SIMULATOR_APP packet")
|
||||
pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP)
|
||||
class TraceRouteAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TRACEROUTE_APP packet")
|
||||
traceroute = RouteDiscovery()
|
||||
traceroute.ParseFromString(payload)
|
||||
if traceroute.route:
|
||||
route = traceroute.route
|
||||
self.metrics.route_discovery_counter.labels(
|
||||
**client_details.to_dict()
|
||||
).inc(len(route))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP)
|
||||
class NeighborInfoAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NEIGHBORINFO_APP packet")
|
||||
neighbor_info = NeighborInfo()
|
||||
neighbor_info.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
|
||||
class AtakPluginProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ATAK_PLUGIN packet")
|
||||
pass # NOTE: ATAK Plugin
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP)
|
||||
class MapReportAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received MAP_REPORT_APP packet")
|
||||
map_report = MapReport()
|
||||
map_report.ParseFromString(payload)
|
||||
pass # Nothing interesting here
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP)
|
||||
class PrivateAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received PRIVATE_APP packet")
|
||||
pass # NOTE: Private application portnum
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER)
|
||||
class AtakForwarderProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ATAK_FORWARDER packet")
|
||||
pass # NOTE: ATAK Forwarder
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.MAX)
|
||||
class MaxProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received MAX packet")
|
||||
pass # NOTE: Maximum portnum value
|
||||
|
|
|
@ -1,20 +1,4 @@
|
|||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from venv import logger
|
||||
|
||||
import psycopg
|
||||
import unishox2
|
||||
from meshtastic.admin_pb2 import AdminMessage
|
||||
from meshtastic.config_pb2 import Config
|
||||
from meshtastic.mesh_pb2 import Position, User, Routing, Waypoint, RouteDiscovery, NeighborInfo, HardwareModel
|
||||
from meshtastic.mqtt_pb2 import MapReport
|
||||
from meshtastic.paxcount_pb2 import Paxcount
|
||||
from meshtastic.portnums_pb2 import PortNum
|
||||
from meshtastic.remote_hardware_pb2 import HardwareMessage
|
||||
from meshtastic.storeforward_pb2 import StoreAndForward
|
||||
from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics
|
||||
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
|
||||
from psycopg_pool import ConnectionPool
|
||||
|
||||
|
||||
class _Metrics:
|
||||
|
@ -37,10 +21,7 @@ class _Metrics:
|
|||
'node_id', 'short_name', 'long_name', 'hardware_model', 'role'
|
||||
]
|
||||
|
||||
def _init_metrics(self): # TODO: Go over the metrics and rethink some of them to be more like the longtitute and
|
||||
# latitude - The values should represent something and we shouldn't just label stuff. Also, the labels should
|
||||
# be less used looked upon like keys for the data
|
||||
# Histogram for the length of messages
|
||||
def _init_metrics(self):
|
||||
self._init_metrics_text_message()
|
||||
self._init_metrics_telemetry_device()
|
||||
self._init_metrics_telemetry_environment()
|
||||
|
@ -357,510 +338,3 @@ class _Metrics:
|
|||
)
|
||||
|
||||
|
||||
def get_hardware_model_name_from_code(hardware_model):
|
||||
descriptor = HardwareModel.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == hardware_model:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_HARDWARE_MODEL'
|
||||
|
||||
|
||||
def get_role_name_from_role(role):
|
||||
descriptor = Config.DeviceConfig.Role.DESCRIPTOR
|
||||
for enum_value in descriptor.values:
|
||||
if enum_value.number == role:
|
||||
return enum_value.name
|
||||
return 'UNKNOWN_ROLE'
|
||||
|
||||
|
||||
class ClientDetails:
|
||||
def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET,
|
||||
role=None):
|
||||
self.node_id = node_id
|
||||
self.short_name = short_name
|
||||
self.long_name = long_name
|
||||
self.hardware_model: HardwareModel = hardware_model
|
||||
self.role: Config.DeviceConfig.Role = role
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'node_id': self.node_id,
|
||||
'short_name': self.short_name,
|
||||
'long_name': self.long_name,
|
||||
'hardware_model': get_hardware_model_name_from_code(self.hardware_model),
|
||||
'role': get_role_name_from_role(self.role)
|
||||
}
|
||||
|
||||
|
||||
class Processor(ABC):
|
||||
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
|
||||
self.db_pool = db_pool
|
||||
self.metrics = _Metrics(registry)
|
||||
|
||||
@abstractmethod
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
pass
|
||||
|
||||
def execute_db_operation(self, operation):
|
||||
with self.db_pool.connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
return operation(cur, conn)
|
||||
|
||||
|
||||
class ProcessorRegistry:
|
||||
_registry = {}
|
||||
|
||||
@classmethod
|
||||
def register_processor(cls, port_num):
|
||||
def inner_wrapper(wrapped_class):
|
||||
cls._registry[port_num] = wrapped_class
|
||||
return wrapped_class
|
||||
|
||||
return inner_wrapper
|
||||
|
||||
@classmethod
|
||||
def get_processor(cls, port_num) -> type(Processor):
|
||||
return cls._registry.get(port_num, UnknownAppProcessor)
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
|
||||
class UnknownAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received UNKNOWN_APP packet")
|
||||
return None
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP)
|
||||
class TextMessageAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TEXT_MESSAGE_APP packet")
|
||||
message = payload.decode('utf-8')
|
||||
if os.getenv('HIDE_MESSAGE', 'true') == 'true':
|
||||
message = 'Hidden'
|
||||
self.metrics.message_length_histogram.labels(
|
||||
**client_details.to_dict()
|
||||
).observe(len(message))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
|
||||
class RemoteHardwareAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received REMOTE_HARDWARE_APP packet")
|
||||
hardware_message = HardwareMessage()
|
||||
hardware_message.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.POSITION_APP)
|
||||
class PositionAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received POSITION_APP packet")
|
||||
position = Position()
|
||||
position.ParseFromString(payload)
|
||||
self.metrics.device_latitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.latitude_i)
|
||||
self.metrics.device_longitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.longitude_i)
|
||||
self.metrics.device_altitude_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.altitude)
|
||||
self.metrics.device_position_precision_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(position.precision_bits)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP)
|
||||
class NodeInfoAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NODEINFO_APP packet")
|
||||
user = User()
|
||||
user.ParseFromString(payload)
|
||||
|
||||
def db_operation(cur, conn):
|
||||
# First, try to select the existing record
|
||||
cur.execute("""
|
||||
SELECT short_name, long_name, hardware_model, role
|
||||
FROM client_details
|
||||
WHERE node_id = %s;
|
||||
""", (client_details.node_id,))
|
||||
existing_record = cur.fetchone()
|
||||
|
||||
if existing_record:
|
||||
# If record exists, update only the fields that are provided in the new data
|
||||
update_fields = []
|
||||
update_values = []
|
||||
if user.short_name:
|
||||
update_fields.append("short_name = %s")
|
||||
update_values.append(user.short_name)
|
||||
if user.long_name:
|
||||
update_fields.append("long_name = %s")
|
||||
update_values.append(user.long_name)
|
||||
if user.hw_model != HardwareModel.UNSET:
|
||||
update_fields.append("hardware_model = %s")
|
||||
update_values.append(get_hardware_model_name_from_code(user.hw_model))
|
||||
if user.role is not None:
|
||||
update_fields.append("role = %s")
|
||||
update_values.append(get_role_name_from_role(user.role))
|
||||
|
||||
if update_fields:
|
||||
update_query = f"""
|
||||
UPDATE client_details
|
||||
SET {", ".join(update_fields)}
|
||||
WHERE node_id = %s
|
||||
"""
|
||||
cur.execute(update_query, update_values + [client_details.node_id])
|
||||
else:
|
||||
# If record doesn't exist, insert a new one
|
||||
cur.execute("""
|
||||
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (client_details.node_id, user.short_name, user.long_name,
|
||||
get_hardware_model_name_from_code(user.hw_model), get_role_name_from_role(user.role)))
|
||||
|
||||
conn.commit()
|
||||
|
||||
self.execute_db_operation(db_operation)
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
|
||||
class RoutingAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ROUTING_APP packet")
|
||||
routing = Routing()
|
||||
routing.ParseFromString(payload)
|
||||
self.metrics.route_discovery_response_counter.labels(
|
||||
**client_details.to_dict(),
|
||||
response_type=self.get_error_name_from_routing(routing.error_reason)
|
||||
).inc()
|
||||
|
||||
@staticmethod
|
||||
def get_error_name_from_routing(error_code):
|
||||
for name, value in Routing.Error.__dict__.items():
|
||||
if isinstance(value, int) and value == error_code:
|
||||
return name
|
||||
return 'UNKNOWN_ERROR'
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ADMIN_APP)
|
||||
class AdminAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ADMIN_APP packet")
|
||||
admin_message = AdminMessage()
|
||||
admin_message.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP)
|
||||
class TextMessageCompressedAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet")
|
||||
decompressed_payload = unishox2.decompress(payload, len(payload))
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP)
|
||||
class WaypointAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received WAYPOINT_APP packet")
|
||||
waypoint = Waypoint()
|
||||
waypoint.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.AUDIO_APP)
|
||||
class AudioAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received AUDIO_APP packet")
|
||||
pass # NOTE: Audio packet. should probably be processed
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP)
|
||||
class DetectionSensorAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received DETECTION_SENSOR_APP packet")
|
||||
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.REPLY_APP)
|
||||
class ReplyAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received REPLY_APP packet")
|
||||
pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP)
|
||||
class IpTunnelAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received IP_TUNNEL_APP packet")
|
||||
pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP)
|
||||
class PaxCounterAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received PAXCOUNTER_APP packet")
|
||||
paxcounter = Paxcount()
|
||||
paxcounter.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
|
||||
class SerialAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received SERIAL_APP packet")
|
||||
pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP)
|
||||
class StoreForwardAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received STORE_FORWARD_APP packet")
|
||||
store_and_forward = StoreAndForward()
|
||||
store_and_forward.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP)
|
||||
class RangeTestAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received RANGE_TEST_APP packet")
|
||||
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP)
|
||||
class TelemetryAppProcessor(Processor):
|
||||
def __init__(self, registry: CollectorRegistry, db_connection: psycopg.connection):
|
||||
super().__init__(registry, db_connection)
|
||||
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TELEMETRY_APP packet")
|
||||
telemetry = Telemetry()
|
||||
telemetry.ParseFromString(payload)
|
||||
|
||||
if telemetry.HasField('device_metrics'):
|
||||
device_metrics: DeviceMetrics = telemetry.device_metrics
|
||||
self.metrics.battery_level_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'battery_level', 0))
|
||||
|
||||
self.metrics.voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'voltage', 0))
|
||||
|
||||
self.metrics.channel_utilization_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'channel_utilization', 0))
|
||||
|
||||
self.metrics.air_util_tx_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(device_metrics, 'air_util_tx', 0))
|
||||
|
||||
self.metrics.uptime_seconds_counter.labels(
|
||||
**client_details.to_dict()
|
||||
).inc(getattr(device_metrics, 'uptime_seconds', 0))
|
||||
|
||||
if telemetry.HasField('environment_metrics'):
|
||||
environment_metrics: EnvironmentMetrics = telemetry.environment_metrics
|
||||
self.metrics.temperature_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'temperature', 0))
|
||||
|
||||
self.metrics.relative_humidity_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'relative_humidity', 0))
|
||||
|
||||
self.metrics.barometric_pressure_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'barometric_pressure', 0))
|
||||
|
||||
self.metrics.gas_resistance_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'gas_resistance', 0))
|
||||
|
||||
self.metrics.iaq_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'iaq', 0))
|
||||
|
||||
self.metrics.distance_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'distance', 0))
|
||||
|
||||
self.metrics.lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'lux', 0))
|
||||
|
||||
self.metrics.white_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'white_lux', 0))
|
||||
|
||||
self.metrics.ir_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'ir_lux', 0))
|
||||
|
||||
self.metrics.uv_lux_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'uv_lux', 0))
|
||||
|
||||
self.metrics.wind_direction_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'wind_direction', 0))
|
||||
|
||||
self.metrics.wind_speed_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'wind_speed', 0))
|
||||
|
||||
self.metrics.weight_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(environment_metrics, 'weight', 0))
|
||||
|
||||
if telemetry.HasField('air_quality_metrics'):
|
||||
air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics
|
||||
self.metrics.pm10_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm10_standard', 0))
|
||||
|
||||
self.metrics.pm25_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm25_standard', 0))
|
||||
|
||||
self.metrics.pm100_standard_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm100_standard', 0))
|
||||
|
||||
self.metrics.pm10_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm10_environmental', 0))
|
||||
|
||||
self.metrics.pm25_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm25_environmental', 0))
|
||||
|
||||
self.metrics.pm100_environmental_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'pm100_environmental', 0))
|
||||
|
||||
self.metrics.particles_03um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_03um', 0))
|
||||
|
||||
self.metrics.particles_05um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_05um', 0))
|
||||
|
||||
self.metrics.particles_10um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_10um', 0))
|
||||
|
||||
self.metrics.particles_25um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_25um', 0))
|
||||
|
||||
self.metrics.particles_50um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_50um', 0))
|
||||
|
||||
self.metrics.particles_100um_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(air_quality_metrics, 'particles_100um', 0))
|
||||
|
||||
if telemetry.HasField('power_metrics'):
|
||||
power_metrics: PowerMetrics = telemetry.power_metrics
|
||||
self.metrics.ch1_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch1_voltage', 0))
|
||||
|
||||
self.metrics.ch1_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch1_current', 0))
|
||||
|
||||
self.metrics.ch2_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch2_voltage', 0))
|
||||
|
||||
self.metrics.ch2_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch2_current', 0))
|
||||
|
||||
self.metrics.ch3_voltage_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch3_voltage', 0))
|
||||
|
||||
self.metrics.ch3_current_gauge.labels(
|
||||
**client_details.to_dict()
|
||||
).set(getattr(power_metrics, 'ch3_current', 0))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ZPS_APP)
|
||||
class ZpsAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ZPS_APP packet")
|
||||
pass # NOTE: Experimental tools for estimating node position without a GPS
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP)
|
||||
class SimulatorAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received SIMULATOR_APP packet")
|
||||
pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip.
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP)
|
||||
class TraceRouteAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received TRACEROUTE_APP packet")
|
||||
traceroute = RouteDiscovery()
|
||||
traceroute.ParseFromString(payload)
|
||||
if traceroute.route:
|
||||
route = traceroute.route
|
||||
self.metrics.route_discovery_counter.labels(
|
||||
**client_details.to_dict()
|
||||
).inc(len(route))
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP)
|
||||
class NeighborInfoAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received NEIGHBORINFO_APP packet")
|
||||
neighbor_info = NeighborInfo()
|
||||
neighbor_info.ParseFromString(payload)
|
||||
pass
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
|
||||
class AtakPluginProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ATAK_PLUGIN packet")
|
||||
pass # NOTE: ATAK Plugin
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP)
|
||||
class MapReportAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received MAP_REPORT_APP packet")
|
||||
map_report = MapReport()
|
||||
map_report.ParseFromString(payload)
|
||||
pass # Nothing interesting here
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP)
|
||||
class PrivateAppProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received PRIVATE_APP packet")
|
||||
pass # NOTE: Private application portnum
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER)
|
||||
class AtakForwarderProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received ATAK_FORWARDER packet")
|
||||
pass # NOTE: ATAK Forwarder
|
||||
|
||||
|
||||
@ProcessorRegistry.register_processor(PortNum.MAX)
|
||||
class MaxProcessor(Processor):
|
||||
def process(self, payload: bytes, client_details: ClientDetails):
|
||||
logger.debug("Received MAX packet")
|
||||
pass # NOTE: Maximum portnum value
|
||||
|
|
2
main.py
2
main.py
|
@ -10,7 +10,7 @@ from paho.mqtt.enums import CallbackAPIVersion
|
|||
from prometheus_client import CollectorRegistry, start_http_server
|
||||
from psycopg_pool import ConnectionPool
|
||||
|
||||
from exporter.processors import MessageProcessor
|
||||
from exporter.processor_base import MessageProcessor
|
||||
|
||||
# Global connection pool
|
||||
connection_pool = None
|
||||
|
|
Loading…
Reference in a new issue