Merge pull request #9 from tcivie/migrate-from-redis-to-postgre

Migrate from redis to postgre
This commit is contained in:
Gleb Tcivie 2024-06-28 19:58:26 +03:00 committed by GitHub
commit b763bccdab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 202 additions and 105 deletions

7
.env
View file

@ -1,10 +1,7 @@
# Description: Environment variables for the application # Description: Environment variables for the application
# Redis connection details # Postgres connection details
REDIS_HOST=redis DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
# Prometheus connection details # Prometheus connection details
PROMETHEUS_COLLECTOR_PORT=9464 PROMETHEUS_COLLECTOR_PORT=9464

View file

@ -1,11 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true"> <component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="0@localhost" uuid="6480dc2d-498f-4da7-af1b-7fe2ca0790fa"> <data-source source="LOCAL" name="meshtastic@localhost" uuid="9c9a791a-fbdc-4e29-933b-a615eaa80a3c">
<driver-ref>redis</driver-ref> <driver-ref>postgresql</driver-ref>
<synchronize>true</synchronize> <synchronize>true</synchronize>
<jdbc-driver>jdbc.RedisDriver</jdbc-driver> <jdbc-driver>org.postgresql.Driver</jdbc-driver>
<jdbc-url>jdbc:redis://localhost:6379/0</jdbc-url> <jdbc-url>jdbc:postgresql://localhost:5432/meshtastic</jdbc-url>
<working-dir>$ProjectFileDir$</working-dir> <working-dir>$ProjectFileDir$</working-dir>
</data-source> </data-source>
</component> </component>

View file

@ -7,7 +7,4 @@
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PackageRequirementsSettings">
<option name="removeUnused" value="true" />
</component>
</module> </module>

8
.idea/sqldialects.xml Normal file
View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- JetBrains IDE setting: resolve embedded SQL (init.sql and the SQL string
     literals in exporter/processors.py, plus the project default) as the
     PostgreSQL dialect. No runtime effect; safe to omit outside the IDE. -->
<project version="4">
<component name="SqlDialectMappings">
<file url="file://$PROJECT_DIR$/docker/postgres/init.sql" dialect="PostgreSQL" />
<file url="file://$PROJECT_DIR$/exporter/processors.py" dialect="PostgreSQL" />
<file url="PROJECT" dialect="PostgreSQL" />
</component>
</project>

View file

@ -1,7 +1,7 @@
volumes: volumes:
prometheus_data: prometheus_data:
grafana_data: grafana_data:
redis_data: postgres_data:
services: services:
prometheus: prometheus:
@ -42,13 +42,18 @@ services:
networks: networks:
- mesh-bridge - mesh-bridge
redis: postgres:
image: redis:7 image: postgres:13.3
restart: unless-stopped restart: unless-stopped
networks: networks:
- mesh-bridge - mesh-bridge
volumes: volumes:
- redis_data:/data - postgres_data:/var/lib/postgresql/data
- ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
environment:
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "postgres"
POSTGRES_DB: "meshtastic"
networks: networks:
mesh-bridge: mesh-bridge:

View file

@ -8,12 +8,3 @@ datasources:
editable: true editable: true
jsonData: jsonData:
httpMethod: POST httpMethod: POST
- name: redis-datasource
type: redis-datasource
access: proxy
url: redis://redis:6379/0
isDefault: false
editable: true
jsonData:
client: standalone

29
docker/postgres/init.sql Normal file
View file

@ -0,0 +1,29 @@
-- Schema bootstrap for the Meshtastic exporter. Mounted into
-- /docker-entrypoint-initdb.d, so it normally runs once on first container
-- start; every statement below is written to be idempotent anyway so the
-- script can also be re-applied by hand without errors.

