Refactored the code + added support for node_configuration_metrics

This commit is contained in:
Gleb Tcivie 2024-08-09 13:20:31 +03:00
parent 36ce36287e
commit 1c6eb81889
10 changed files with 368 additions and 15 deletions

3
.env
View file

@ -35,3 +35,6 @@ MQTT_SERVER_KEY=1PG7OiApB1nwvP+rz05pAQ==
# Message types to filter (default: none) (comma separated) (eg. TEXT_MESSAGE_APP,POSITION_APP)
# Full list can be found here: https://buf.build/meshtastic/protobufs/docs/main:meshtastic#meshtastic.PortNum
EXPORTER_MESSAGE_TYPES_TO_FILTER=TEXT_MESSAGE_APP
# Enable node configurations report (default: true)
REPORT_NODE_CONFIGURATIONS=true

View file

@ -1,5 +1,3 @@
CREATE EXTENSION IF NOT EXISTS moddatetime;
CREATE TABLE IF NOT EXISTS messages
(
id TEXT PRIMARY KEY,
@ -53,8 +51,105 @@ CREATE TABLE IF NOT EXISTS node_neighbors
CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id);
CREATE OR REPLACE TRIGGER client_details_updated_at
CREATE OR REPLACE TRIGGER node_details_updated_at
BEFORE UPDATE
ON node_details
FOR EACH ROW
EXECUTE PROCEDURE moddatetime(updated_at);
-- Per-node reporting configuration inferred from observed packet cadence.
-- One row per node, upserted by the exporter via ON CONFLICT (node_id);
-- each *_interval is the wall-clock gap between the last two packets of
-- that kind, and *_last_timestamp is when the latest one arrived.
CREATE TABLE IF NOT EXISTS node_configurations
(
    node_id                           VARCHAR PRIMARY KEY,
    last_updated                      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Configuration (Telemetry)
    environment_update_interval       INTERVAL DEFAULT '0 seconds' NOT NULL,
    environment_update_last_timestamp TIMESTAMP DEFAULT NOW(),
    device_update_interval            INTERVAL DEFAULT '0 seconds' NOT NULL,
    device_update_last_timestamp      TIMESTAMP DEFAULT NOW(),
    air_quality_update_interval       INTERVAL DEFAULT '0 seconds' NOT NULL,
    air_quality_update_last_timestamp TIMESTAMP DEFAULT NOW(),
    power_update_interval             INTERVAL DEFAULT '0 seconds' NOT NULL,
    power_update_last_timestamp       TIMESTAMP DEFAULT NOW(),
    -- Configuration (Range Test)
    range_test_interval               INTERVAL DEFAULT '0 seconds' NOT NULL,
    range_test_packets_total          INT DEFAULT 0, -- in packets
    range_test_first_packet_timestamp TIMESTAMP DEFAULT NOW(),
    range_test_last_packet_timestamp  TIMESTAMP DEFAULT NOW(),
    -- Configuration (PAX Counter)
    pax_counter_interval              INTERVAL DEFAULT '0 seconds' NOT NULL,
    pax_counter_last_timestamp        TIMESTAMP DEFAULT NOW(),
    -- Configuration (Neighbor Info)
    neighbor_info_interval            INTERVAL DEFAULT '0 seconds' NOT NULL,
    neighbor_info_last_timestamp      TIMESTAMP DEFAULT NOW(),
    -- Configuration (MQTT)
    mqtt_encryption_enabled           BOOLEAN DEFAULT FALSE,
    mqtt_json_enabled                 BOOLEAN DEFAULT FALSE,
    mqtt_configured_root_topic        TEXT DEFAULT '',
    mqtt_info_last_timestamp          TIMESTAMP DEFAULT NOW(),
    -- Configuration (Map)
    map_broadcast_interval            INTERVAL DEFAULT '0 seconds' NOT NULL,
    map_broadcast_last_timestamp      TIMESTAMP DEFAULT NOW()
    -- FOREIGN KEY (node_id) REFERENCES node_details (node_id)
    -- NOTE(review): dropped the redundant UNIQUE (node_id) constraint —
    -- PRIMARY KEY already enforces uniqueness and is a valid arbiter
    -- for the exporter's ON CONFLICT (node_id) upserts.
);
-- -- Function to update old values
-- CREATE OR REPLACE FUNCTION update_old_node_configurations()
-- RETURNS TRIGGER AS $$
-- BEGIN
-- -- Update intervals to 0 if not updated in 24 hours
-- IF NEW.environment_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.environment_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.device_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.device_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.air_quality_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.air_quality_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.power_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.power_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.range_test_last_packet_timestamp < NOW() - INTERVAL '1 hours' THEN
-- NEW.range_test_interval := '0 seconds';
-- NEW.range_test_first_packet_timestamp := 0;
-- NEW.range_test_packets_total := 0;
-- END IF;
--
-- IF NEW.pax_counter_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.pax_counter_interval := '0 seconds';
-- END IF;
--
-- IF NEW.neighbor_info_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.neighbor_info_interval := '0 seconds';
-- END IF;
--
-- IF NEW.map_broadcast_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.map_broadcast_interval := '0 seconds';
-- END IF;
--
-- NEW.last_updated := CURRENT_TIMESTAMP;
--
-- RETURN NEW;
-- END;
-- $$ LANGUAGE plpgsql;
--
-- -- Create the trigger
-- CREATE TRIGGER update_node_configurations_trigger
-- BEFORE UPDATE ON node_configurations
-- FOR EACH ROW
-- EXECUTE FUNCTION update_old_node_configurations();

