Bare bones are ready

This commit is contained in:
Gleb Tcivie 2024-06-23 22:15:31 +03:00
commit 07ddcdf5d1
12 changed files with 415 additions and 0 deletions

0
.env Normal file
View file

8
.idea/.gitignore vendored Normal file
View file

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View file

@ -0,0 +1,29 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<Languages>
<language minSize="141" name="Python" />
</Languages>
</inspection_tool>
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="3">
<item index="0" class="java.lang.String" itemvalue="torch" />
<item index="1" class="java.lang.String" itemvalue="scipy" />
<item index="2" class="java.lang.String" itemvalue="numpy" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
</list>
</option>
</inspection_tool>
</profile>
</component>

View file

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View file

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

7
.idea/misc.xml Normal file
View file

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (meshtastic-metrics-exporter)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (meshtastic-metrics-exporter)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/meshtastic-metrics-exporter.iml" filepath="$PROJECT_DIR$/.idea/meshtastic-metrics-exporter.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

1
exporter/__init__.py Normal file
View file

@ -0,0 +1 @@
from .processors import MessageProcessor

16
exporter/processors.py Normal file
View file

@ -0,0 +1,16 @@
from meshtastic.mesh_pb2 import MeshPacket
from prometheus_client import CollectorRegistry
from exporter.registry import ProcessorRegistry
class MessageProcessor:
def __init__(self, registry: CollectorRegistry):
self.registry = registry
def process(self, mesh_packet: MeshPacket):
port_num = mesh_packet.decoded.portnum
payload = mesh_packet.decoded.payload
processor = ProcessorRegistry.get_processor(port_num)(self.registry)
processor.process(payload)

252
exporter/registry.py Normal file
View file

@ -0,0 +1,252 @@
from abc import ABC, abstractmethod
from venv import logger
import unishox2
from meshtastic.admin_pb2 import AdminMessage
from meshtastic.mesh_pb2 import Position, User, Routing, Waypoint, RouteDiscovery, NeighborInfo
from meshtastic.mqtt_pb2 import MapReport
from meshtastic.paxcount_pb2 import Paxcount
from meshtastic.portnums_pb2 import PortNum
from meshtastic.remote_hardware_pb2 import HardwareMessage
from meshtastic.storeforward_pb2 import StoreAndForward
from meshtastic.telemetry_pb2 import Telemetry
from prometheus_client import CollectorRegistry
class Processor(ABC):
def __init__(self, registry: CollectorRegistry):
self.registry = registry
@abstractmethod
def process(self, payload):
pass
class ProcessorRegistry:
_registry = {}
@classmethod
def register_processor(cls, portnum):
def inner_wrapper(wrapped_class):
cls._registry[portnum] = wrapped_class()
return wrapped_class
return inner_wrapper
@classmethod
def get_processor(cls, portnum):
return cls._registry.get(portnum, UnknownAppProcessor())
@ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
class UnknownAppProcessor(Processor):
def process(self, payload):
logger.debug("Received UNKNOWN_APP packet")
return None
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP)
class TextMessageAppProcessor(Processor):
def process(self, payload):
logger.debug("Received TEXT_MESSAGE_APP packet")
pass
@ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
class RemoteHardwareAppProcessor(Processor):
def process(self, payload):
logger.debug("Received REMOTE_HARDWARE_APP packet")
hardware_message = HardwareMessage()
hardware_message.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.POSITION_APP)
class PositionAppProcessor(Processor):
def process(self, payload):
logger.debug("Received POSITION_APP packet")
position = Position()
position.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.NODEINFO_APP)
class NodeInfoAppProcessor(Processor):
def process(self, payload):
logger.debug("Received NODEINFO_APP packet")
user = User()
user.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
class RoutingAppProcessor(Processor):
def process(self, payload):
logger.debug("Received ROUTING_APP packet")
routing = Routing()
routing.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.ADMIN_APP)
class AdminAppProcessor(Processor):
def process(self, payload):
logger.debug("Received ADMIN_APP packet")
admin_message = AdminMessage()
admin_message.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_COMPRESSED_APP)
class TextMessageCompressedAppProcessor(Processor):
def process(self, payload):
logger.debug("Received TEXT_MESSAGE_COMPRESSED_APP packet")
decompressed_payload = unishox2.decompress(payload, len(payload))
pass
@ProcessorRegistry.register_processor(PortNum.WAYPOINT_APP)
class WaypointAppProcessor(Processor):
def process(self, payload):
logger.debug("Received WAYPOINT_APP packet")
waypoint = Waypoint()
waypoint.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.AUDIO_APP)
class AudioAppProcessor(Processor):
def process(self, payload):
logger.debug("Received AUDIO_APP packet")
pass # NOTE: Audio packet. should probably be processed
@ProcessorRegistry.register_processor(PortNum.DETECTION_SENSOR_APP)
class DetectionSensorAppProcessor(Processor):
def process(self, payload):
logger.debug("Received DETECTION_SENSOR_APP packet")
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
@ProcessorRegistry.register_processor(PortNum.REPLY_APP)
class ReplyAppProcessor(Processor):
def process(self, payload):
logger.debug("Received REPLY_APP packet")
pass # NOTE: Provides a 'ping' service that replies to any packet it receives. This is useful for testing.
@ProcessorRegistry.register_processor(PortNum.IP_TUNNEL_APP)
class IpTunnelAppProcessor(Processor):
def process(self, payload):
logger.debug("Received IP_TUNNEL_APP packet")
pass # NOTE: IP Packet. Handled by the python API, firmware ignores this one and passes it on.
@ProcessorRegistry.register_processor(PortNum.PAXCOUNTER_APP)
class PaxCounterAppProcessor(Processor):
def process(self, payload):
logger.debug("Received PAXCOUNTER_APP packet")
paxcounter = Paxcount()
paxcounter.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
class SerialAppProcessor(Processor):
def process(self, payload):
logger.debug("Received SERIAL_APP packet")
pass # NOTE: Provides a hardware serial interface to send and receive from the Meshtastic network.
@ProcessorRegistry.register_processor(PortNum.STORE_FORWARD_APP)
class StoreForwardAppProcessor(Processor):
def process(self, payload):
logger.debug("Received STORE_FORWARD_APP packet")
store_and_forward = StoreAndForward()
store_and_forward.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.RANGE_TEST_APP)
class RangeTestAppProcessor(Processor):
def process(self, payload):
logger.debug("Received RANGE_TEST_APP packet")
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP)
class TelemetryAppProcessor(Processor):
def process(self, payload):
logger.debug("Received TELEMETRY_APP packet")
telemetry = Telemetry()
telemetry.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.ZPS_APP)
class ZpsAppProcessor(Processor):
def process(self, payload):
logger.debug("Received ZPS_APP packet")
pass # NOTE: Experimental tools for estimating node position without a GPS
@ProcessorRegistry.register_processor(PortNum.SIMULATOR_APP)
class SimulatorAppProcessor(Processor):
def process(self, payload):
logger.debug("Received SIMULATOR_APP packet")
pass # NOTE: Used to let multiple instances of Linux native applications communicate as if they did using their LoRa chip.
@ProcessorRegistry.register_processor(PortNum.TRACEROUTE_APP)
class TraceRouteAppProcessor(Processor):
def process(self, payload):
logger.debug("Received TRACEROUTE_APP packet")
traceroute = RouteDiscovery()
traceroute.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.NEIGHBORINFO_APP)
class NeighborInfoAppProcessor(Processor):
def process(self, payload):
logger.debug("Received NEIGHBORINFO_APP packet")
neighbor_info = NeighborInfo()
neighbor_info.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
class AtakPluginProcessor(Processor):
def process(self, payload):
logger.debug("Received ATAK_PLUGIN packet")
pass # NOTE: ATAK Plugin
@ProcessorRegistry.register_processor(PortNum.MAP_REPORT_APP)
class MapReportAppProcessor(Processor):
def process(self, payload):
logger.debug("Received MAP_REPORT_APP packet")
map_report = MapReport()
map_report.ParseFromString(payload)
pass
@ProcessorRegistry.register_processor(PortNum.PRIVATE_APP)
class PrivateAppProcessor(Processor):
def process(self, payload):
logger.debug("Received PRIVATE_APP packet")
pass # NOTE: Private application portnum
@ProcessorRegistry.register_processor(PortNum.ATAK_FORWARDER)
class AtakForwarderProcessor(Processor):
def process(self, payload):
logger.debug("Received ATAK_FORWARDER packet")
pass # NOTE: ATAK Forwarder
@ProcessorRegistry.register_processor(PortNum.MAX)
class MaxProcessor(Processor):
def process(self, payload):
logger.debug("Received MAX packet")
pass # NOTE: Maximum portnum value