-- Packet-deduplication table: one row per processed MQTT packet id.
CREATE TABLE IF NOT EXISTS messages
(
    id          TEXT PRIMARY KEY,
    received_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Purge rows older than one minute. Fired after every insert (see trigger
-- below) so the table stays small without an external cron job.
CREATE OR REPLACE FUNCTION expire_old_messages()
    RETURNS TRIGGER AS
$$
BEGIN
    DELETE FROM messages WHERE received_at < NOW() - INTERVAL '1 minute';
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop first so re-running the script does not fail: unlike the tables'
-- IF NOT EXISTS guards, a bare CREATE TRIGGER errors if it already exists
-- (postgres 13 has no CREATE OR REPLACE TRIGGER).
DROP TRIGGER IF EXISTS trigger_expire_old_messages ON messages;
CREATE TRIGGER trigger_expire_old_messages
    AFTER INSERT
    ON messages
    FOR EACH ROW
EXECUTE FUNCTION expire_old_messages();

-- Cached per-node metadata, keyed by Meshtastic node id (stored as text).
CREATE TABLE IF NOT EXISTS client_details
(
    node_id        VARCHAR PRIMARY KEY,
    short_name     VARCHAR,
    long_name      VARCHAR,
    hardware_model VARCHAR,
    role           VARCHAR
);

View file

@ -1,20 +1,18 @@
import base64 import base64
import json
import os import os
import redis
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from meshtastic.config_pb2 import Config from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
from meshtastic.mesh_pb2 import MeshPacket, HardwareModel, Data
from meshtastic.portnums_pb2 import PortNum from meshtastic.portnums_pb2 import PortNum
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
from psycopg_pool import ConnectionPool
from exporter.registry import ProcessorRegistry, ClientDetails from exporter.registry import ProcessorRegistry, ClientDetails
class MessageProcessor: class MessageProcessor:
def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
self.rx_rssi_gauge = None self.rx_rssi_gauge = None
self.channel_counter = None self.channel_counter = None
self.packet_id_counter = None self.packet_id_counter = None
@ -28,7 +26,7 @@ class MessageProcessor:
self.destination_message_type_counter = None self.destination_message_type_counter = None
self.source_message_type_counter = None self.source_message_type_counter = None
self.registry = registry self.registry = registry
self.redis_client = redis_client self.db_pool = db_pool
self.init_metrics() self.init_metrics()
self.processor_registry = ProcessorRegistry() self.processor_registry = ProcessorRegistry()
@ -137,12 +135,14 @@ class MessageProcessor:
port_num = int(mesh_packet.decoded.portnum) port_num = int(mesh_packet.decoded.portnum)
payload = mesh_packet.decoded.payload payload = mesh_packet.decoded.payload
source_client_details = self._get_client_details(getattr(mesh_packet, 'from')) source_node_id = getattr(mesh_packet, 'from')
source_client_details = self._get_client_details(source_node_id)
if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true': if os.getenv('MESH_HIDE_SOURCE_DATA', 'false') == 'true':
source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden', source_client_details = ClientDetails(node_id=source_client_details.node_id, short_name='Hidden',
long_name='Hidden') long_name='Hidden')
destination_client_details = self._get_client_details(getattr(mesh_packet, 'to')) destination_node_id = getattr(mesh_packet, 'to')
destination_client_details = self._get_client_details(destination_node_id)
if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true': if os.getenv('MESH_HIDE_DESTINATION_DATA', 'false') == 'true':
destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden', destination_client_details = ClientDetails(node_id=destination_client_details.node_id, short_name='Hidden',
long_name='Hidden') long_name='Hidden')
@ -152,7 +152,7 @@ class MessageProcessor:
self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details) self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)
processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.redis_client) processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
processor.process(payload, client_details=source_client_details) processor.process(payload, client_details=source_client_details)
def get_port_name_from_portnum(self, port_num): def get_port_name_from_portnum(self, port_num):
@ -237,16 +237,33 @@ class MessageProcessor:
destination_id=destination_client_details.node_id destination_id=destination_client_details.node_id
).set(mesh_packet.rx_rssi) ).set(mesh_packet.rx_rssi)
def _get_client_details(self, node_id: str) -> ClientDetails: def _get_client_details(self, node_id: int) -> ClientDetails:
user_details_json = self.redis_client.get(f"node:{node_id}") node_id_str = str(node_id) # Convert the integer to a string
if user_details_json is not None: with self.db_pool.connection() as conn:
# Decode the JSON string to a Python dictionary with conn.cursor() as cur:
user_details = json.loads(user_details_json) # First, try to select the existing record
return ClientDetails(node_id=node_id, cur.execute("""
short_name=user_details.get('short_name', 'Unknown'), SELECT node_id, short_name, long_name, hardware_model, role
long_name=user_details.get('long_name', 'Unknown'), FROM client_details
hardware_model=user_details.get('hardware_model', HardwareModel.UNSET), WHERE node_id = %s;
role=user_details.get('role', Config.DeviceConfig.Role.ValueType), """, (node_id_str,))
) result = cur.fetchone()
return ClientDetails(node_id=node_id) if not result:
# If the client is not found, insert a new record
cur.execute("""
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
VALUES (%s, %s, %s, %s, %s)
RETURNING node_id, short_name, long_name, hardware_model, role;
""", (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))
conn.commit()
result = cur.fetchone()
# At this point, we should always have a result, either from SELECT or INSERT
return ClientDetails(
node_id=result[0],
short_name=result[1],
long_name=result[2],
hardware_model=result[3],
role=result[4]
)