View file

@ -1 +1 @@
from .processor_base import MessageProcessor
from exporter.processor.processor_base import MessageProcessor

View file

View file

@ -0,0 +1,212 @@
import os
from psycopg_pool import ConnectionPool
from exporter.db_handler import DBHandler
class Singleton(type):
    """Metaclass that caches exactly one instance per class.

    The first call to ``Cls(...)`` builds and stores the instance; every
    later call returns the cached object, ignoring any new arguments.
    """

    _instances = {}

    def __call__(cls, *args, **kwargs):
        try:
            return cls._instances[cls]
        except KeyError:
            # First construction for this class: build, cache, return.
            instance = super().__call__(*args, **kwargs)
            cls._instances[cls] = instance
            return instance
class NodeConfigurationMetrics(metaclass=Singleton):
    """Records per-node reporting cadence in the ``node_configurations`` table.

    Singleton so every processor shares one DB handler.  Each ``*_update``
    method upserts a row keyed by node_id: the first packet of a kind stores
    an interval of '0 seconds'; every later packet stores the wall-clock gap
    since the previous packet of the same kind.
    """

    # Whitelisted column prefixes for the simple (interval, last_timestamp)
    # upserts.  SQL identifiers cannot be bound as query parameters, so the
    # f-string in _upsert_interval must only ever see values from this set.
    _INTERVAL_PREFIXES = frozenset((
        'environment_update',
        'device_update',
        'power_update',
        'air_quality_update',
        'map_broadcast',
        'pax_counter',
        'neighbor_info',
    ))

    def __init__(self, connection_pool: ConnectionPool = None):
        self.db = DBHandler(connection_pool)
        # BUG FIX: os.getenv returns a *string*, and bool('false') is True,
        # so the original `os.getenv('REPORT_NODE_CONFIGURATIONS', True)`
        # made reporting impossible to disable.  Parse the flag explicitly;
        # anything except an explicit "off" spelling enables reporting.
        flag = str(os.getenv('REPORT_NODE_CONFIGURATIONS', 'true'))
        self.report = flag.strip().lower() not in ('false', '0', 'no', 'off')

    def _upsert_interval(self, prefix: str, node_id: str):
        """Shared upsert for every metric tracked as (interval, last_timestamp).

        ``prefix`` selects the ``{prefix}_interval`` / ``{prefix}_last_timestamp``
        column pair and must come from _INTERVAL_PREFIXES (never user input).
        """
        if not self.report:
            return
        if prefix not in self._INTERVAL_PREFIXES:
            raise ValueError(f'Unknown node configuration metric: {prefix!r}')

        def db_operation(cur, conn):
            cur.execute(f"""
                INSERT INTO node_configurations (node_id,
                                                 {prefix}_interval,
                                                 {prefix}_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    {prefix}_interval = NOW() - node_configurations.{prefix}_last_timestamp,
                    {prefix}_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_environment_update(self, node_id: str):
        """Environment telemetry packet seen from ``node_id``."""
        self._upsert_interval('environment_update', node_id)

    def process_device_update(self, node_id: str):
        """Device telemetry packet seen from ``node_id``."""
        self._upsert_interval('device_update', node_id)

    def process_power_update(self, node_id: str):
        """Power telemetry packet seen from ``node_id``."""
        self._upsert_interval('power_update', node_id)

    def map_broadcast_update(self, node_id: str):
        """Map report packet seen from ``node_id``.

        NOTE(review): name kept without the ``process_`` prefix for backward
        compatibility with existing callers (e.g. MapReportAppProcessor).
        """
        self._upsert_interval('map_broadcast', node_id)

    def process_air_quality_update(self, node_id: str):
        """Air-quality telemetry packet seen from ``node_id``."""
        self._upsert_interval('air_quality_update', node_id)

    def process_pax_counter_update(self, node_id: str):
        """Paxcounter packet seen from ``node_id``."""
        self._upsert_interval('pax_counter', node_id)

    def process_neighbor_info_update(self, node_id: str):
        """Neighbor-info packet seen from ``node_id``."""
        self._upsert_interval('neighbor_info', node_id)

    def process_range_test_update(self, node_id: str):
        """Range-test packet: tracks the interval plus a packet count that
        restarts whenever the window since the first packet reaches 1 hour."""
        if not self.report:
            return

        def db_operation(cur, conn):
            # BUG FIX: the original INSERT put NOW() into the INT column
            # range_test_packets_total, which Postgres rejects (no implicit
            # timestamp -> integer cast), so the very first range-test row
            # for a node could never be created.  The first packet counts as 1.
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    range_test_interval,
                    range_test_packets_total,
                    range_test_first_packet_timestamp,
                    range_test_last_packet_timestamp
                ) VALUES (%s, %s, 1, NOW(), NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    range_test_interval = NOW() - node_configurations.range_test_last_packet_timestamp,
                    range_test_packets_total = CASE
                        WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour'
                            THEN 1
                        ELSE node_configurations.range_test_packets_total + 1
                        END,
                    range_test_first_packet_timestamp = CASE
                        WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour'
                            THEN NOW()
                        ELSE node_configurations.range_test_first_packet_timestamp
                        END,
                    range_test_last_packet_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_mqtt_update(self, node_id: str, mqtt_encryption_enabled=None, mqtt_json_enabled=None,
                            mqtt_configured_root_topic=None):
        """Upsert the gateway's MQTT settings; a ``None`` argument leaves the
        previously stored value untouched (COALESCE on the update path)."""
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    mqtt_encryption_enabled,
                    mqtt_json_enabled,
                    mqtt_configured_root_topic,
                    mqtt_info_last_timestamp
                ) VALUES (%s, COALESCE(%s, FALSE), COALESCE(%s, FALSE), COALESCE(%s, ''), NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    mqtt_encryption_enabled = COALESCE(EXCLUDED.mqtt_encryption_enabled, node_configurations.mqtt_encryption_enabled),
                    mqtt_json_enabled = COALESCE(EXCLUDED.mqtt_json_enabled, node_configurations.mqtt_json_enabled),
                    mqtt_configured_root_topic = COALESCE(EXCLUDED.mqtt_configured_root_topic, node_configurations.mqtt_configured_root_topic),
                    mqtt_info_last_timestamp = NOW()
            """, (node_id, mqtt_encryption_enabled, mqtt_json_enabled, mqtt_configured_root_topic))
            conn.commit()

        self.db.execute_db_operation(db_operation)