72
main.py Normal file
View file

@ -0,0 +1,72 @@
import logging
import os
import paho.mqtt.client as mqtt
import redis
from dotenv import load_dotenv
from meshtastic.mesh_pb2 import MeshPacket
from meshtastic.mqtt_pb2 import ServiceEnvelope
from prometheus_client import push_to_gateway, CollectorRegistry
from exporter.processors import MessageProcessor
def handle_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}")
client.subscribe(os.getenv('mqtt_topic', 'msh/israel/#'))
def handle_message(client, userdata, message):
print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")
envelope = ServiceEnvelope()
envelope.ParseFromString(message.payload)
packet: MeshPacket = envelope.packet
if redis_client.set(str(packet.id), 1, nx=True, ex=os.getenv('redis_expiration', 60), get=True) is not None:
logging.debug(f"Packet {packet.id} already processed")
return
# Process the packet
processor.process(packet)
if __name__ == "__main__":
load_dotenv()
# Create Redis client
redis_client = redis.Redis(
host=os.getenv('redis_host'),
port=int(os.getenv('redis_port')),
db=int(os.getenv('redis_db', 0)),
password=os.getenv('redis_password', None),
)
# Configure Prometheus exporter
registry = CollectorRegistry()
push_to_gateway(
os.getenv('prometheus_pushgateway'),
job=os.getenv('prometheus_job'),
registry=registry,
)
# Create an MQTT client
mqtt_client = mqtt.Client()
mqtt_client.on_connect = handle_connect
mqtt_client.on_message = handle_message
if bool(os.getenv('mqtt_is_tls', False)):
tls_context = mqtt.ssl.create_default_context()
mqtt_client.tls_set_context(tls_context)
if os.getenv('mqtt_username', None) and os.getenv('mqtt_password', None):
mqtt_client.username_pw_set(os.getenv('mqtt_username'), os.getenv('mqtt_password'))
mqtt_client.connect(
os.getenv('mqtt_host'),
int(os.getenv('mqtt_port')),
keepalive=int(os.getenv('mqtt_keepalive', 60)),
)
# Configure the Processor and the Exporter
processor = MessageProcessor(registry)
mqtt_client.loop_forever()