View file

@ -1,9 +1,8 @@
import json
import os import os
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from venv import logger from venv import logger
import redis import psycopg
import unishox2 import unishox2
from meshtastic.admin_pb2 import AdminMessage from meshtastic.admin_pb2 import AdminMessage
from meshtastic.config_pb2 import Config from meshtastic.config_pb2 import Config
@ -15,6 +14,7 @@ from meshtastic.remote_hardware_pb2 import HardwareMessage
from meshtastic.storeforward_pb2 import StoreAndForward from meshtastic.storeforward_pb2 import StoreAndForward
from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics from meshtastic.telemetry_pb2 import Telemetry, DeviceMetrics, EnvironmentMetrics, AirQualityMetrics, PowerMetrics
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from psycopg_pool import ConnectionPool
class _Metrics: class _Metrics:
@ -351,6 +351,22 @@ class _Metrics:
) )
def get_hardware_model_name_from_code(hardware_model):
    """Return the protobuf enum name for a HardwareModel numeric code.

    Falls back to 'UNKNOWN_HARDWARE_MODEL' when the code is not a known
    enum value (e.g. a newer firmware model than this proto knows about).
    """
    # values_by_number is the descriptor's built-in number->value map:
    # a single dict lookup instead of scanning every enum value.
    value = HardwareModel.DESCRIPTOR.values_by_number.get(hardware_model)
    return value.name if value is not None else 'UNKNOWN_HARDWARE_MODEL'
def get_role_name_from_role(role):
    """Return the protobuf enum name for a Config.DeviceConfig.Role code.

    Falls back to 'UNKNOWN_ROLE' when the code is not a known enum value.
    """
    # Same O(1) descriptor lookup as get_hardware_model_name_from_code,
    # replacing the hand-rolled linear scan over descriptor.values.
    value = Config.DeviceConfig.Role.DESCRIPTOR.values_by_number.get(role)
    return value.name if value is not None else 'UNKNOWN_ROLE'