View file

@ -4,12 +4,12 @@ from exporter.client_details import ClientDetails
from exporter.db_handler import DBHandler
class _Metrics:
class Metrics:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(_Metrics, cls).__new__(cls)
cls._instance = super(Metrics, cls).__new__(cls)
return cls._instance
def __init__(self, registry: CollectorRegistry, db: DBHandler):

View file

View file

@ -1,9 +1,13 @@
import base64
import json
import os
import sys
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
try:
from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
@ -16,7 +20,7 @@ from prometheus_client import CollectorRegistry, Counter, Gauge
from psycopg_pool import ConnectionPool
from exporter.client_details import ClientDetails
from exporter.processors import ProcessorRegistry
from exporter.processor.processors import ProcessorRegistry
class MessageProcessor:
@ -141,6 +145,32 @@ class MessageProcessor:
registry=self.registry
)
@staticmethod
def process_json_mqtt(message):
    """Record the uplinking gateway's MQTT settings from a JSON-topic message.

    ``message`` is a paho MQTT message whose payload is a Meshtastic JSON
    packet.  Gateways publish their id as '!<hex node number>'.
    """
    topic = message.topic
    json_packet = json.loads(message.payload)
    # BUG FIX: json_packet['sender'][0] raised KeyError on a packet without
    # a 'sender' field and IndexError on an empty one — this is untrusted
    # broker input, so guard instead of crashing the message handler.
    sender = json_packet.get('sender', '')
    if isinstance(sender, str) and sender.startswith('!'):
        gateway_node_id = str(int(sender[1:], 16))
        NodeConfigurationMetrics().process_mqtt_update(
            node_id=gateway_node_id,
            # NOTE(review): assumes the JSON packet carries an 'encrypted'
            # flag; absent means unencrypted — confirm against firmware.
            mqtt_encryption_enabled=json_packet.get('encrypted', False),
            mqtt_configured_root_topic=topic
        )
@staticmethod
def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket):
    """Record the uplinking gateway's MQTT settings from a protobuf envelope.

    Marks the gateway as using encryption when the mesh packet carries an
    ``encrypted`` payload, and stores the topic it published under.
    """
    # A non-empty 'encrypted' field on the packet means the payload is ciphered.
    is_encrypted = bool(getattr(mesh_packet, 'encrypted'))
    gateway_id = getattr(service_envelope, 'gateway_id')
    # Gateway ids look like '!deadbeef' — a hex node number after the bang.
    if gateway_id and gateway_id[0] == '!':
        gateway_node_id = str(int(gateway_id[1:], 16))
        NodeConfigurationMetrics().process_mqtt_update(
            node_id=gateway_node_id,
            mqtt_encryption_enabled=is_encrypted,
            mqtt_configured_root_topic=topic
        )
def process(self, mesh_packet: MeshPacket):
try:
if getattr(mesh_packet, 'encrypted'):

View file

@ -6,6 +6,7 @@ import psycopg
import unishox2
from exporter.db_handler import DBHandler
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
try:
from meshtastic.admin_pb2 import AdminMessage
@ -32,13 +33,13 @@ from prometheus_client import CollectorRegistry
from psycopg_pool import ConnectionPool
from exporter.client_details import ClientDetails
from exporter.registry import _Metrics
from exporter.metric.node_metrics import Metrics
class Processor(ABC):
def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
self.db_pool = db_pool
self.metrics = _Metrics(registry, DBHandler(db_pool))
self.metrics = Metrics(registry, DBHandler(db_pool))
@abstractmethod
def process(self, payload: bytes, client_details: ClientDetails):
@ -257,6 +258,7 @@ class IpTunnelAppProcessor(Processor):
class PaxCounterAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received PAXCOUNTER_APP packet")
NodeConfigurationMetrics().process_pax_counter_update(client_details.node_id)
paxcounter = Paxcount()
try:
paxcounter.ParseFromString(payload)
@ -288,6 +290,7 @@ class StoreForwardAppProcessor(Processor):
class RangeTestAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received RANGE_TEST_APP packet")
NodeConfigurationMetrics().process_range_test_update(client_details.node_id)
pass # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
@ -306,6 +309,7 @@ class TelemetryAppProcessor(Processor):
return
if telemetry.HasField('device_metrics'):
NodeConfigurationMetrics().process_device_update(client_details.node_id)
device_metrics: DeviceMetrics = telemetry.device_metrics
self.metrics.battery_level_gauge.labels(
**client_details.to_dict()
@ -328,6 +332,7 @@ class TelemetryAppProcessor(Processor):
).inc(getattr(device_metrics, 'uptime_seconds', 0))
if telemetry.HasField('environment_metrics'):
NodeConfigurationMetrics().process_environment_update(client_details.node_id)
environment_metrics: EnvironmentMetrics = telemetry.environment_metrics
self.metrics.temperature_gauge.labels(
**client_details.to_dict()
@ -382,6 +387,7 @@ class TelemetryAppProcessor(Processor):
).set(getattr(environment_metrics, 'weight', 0))
if telemetry.HasField('air_quality_metrics'):
NodeConfigurationMetrics().process_air_quality_update(client_details.node_id)
air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics
self.metrics.pm10_standard_gauge.labels(
**client_details.to_dict()
@ -432,6 +438,7 @@ class TelemetryAppProcessor(Processor):
).set(getattr(air_quality_metrics, 'particles_100um', 0))
if telemetry.HasField('power_metrics'):
NodeConfigurationMetrics().process_power_update(client_details.node_id)
power_metrics: PowerMetrics = telemetry.power_metrics
self.metrics.ch1_voltage_gauge.labels(
**client_details.to_dict()
@ -493,6 +500,7 @@ class TraceRouteAppProcessor(Processor):
class NeighborInfoAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received NEIGHBORINFO_APP packet")
NodeConfigurationMetrics().process_neighbor_info_update(client_details.node_id)
neighbor_info = NeighborInfo()
try:
neighbor_info.ParseFromString(payload)
@ -547,6 +555,7 @@ class AtakPluginProcessor(Processor):
class MapReportAppProcessor(Processor):
def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received MAP_REPORT_APP packet")
NodeConfigurationMetrics().map_broadcast_update(client_details.node_id)
map_report = MapReport()
try:
map_report.ParseFromString(payload)

16
main.py
View file

@ -6,6 +6,7 @@ import paho.mqtt.client as mqtt
from dotenv import load_dotenv
from constants import callback_api_version_map, protocol_map
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
try:
from meshtastic.mesh_pb2 import MeshPacket
@ -38,9 +39,10 @@ def handle_connect(client, userdata, flags, reason_code, properties):
def update_node_status(node_number, status):
    """Upsert a node's MQTT online/offline status in node_details.

    A node seen only via MQTT status gets placeholder names until a NODEINFO
    packet fills them in; an existing row only has its mqtt_status refreshed.
    """
    with connection_pool.connection() as conn:
        with conn.cursor() as cur:
            # BUG FIX: parameters must follow placeholder order.  The original
            # tuple (node, status, status, 'Unknown (MQTT)', 'Unknown (MQTT)')
            # bound `status` to short_name and the placeholder string to the
            # DO UPDATE mqtt_status, corrupting both columns.
            cur.execute(
                "INSERT INTO node_details (node_id, mqtt_status, short_name, long_name) "
                "VALUES (%s, %s, %s, %s) "
                "ON CONFLICT(node_id) "
                "DO UPDATE SET mqtt_status = %s",
                (node_number, status, 'Unknown (MQTT)', 'Unknown (MQTT)', status))
            conn.commit()
@ -48,6 +50,7 @@ def handle_message(client, userdata, message):
current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Received message on topic '{message.topic}' at {current_timestamp}")
if '/json/' in message.topic:
processor.process_json_mqtt(message)
# Ignore JSON messages as there are also protobuf messages sent on other topic
# Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448
return
@ -78,7 +81,7 @@ def handle_message(client, userdata, message):
cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
(str(packet.id),))
conn.commit()
processor.process_mqtt(message.topic, envelope, packet)
processor.process(packet)
except Exception as e:
logging.error(f"Failed to handle message: {e}")
@ -89,14 +92,15 @@ if __name__ == "__main__":
load_dotenv()
# We have to load_dotenv before we can import MessageProcessor to allow filtering of message types
from exporter.processor_base import MessageProcessor
from exporter.processor.processor_base import MessageProcessor
# Setup a connection pool
connection_pool = ConnectionPool(
os.getenv('DATABASE_URL'),
min_size=1,
max_size=10
max_size=100
)
# Configure node configuration metrics
node_conf_metrics = NodeConfigurationMetrics(connection_pool)
# Configure Prometheus exporter
registry = CollectorRegistry()