class ClientDetails: class ClientDetails:
def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET, def __init__(self, node_id, short_name='Unknown', long_name='Unknown', hardware_model=HardwareModel.UNSET,
role=None): role=None):
@ -360,39 +376,30 @@ class ClientDetails:
self.hardware_model: HardwareModel = hardware_model self.hardware_model: HardwareModel = hardware_model
self.role: Config.DeviceConfig.Role = role self.role: Config.DeviceConfig.Role = role
def get_role_name_from_role(self):
descriptor = Config.DeviceConfig.Role.DESCRIPTOR
for enum_value in descriptor.values:
if enum_value.number == self.role:
return enum_value.name
return 'UNKNOWN_ROLE'
def get_hardware_model_name_from_code(self):
descriptor = HardwareModel.DESCRIPTOR
for enum_value in descriptor.values:
if enum_value.number == self.hardware_model:
return enum_value.name
return 'UNKNOWN_HARDWARE_MODEL'
def to_dict(self): def to_dict(self):
return { return {
'node_id': self.node_id, 'node_id': self.node_id,
'short_name': self.short_name, 'short_name': self.short_name,
'long_name': self.long_name, 'long_name': self.long_name,
'hardware_model': self.get_hardware_model_name_from_code(), 'hardware_model': get_hardware_model_name_from_code(self.hardware_model),
'role': self.get_role_name_from_role() 'role': get_role_name_from_role(self.role)
} }
class Processor(ABC): class Processor(ABC):
def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
self.redis_client = redis_client self.db_pool = db_pool
self.metrics = _Metrics(registry) self.metrics = _Metrics(registry)
@abstractmethod @abstractmethod
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
pass pass
def execute_db_operation(self, operation):
with self.db_pool.connection() as conn:
with conn.cursor() as cur:
return operation(cur, conn)
class ProcessorRegistry: class ProcessorRegistry:
_registry = {} _registry = {}
@ -419,14 +426,10 @@ class UnknownAppProcessor(Processor):
@ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP) @ProcessorRegistry.register_processor(PortNum.TEXT_MESSAGE_APP)
class TextMessageAppProcessor(Processor): class TextMessageAppProcessor(Processor):
def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis):
super().__init__(registry, redis_client)
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received TEXT_MESSAGE_APP packet") logger.debug("Received TEXT_MESSAGE_APP packet")
message = payload.decode('utf-8') message = payload.decode('utf-8')
if os.getenv('HIDE_MESSAGE', 'true') == 'true': # Currently there is no use for the message content, if os.getenv('HIDE_MESSAGE', 'true') == 'true':
# but later we could store it in redis or something
message = 'Hidden' message = 'Hidden'
self.metrics.message_length_histogram.labels( self.metrics.message_length_histogram.labels(
client_id=client_details.node_id client_id=client_details.node_id
@ -469,13 +472,52 @@ class NodeInfoAppProcessor(Processor):
logger.debug("Received NODEINFO_APP packet") logger.debug("Received NODEINFO_APP packet")
user = User() user = User()
user.ParseFromString(payload) user.ParseFromString(payload)
client_details.short_name = user.short_name
client_details.long_name = user.long_name def db_operation(cur, conn):
client_details.hardware_model = user.hw_model # First, try to select the existing record
client_details.role = user.role cur.execute("""
user_details_json = json.dumps(client_details.to_dict()) SELECT short_name, long_name, hardware_model, role
self.redis_client.set(f"node:{client_details.node_id}", user_details_json) FROM client_details
pass WHERE node_id = %s;
""", (client_details.node_id,))
existing_record = cur.fetchone()
if existing_record:
# If record exists, update only the fields that are provided in the new data
update_fields = []
update_values = []
if user.short_name:
update_fields.append("short_name = %s")
update_values.append(user.short_name)
if user.long_name:
update_fields.append("long_name = %s")
update_values.append(user.long_name)
if user.hw_model != HardwareModel.UNSET:
update_fields.append("hardware_model = %s")
update_values.append(get_hardware_model_name_from_code(user.hw_model))
if user.role is not None:
update_fields.append("role = %s")
update_values.append(get_role_name_from_role(user.role))
if update_fields:
update_query = f"""
UPDATE client_details
SET {", ".join(update_fields)}
WHERE node_id = %s
"""
cur.execute(update_query, update_values + [client_details.node_id])
else:
# If record doesn't exist, insert a new one
cur.execute("""
INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
VALUES (%s, %s, %s, %s, %s)
""", (client_details.node_id, user.short_name, user.long_name,
get_hardware_model_name_from_code(user.hw_model), get_role_name_from_role(user.role)))
conn.commit()
self.execute_db_operation(db_operation)
@ProcessorRegistry.register_processor(PortNum.ROUTING_APP) @ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
@ -584,8 +626,8 @@ class RangeTestAppProcessor(Processor):
@ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP) @ProcessorRegistry.register_processor(PortNum.TELEMETRY_APP)
class TelemetryAppProcessor(Processor): class TelemetryAppProcessor(Processor):
def __init__(self, registry: CollectorRegistry, redis_client: redis.Redis): def __init__(self, registry: CollectorRegistry, db_connection: psycopg.connection):
super().__init__(registry, redis_client) super().__init__(registry, db_connection)
def process(self, payload: bytes, client_details: ClientDetails): def process(self, payload: bytes, client_details: ClientDetails):
logger.debug("Received TELEMETRY_APP packet") logger.debug("Received TELEMETRY_APP packet")

48
main.py
View file

@ -3,15 +3,26 @@ import os
from datetime import datetime from datetime import datetime
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import redis
from dotenv import load_dotenv from dotenv import load_dotenv
from meshtastic.mesh_pb2 import MeshPacket from meshtastic.mesh_pb2 import MeshPacket
from meshtastic.mqtt_pb2 import ServiceEnvelope from meshtastic.mqtt_pb2 import ServiceEnvelope
from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.enums import CallbackAPIVersion
from prometheus_client import CollectorRegistry, start_http_server from prometheus_client import CollectorRegistry, start_http_server
from psycopg_pool import ConnectionPool
from exporter.processors import MessageProcessor from exporter.processors import MessageProcessor
# Global connection pool
# Module-level psycopg_pool.ConnectionPool shared by the MQTT callbacks;
# None until it is constructed in the __main__ block from DATABASE_URL.
connection_pool = None
def get_connection():
    # Hand out a raw pooled connection; caller must return it via
    # release_connection(). NOTE(review): appears unused in the visible
    # code — handle_message uses connection_pool.connection() instead;
    # confirm before removing.
    return connection_pool.getconn()
def release_connection(conn):
    # Return a connection obtained from get_connection() to the pool.
    # NOTE(review): counterpart of get_connection(); also appears unused.
    connection_pool.putconn(conn)
def handle_connect(client, userdata, flags, reason_code, properties): def handle_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}") print(f"Connected with result code {reason_code}")
@ -21,37 +32,37 @@ def handle_connect(client, userdata, flags, reason_code, properties):
def handle_message(client, userdata, message): def handle_message(client, userdata, message):
current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Received message on topic '{message.topic}' at {current_timestamp}") print(f"Received message on topic '{message.topic}' at {current_timestamp}")
# Filter out messages from the 'stat' topic
if '/stat/' in message.topic: if '/stat/' in message.topic:
print(f"Filtered out message from topic containing '/stat/': {message.topic}") print(f"Filtered out message from topic containing '/stat/': {message.topic}")
return return
envelope = ServiceEnvelope() envelope = ServiceEnvelope()
envelope.ParseFromString(message.payload) envelope.ParseFromString(message.payload)
packet: MeshPacket = envelope.packet packet: MeshPacket = envelope.packet
if redis_client.set(str(packet.id), 1, nx=True, ex=os.getenv('redis_expiration', 60), get=True) is not None:
with connection_pool.connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT id FROM messages WHERE id = %s", (str(packet.id),))
if cur.fetchone() is not None:
logging.debug(f"Packet {packet.id} already processed") logging.debug(f"Packet {packet.id} already processed")
return return
# Process the packet cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
(str(packet.id),))
conn.commit()
processor.process(packet) processor.process(packet)
if __name__ == "__main__": if __name__ == "__main__":
load_dotenv() load_dotenv()
# Create Redis client
try: # Setup a connection pool
redis_client = redis.Redis( connection_pool = ConnectionPool(
host=os.getenv('REDIS_HOST'), os.getenv('DATABASE_URL'),
port=int(os.getenv('REDIS_PORT')), min_size=1,
db=int(os.getenv('REDIS_DB', 0)), max_size=10
password=os.getenv('REDIS_PASSWORD', None),
) )
except Exception as e:
logging.error(f"Failed to connect to Redis: {e}")
exit(1)
# Configure Prometheus exporter # Configure Prometheus exporter
registry = CollectorRegistry() registry = CollectorRegistry()
@ -62,7 +73,6 @@ if __name__ == "__main__":
callback_api_version=CallbackAPIVersion.VERSION2, callback_api_version=CallbackAPIVersion.VERSION2,
protocol=mqtt.MQTTv5 protocol=mqtt.MQTTv5
) )
mqtt_client.on_connect = handle_connect mqtt_client.on_connect = handle_connect
mqtt_client.on_message = handle_message mqtt_client.on_message = handle_message
@ -78,12 +88,12 @@ if __name__ == "__main__":
os.getenv('MQTT_HOST'), os.getenv('MQTT_HOST'),
int(os.getenv('MQTT_PORT')), int(os.getenv('MQTT_PORT')),
keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)), keepalive=int(os.getenv('MQTT_KEEPALIVE', 60)),
) )
except Exception as e: except Exception as e:
logging.error(f"Failed to connect to MQTT broker: {e}") logging.error(f"Failed to connect to MQTT broker: {e}")
exit(1) exit(1)
# Configure the Processor and the Exporter # Configure the Processor and the Exporter
processor = MessageProcessor(registry, redis_client) processor = MessageProcessor(registry, connection_pool)
mqtt_client.loop_forever() mqtt_client.loop_forever()

View file

@ -1,7 +1,8 @@
paho-mqtt~=2.1.0 paho-mqtt~=2.1.0
redis~=5.0.6
python-dotenv~=1.0.1 python-dotenv~=1.0.1
meshtastic~=2.3.11 meshtastic~=2.3.11
prometheus_client~=0.20.0 prometheus_client~=0.20.0
unishox2-py3~=1.0.0 unishox2-py3~=1.0.0
cryptography~=42.0.8 cryptography~=42.0.8
psycopg~=3.1.19
psycopg_pool~=3.2.2