Compare commits


85 commits
v1.0.0 ... main

Author SHA1 Message Date
Gleb Tcivie ecb19a4735
Merge pull request #82 from tcivie/PAX-counter-support
Added PAX counter reported metrics to prometheus
2025-02-25 23:03:33 +02:00
Gleb Tcivie 868c2bcab5 .env file 2025-02-25 23:01:34 +02:00
Gleb Tcivie 6a7aa6c112 🐛module 'psycopg_binary.pq' has no attribute 'PGcancelConn' ⬆️ psycopg-binary=3.2.5 2025-02-25 22:56:27 +02:00
Gleb Tcivie 60002b20e9 Added PAX counter reported metrics to prometheus 2025-02-25 22:42:32 +02:00
Gleb Tcivie ae6f8b4160
Merge pull request #81 from tcivie/79-fresh-pull-to-collect-new-hardware-models-and-device-modes-router
Removed invalid meshtastic import and used Buf.build import + fixed security issue with cryptography lib
2025-02-24 21:57:03 +02:00
Gleb Tcivie b78deaf95a Fixed deprecated proto 2025-02-24 21:54:49 +02:00
Gleb Tcivie 92210ee24a Removed invalid meshtastic import and used Buf.build import + fixed security issue with cryptography lib 2025-02-24 21:49:30 +02:00
Gleb Tcivie 5183ade98b
Merge pull request #80 from tcivie/fix-prometheus-stale-metrics
Added simple cleaning to verify if the metrics are stale
2025-02-09 09:26:12 +02:00
Gleb Tcivie 3fb222de43 Added simple cleaning to verify if the metrics are stale 2025-02-01 12:35:57 +02:00
Gleb Tcivie 0281d0b990
Merge pull request #77
Handle broadcast IDs check in client details retrieval
2025-01-05 20:20:59 +02:00
Gleb Tcivie 0650382405
Handle broadcast IDs in client details retrieval
Add a check for broadcast node IDs (4294967295 and 1) in the `_get_client_details` method to return a predefined `ClientDetails` instance. This prevents unnecessary database queries for broadcast IDs and ensures consistent handling of these special cases.
2025-01-05 20:20:16 +02:00
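The change described in that commit message boils down to a guard clause. A minimal sketch of what it might look like, with hypothetical names for the `ClientDetails` constructor arguments and the fallback database lookup (neither is shown in this diff):

```python
BROADCAST_IDS = {4294967295, 1}  # the broadcast node IDs named in the commit message

def _get_client_details(node_id: int) -> "ClientDetails":
    # Sketch of the described check: short-circuit before any database query.
    if node_id in BROADCAST_IDS:
        # Hypothetical constructor arguments; the real ClientDetails fields
        # live in the exporter's processor code, not in this diff.
        return ClientDetails(node_id=str(node_id), short_name="Broadcast", long_name="Broadcast")
    return _fetch_client_details_from_db(node_id)  # hypothetical DB lookup helper
```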
Gleb Tcivie 2cfbcaa8cd
Merge pull request #73 from tcivie/72-exporter-creates-doo-many-stale-labels-which-increase-the-load-on-the-prometheus
Added cleanup job for metrics that are not needed anymore
2024-11-23 19:33:10 +02:00
Gleb Tcivie b92b3da4cd Added to requirements.txt 2024-11-23 18:18:29 +02:00
Gleb Tcivie 7b32cb57da Added cleanup job for metrics that are not needed anymore 2024-11-23 18:15:37 +02:00
Gleb Tcivie 1fc4096772
Merge pull request #71 from tcivie/70-list-of-active-deployments-1
Update README.md
2024-11-04 10:42:21 +02:00
Gleb Tcivie 577900faf3
Update README.md 2024-11-04 10:39:52 +02:00
Gleb Tcivie 8eb6184763
Update README.md 2024-10-22 22:01:08 +03:00
Gleb Tcivie 44549c75a6
Update README.md 2024-10-22 21:50:56 +03:00
Gleb Tcivie bc4ff2d6f5
Merge pull request #68 from tcivie/patch-readme
Update README.md
2024-10-22 21:47:27 +03:00
Gleb Tcivie c46049f866 Update README.md 2024-10-22 21:46:45 +03:00
Gleb Tcivie e73837ac6b
Merge pull request #67 from tcivie/dashboard-update
dashboard-update
2024-10-22 21:38:04 +03:00
Gleb Tcivie ddf675be87 Updated the dashboards with a more useful information 2024-10-22 21:32:21 +03:00
Gleb Tcivie c187380bf2 Fixed port on master to be the default for prometheus (9464) 2024-10-22 21:32:01 +03:00
Gleb Tcivie 025259f279
Merge pull request #65 from tcivie/tcivie-patch-1 2024-10-21 18:56:25 +03:00
Gleb Tcivie 5875bde6d9
Update Dockerfile.grafana 2024-10-21 18:54:28 +03:00
Gleb Tcivie f3766c73ec
Update Dockerfile.grafana
Update CVE-2014-9264
2024-10-21 18:41:34 +03:00
Gleb Tcivie f11ca840f7
Merge pull request #61 from panaceya/patch-1
FIX: #60
2024-10-21 16:00:55 +03:00
Gleb Tcivie 091536c79f
Merge branch 'main' into patch-1 2024-10-21 14:40:48 +03:00
Gleb Tcivie 3de42a02d2
Merge pull request #64 from tcivie/alpine-update
Update Dockerfile.exporter
2024-10-21 14:40:18 +03:00
Gleb Tcivie 58eb131488
Update Dockerfile.exporter
Looks like the basic alpine is missing required meshtastic package
2024-10-21 14:38:22 +03:00
panaceya 2b1263ec4a
FIX: #60 2024-10-21 09:20:26 +03:00
Gleb Tcivie e57f315dcb
Merge pull request #59 from tcivie/node-configuration-metrics
Node configuration metrics
2024-08-09 21:18:51 +03:00
Gleb Tcivie 362c06f6e2 Merge remote-tracking branch 'origin/node-configuration-metrics' into node-configuration-metrics 2024-08-09 21:16:22 +03:00
Gleb Tcivie 6f7886435f Merge remote-tracking branch 'origin/node-configuration-metrics' into node-configuration-metrics
# Conflicts:
#	exporter/metric/node_configuration_metrics.py
2024-08-09 21:16:16 +03:00
Gleb Tcivie 93a3947443 Merge remote-tracking branch 'origin/node-configuration-metrics' into node-configuration-metrics
# Conflicts:
#	exporter/metric/node_configuration_metrics.py
2024-08-09 21:13:21 +03:00
Gleb Tcivie 340578ed86 Bugfix (Fixed issue with unknown json data configuration) 2024-08-09 21:13:12 +03:00
Gleb Tcivie e155aeaeae Bugfix (Fixed issue with unknown json data configuration) 2024-08-09 21:11:00 +03:00
Gleb Tcivie 055db4f585
Merge pull request #58 from tcivie/node-configuration-metrics
Bugfix
2024-08-09 20:32:35 +03:00
Gleb Tcivie a694b94cec
Merge branch 'main' into node-configuration-metrics 2024-08-09 20:30:45 +03:00
Gleb Tcivie 0b8bd8e025 Bugfix 2024-08-09 20:29:50 +03:00
Gleb Tcivie 94d8512818
Merge pull request #57 from tcivie/node-configuration-metrics
Node configuration metrics
2024-08-09 13:41:18 +03:00
Gleb Tcivie 6dc3116622 Removed breaking trigger from sql init 2024-08-09 13:36:18 +03:00
Gleb Tcivie 7d1d32d67e Added depends on for the exporter (Ensure it's getting up after the postgres and prometheus) 2024-08-09 13:29:46 +03:00
Gleb Tcivie edc06f001b Updated the grafana boards and added additional node investigation board (Details about nodes and the packets they send) 2024-08-09 13:21:13 +03:00
Gleb Tcivie 1c6eb81889 Refactored the code + added support for node_configuration_metrics 2024-08-09 13:20:31 +03:00
Gleb Tcivie 36ce36287e
Merge pull request #56 from tcivie/50-ingest-from-multiple-mqtt-topics-and-servers-this-somewhat-goes-with-the-other-request
Added support for multiple topics (comma separated)
2024-08-09 09:24:04 +03:00
Gleb Tcivie 45f47d107a
Update main.yml 2024-08-09 09:22:14 +03:00
Gleb Tcivie c9390ee417 Added support for multiple topics (comma separated) 2024-08-09 09:18:15 +03:00
Gleb Tcivie 6139b7a968
Merge pull request #53 from tcivie/49-mqtt-import-stops
Performance optimization + redesign
2024-07-26 20:12:12 +03:00
Gleb Tcivie dffc77a9dc Merge remote-tracking branch 'origin/49-mqtt-import-stops' into 49-mqtt-import-stops 2024-07-26 20:09:37 +03:00
Gleb Tcivie 381c29a461 Merge remote-tracking branch 'origin/49-mqtt-import-stops' into 49-mqtt-import-stops
# Conflicts:
#	exporter/processor_base.py
2024-07-26 20:09:32 +03:00
Gleb Tcivie 006ed7ccfb Merge remote-tracking branch 'origin/49-mqtt-import-stops' into 49-mqtt-import-stops
# Conflicts:
#	exporter/processor_base.py
2024-07-26 20:04:52 +03:00
Gleb Tcivie 0b487336fb Bug fixes: Changed Histograms to Gauges and updated the dashboards 2024-07-26 20:04:22 +03:00
Gleb Tcivie 3cfadccc27 Removed metric on message size in char len and replaced with overall size of packet by common labels and portnum 2024-07-26 13:21:48 +03:00
Gleb Tcivie 821056664e Removed metric on message size in char len and replaced with overall size of packet by common labels and portnum 2024-07-26 13:18:09 +03:00
Gleb Tcivie ea3f00b466 Added more "Static" data like geolocation to the Postgres DB and removed it from prometheus to reduce the load. + Added support for lookup of Country + City + State for nodes per geolocation. 2024-07-26 13:04:36 +03:00
Gleb Tcivie d3f60cc5ff Added support for filtering specific types of messages from being reported 2024-07-26 12:02:17 +03:00
Gleb Tcivie ed5b1ee0ef
Merge pull request #48 from tcivie/tcivie-patch-1
Update auto-tagging.yml
2024-07-20 18:15:42 +03:00
Gleb Tcivie 7dc721cc43
Update auto-tagging.yml 2024-07-20 18:13:32 +03:00
Gleb Tcivie 9c890fd13e
Merge pull request #47 from tcivie/version-bump-test
Test version bump
2024-07-20 18:09:43 +03:00
Gleb Tcivie fe483759fb
Test version bump 2024-07-20 18:07:37 +03:00
Gleb Tcivie 7a8846fe79
Merge pull request #46 from tcivie/tcivie-patch-1
Update auto-tagging.yml
2024-07-20 18:04:18 +03:00
Gleb Tcivie 1d64675749
Update auto-tagging.yml 2024-07-20 18:02:15 +03:00
Gleb Tcivie 7fc83f581e
Merge pull request #45 from tcivie/tcivie-patch-1
Update README.md
2024-07-20 17:52:31 +03:00
Gleb Tcivie 31dbc6cde9
Update README.md 2024-07-20 17:49:55 +03:00
Gleb Tcivie 681c6a359c
Merge pull request #44 from tcivie/tcivie-patch-1
Update README.md
2024-07-20 17:47:06 +03:00
Gleb Tcivie 67ff04bf6a
Update README.md 2024-07-20 17:44:57 +03:00
Gleb Tcivie add8d0e1f0
Merge pull request #43 from tcivie/tcivie-patch-1
Update auto-tagging.yml
2024-07-20 17:37:46 +03:00
Gleb Tcivie 869401d161
Update auto-tagging.yml 2024-07-20 17:35:03 +03:00
Gleb Tcivie 5b208755d3
Merge pull request #42 from tcivie/tcivie-patch-1
Update auto-tagging.yml
2024-07-20 17:33:52 +03:00
Gleb Tcivie b5fffe831f
Update auto-tagging.yml 2024-07-20 17:30:06 +03:00
Gleb Tcivie 805737f99f
Merge pull request #41 from tcivie/tcivie-patch-1
Update auto-tagging.yml
2024-07-20 17:27:20 +03:00
Gleb Tcivie 6724014ad6 Trigger Build 2024-07-20 17:23:38 +03:00
Gleb Tcivie a3c832e8c5
Update auto-tagging.yml 2024-07-20 17:19:09 +03:00
Gleb Tcivie fe4863cdf1
Merge pull request #40 from tcivie/tcivie-patch-1
Create auto-tagging.yml
2024-07-20 17:16:41 +03:00
Gleb Tcivie 1b4d3aaa8b
Create auto-tagging.yml 2024-07-20 17:14:26 +03:00
Gleb Tcivie d46a322d24
Merge pull request #39 from tcivie/tcivie-patch-1
Create CODEOWNERS
2024-07-19 09:33:55 +03:00
Gleb Tcivie a773d966ba
Create CODEOWNERS 2024-07-19 09:31:58 +03:00
Gleb Tcivie 568abbf4b6
Merge pull request #38 from typicalaimster/reduce_container_size 2024-07-19 09:27:10 +03:00
typicalaimster bb1d3e066e
Update Dockerfile.exporter
Change to alpine to reduce overhead
2024-07-17 15:14:32 -07:00
Gleb Tcivie 5346333087
Merge pull request #37 from tcivie/add-timestamp-to-client-details
Added timestamp column to client_details
2024-07-15 22:38:56 +03:00
Gleb Tcivie 9bf3df3742
Merge branch 'main' into add-timestamp-to-client-details 2024-07-15 22:35:12 +03:00
Gleb Tcivie 90cda30d49 Added timestamp column to client_details 2024-07-15 22:34:35 +03:00
Gleb Tcivie ee88bde8dd
Merge pull request #35 from tcivie/tcivie-patch-1
Create FUNDING.yml
2024-07-14 19:21:59 +03:00
Gleb Tcivie 79ad2314bb
Create FUNDING.yml 2024-07-14 19:19:42 +03:00
28 changed files with 3926 additions and 1233 deletions

.env (15 changed lines)

@@ -5,7 +5,7 @@ DATABASE_URL=postgres://postgres:postgres@postgres:5432/meshtastic
 # Prometheus connection details
 PROMETHEUS_COLLECTOR_PORT=9464
-PROMETHEUS_JOB=example
+PROMETHEUS_JOB=meshtastic
 # MQTT connection details
 MQTT_HOST=mqtt.meshtastic.org
@@ -13,7 +13,7 @@ MQTT_PORT=1883
 MQTT_USERNAME=meshdev
 MQTT_PASSWORD=large4cats
 MQTT_KEEPALIVE=60
-MQTT_TOPIC='msh/israel/#'
+MQTT_TOPIC='msh/EU_868/#,msh/US/#'
 MQTT_IS_TLS=false
 # MQTT protocol version (default: MQTTv5) the public MQTT server supports MQTTv311
@@ -29,9 +29,12 @@ MQTT_CALLBACK_API_VERSION=VERSION2
 MESH_HIDE_SOURCE_DATA=false
 ## Hide destination data in the exporter (default: false)
 MESH_HIDE_DESTINATION_DATA=false
+## Filtered ports in the exporter (default: 1, can be a comma-separated list of ports)
+FILTERED_PORTS=0
+## Hide message content in the TEXT_MESSAGE_APP packets (default: true) (Currently we only log message length, if we hide then all messages would have the same length)
+HIDE_MESSAGE=false
 ## MQTT server Key for decoding
 MQTT_SERVER_KEY=1PG7OiApB1nwvP+rz05pAQ==
+# Message types to filter (default: none) (comma separated) (eg. TEXT_MESSAGE_APP,POSITION_APP)
+# Full list can be found here: https://buf.build/meshtastic/protobufs/docs/main:meshtastic#meshtastic.PortNum
+EXPORTER_MESSAGE_TYPES_TO_FILTER=TEXT_MESSAGE_APP
+# Enable node configurations report (default: true)
+REPORT_NODE_CONFIGURATIONS=true
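Several of the values above (MQTT_TOPIC, EXPORTER_MESSAGE_TYPES_TO_FILTER, FILTERED_PORTS) are comma-separated lists. A minimal parsing sketch, assuming nothing about the exporter's actual implementation beyond the variable names shown in this .env:

```python
import os

def env_list(name: str, default: str = "") -> list[str]:
    """Split a comma-separated environment variable into a clean list."""
    return [item.strip() for item in os.getenv(name, default).split(",") if item.strip()]

topics = env_list("MQTT_TOPIC", "msh/EU_868/#,msh/US/#")
filtered_types = env_list("EXPORTER_MESSAGE_TYPES_TO_FILTER")
filtered_ports = [int(p) for p in env_list("FILTERED_PORTS", "0")]
```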

.github/CODEOWNERS (new file, 4 lines)

@@ -0,0 +1,4 @@
# This is a CODEOWNERS file for automatic assignment of code reviewers on GitHub
# Global rule: @tcivie will be the owner for everything in the repo
* @tcivie

.github/FUNDING.yml (new file, 3 lines)

@@ -0,0 +1,3 @@
# These are supported funding model platforms
ko_fi: tcivie

.github/workflows/auto-tagging.yml (new file, 28 lines)

@@ -0,0 +1,28 @@
name: Bump version
on:
  pull_request:
    types:
      - closed
    branches:
      - master
jobs:
  build:
    if: github.event.pull_request.merged == true
    runs-on: ubuntu-22.04
    permissions:
      contents: write
    steps:
      - uses: actions/checkout@v4
        with:
          ref: ${{ github.event.pull_request.merge_commit_sha }}
          fetch-depth: '0'
      - name: Bump version and push tag
        uses: anothrNick/github-tag-action@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          GIT_API_TAGGING: false
          WITH_V: true
          PRERELEASE: true
          DEFAULT_BUMP: patch

(CI workflow: Docker Compose service checks)

@@ -12,7 +12,7 @@ jobs:
       uses: actions/checkout@v4
     - name: Start Docker Compose
-      run: docker-compose up -d
+      run: docker compose up -d
     - name: Wait for containers to start
       run: sleep 60 # 1 Minute
@@ -23,11 +23,11 @@ jobs:
         for service in "${services[@]}"
         do
-          container_id=$(docker-compose ps -q $service)
+          container_id=$(docker compose ps -q $service)
           if [ -z "$container_id" ]; then
             echo "Error: Container for $service not found"
-            docker-compose logs $service
+            docker compose logs $service
             exit 1
           fi
@@ -37,14 +37,14 @@ jobs:
           if [ "$status" != "running" ]; then
             echo "Error: Container $service ($container_id) is not running. Current status: $status"
             echo "Last logs for $service:"
-            docker-compose logs --tail=50 $service
+            docker compose logs --tail=50 $service
             exit 1
           fi
           if [ "$restarts" -gt 0 ]; then
             echo "Error: Container $service ($container_id) has restarted $restarts times"
             echo "Last logs for $service:"
-            docker-compose logs --tail=50 $service
+            docker compose logs --tail=50 $service
             exit 1
           fi
@@ -65,4 +65,4 @@ jobs:
     - name: Clean up
       if: always()
-      run: docker-compose down -v
+      run: docker compose down -v

(JetBrains module file, .iml)

@@ -9,5 +9,6 @@
   </component>
   <component name="PackageRequirementsSettings">
     <option name="versionSpecifier" value="Greater or equal (&gt;=x.y.z)" />
+    <option name="keepMatchingSpecifier" value="false" />
   </component>
 </module>

(JetBrains SQL dialect mappings file, .idea)

@@ -2,7 +2,7 @@
 <project version="4">
   <component name="SqlDialectMappings">
     <file url="file://$PROJECT_DIR$/docker/postgres/init.sql" dialect="PostgreSQL" />
-    <file url="file://$PROJECT_DIR$/exporter/processor_base.py" dialect="PostgreSQL" />
+    <file url="file://$PROJECT_DIR$/exporter/processor/processor_base.py" dialect="PostgreSQL" />
     <file url="PROJECT" dialect="PostgreSQL" />
   </component>
 </project>

README.md (226 changed lines)

@@ -1,37 +1,141 @@
 # Meshtastic Metrics Exporter
+[![CodeQL](https://github.com/tcivie/meshtastic-metrics-exporter/actions/workflows/github-code-scanning/codeql/badge.svg)](https://github.com/tcivie/meshtastic-metrics-exporter/actions/workflows/github-code-scanning/codeql)
-The `meshtastic-metrics-exporter` is a tool designed to export nearly all available data from an MQTT server to a
-Prometheus server. It comes with a pre-configured Grafana dashboard connected to both data sources, allowing users to
-start creating dashboards immediately.
+The `meshtastic-metrics-exporter` is a tool designed to export nearly all available data from an MQTT server
+to a Prometheus server. It comes with a pre-configured Grafana dashboard connected to both data sources,
+allowing users to start creating dashboards immediately.
+## Public Dashboards
+You can explore these public instances to see the exporter in action:
+- **Canadaverse Dashboard**: [dash.mt.gt](https://dash.mt.gt) (Guest access: username: `guest`, password: `guest`)
+> This instance demonstrates the metrics exporter's capabilities in a production environment, maintained by [@tb0hdan](https://github.com/tb0hdan).
 ## Features
 - Exports a comprehensive set of metrics from an MQTT server to Prometheus.
 - Comes with a Grafana dashboard configured to connect to both Prometheus and Postgres data sources.
 - Comes with some basic dashboards, see the section below for general view of the dashboards
-- Stores node details (ID, short/long name, hardware details, and client type) in a Postgres server, which is also part of
- the package.
+- Stores node details (ID, short/long name, hardware details, and client type) in a Postgres server, which is also part
+ of the package.
 - Configuration via a `.env` file.
-### Grafana Dashboards
-The project comes wtih 2 dashboards.
-#### Main Dashboard
-<img width="1514" alt="SCR-20240707-qgnn" src="https://github.com/tcivie/meshtastic-metrics-exporter/assets/87943721/9679c140-c5f7-4ea5-bfc6-0173b52fb28c">
-> The dashboard has some basic data about the mesh network and it's data is temporarely updated (With new data coming in it would fill out the missing pieces automatically)
+### Database Structure
+The system uses PostgreSQL with the following tables:
#### 1. messages
- Stores message IDs and timestamps
- Auto-expires messages older than 1 minute using a trigger
```sql
Columns:
- id (TEXT, PRIMARY KEY)
- received_at (TIMESTAMP)
```
#### 2. node_details
- Stores basic information about mesh nodes
```sql
Columns:
- node_id (VARCHAR, PRIMARY KEY)
- short_name (VARCHAR)
- long_name (VARCHAR)
- hardware_model (VARCHAR)
- role (VARCHAR)
- mqtt_status (VARCHAR, default 'none')
- longitude (INT)
- latitude (INT)
- altitude (INT)
- precision (INT)
- created_at (TIMESTAMP)
- updated_at (TIMESTAMP)
```
#### 3. node_neighbors
- Tracks connections between nodes
```sql
Columns:
- id (SERIAL, PRIMARY KEY)
- node_id (VARCHAR, FOREIGN KEY)
- neighbor_id (VARCHAR, FOREIGN KEY)
- snr (FLOAT)
```
#### 4. node_configurations
- Stores detailed configuration and timing information for nodes
```sql
Columns:
- node_id (VARCHAR, PRIMARY KEY)
- last_updated (TIMESTAMP)
- environment_update_interval (INTERVAL)
- environment_update_last_timestamp (TIMESTAMP)
- device_update_interval (INTERVAL)
- device_update_last_timestamp (TIMESTAMP)
- air_quality_update_interval (INTERVAL)
- air_quality_update_last_timestamp (TIMESTAMP)
- power_update_interval (INTERVAL)
- power_update_last_timestamp (TIMESTAMP)
- range_test_interval (INTERVAL)
- range_test_packets_total (INT)
- range_test_first_packet_timestamp (TIMESTAMP)
- range_test_last_packet_timestamp (TIMESTAMP)
- pax_counter_interval (INTERVAL)
- pax_counter_last_timestamp (TIMESTAMP)
- neighbor_info_interval (INTERVAL)
- neighbor_info_last_timestamp (TIMESTAMP)
- mqtt_encryption_enabled (BOOLEAN)
- mqtt_json_enabled (BOOLEAN)
- mqtt_json_message_timestamp (TIMESTAMP)
- mqtt_configured_root_topic (TEXT)
- mqtt_info_last_timestamp (TIMESTAMP)
- map_broadcast_interval (INTERVAL)
- map_broadcast_last_timestamp (TIMESTAMP)
```
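As a quick illustration, the tables documented above can be queried directly. A sketch using psycopg; the table and column names come from the schema above, while the query itself and the DATABASE_URL wiring are illustrative:

```python
import os
import psycopg

# Count known neighbors per node, joining the two tables documented above.
with psycopg.connect(os.getenv("DATABASE_URL")) as conn, conn.cursor() as cur:
    cur.execute("""
        SELECT nd.node_id, nd.short_name, COUNT(nn.neighbor_id) AS neighbor_count
        FROM node_details nd
        LEFT JOIN node_neighbors nn ON nn.node_id = nd.node_id
        GROUP BY nd.node_id, nd.short_name
        ORDER BY neighbor_count DESC
    """)
    for node_id, short_name, neighbor_count in cur.fetchall():
        print(node_id, short_name, neighbor_count)
```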
+### Grafana Dashboards
+The project comes with 2 dashboards.
+#### Main Dashboard
+<img width="1470" alt="image" src="https://github.com/user-attachments/assets/09fe72e5-23eb-4516-9f34-19e2cc38b7dc">
+> The dashboard has some basic data about the mesh network and its data is temporarily updated
+> (With new data coming in it would fill out the missing pieces automatically)
+**Note:** The dashboard contains links to nodes that target `localhost:3000`.
+If you're accessing Grafana from a different host, you'll need to modify these links in the panel configuration
+to match your Grafana server's address.
 #### User Panel
-<img width="1470" alt="SCR-20240707-qhth" src="https://github.com/tcivie/meshtastic-metrics-exporter/assets/87943721/58f15190-127d-4481-b896-1c3e2121dea5">
-> This panel can be reached from the "Node ID" link on the main dashboard (The table in the center) or you can go to it from the dashbaords tab in grafana and select the node you want to spectate. This board includes some telemetry data and basic information about the node.
+![image](https://github.com/user-attachments/assets/d344b7dd-dadc-4cbe-84cc-44333ea6e0c4)
+> This panel can be reached from the "Node ID" link on the main dashboard (The table in the center)
+> or you can go to it from the dashboards tab in Grafana and select the node you want to spectate.
+> This board includes some telemetry data and basic information about the node.
 #### The Node Graph
 <img width="585" alt="SCR-20240707-qjaj" src="https://github.com/tcivie/meshtastic-metrics-exporter/assets/87943721/d29b2ac4-6291-4095-9938-e6e63df15098">
 > Both boards also include node graph which allows you to view nodes which are sending [Neighbour Info packets](https://meshtastic.org/docs/configuration/module/neighbor-info)
-> As long as we have some node which is connected to our MQTT server the data would be read buy the exporter and parsed as node graph. The line colors indicate the SNR value and the arrow is the direction of the flow captured (It can be two way). And the node circle color indicates which node is connected to MQTT (Green) which one is disconnected from MQTT (Red) and unknown (Gray - Never connected to the MQTT server)
+> As long as we have some node which is connected to our MQTT server the data would be read by the exporter
+> and parsed as node graph. The line colors indicate the SNR value and the arrow is the direction of the flow captured
+> (It can be two way). And the node circle color indicates which node is connected to MQTT (Green)
+> which one is disconnected from MQTT (Red) and unknown (Gray - Never connected to the MQTT server)
-**I highly recomend giving the system to stabilize over 24 hours before seeking any useful information from it.**
+**It is highly recommended to give the system 24 hours to stabilize before seeking any useful information from it.**
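The SNR line coloring mentioned above matches the thresholds used by the bundled Node Graph (Map) dashboard later in this diff (red below -13 dB, yellow up to -7 dB, green above). A small sketch of the same mapping:

```python
def snr_color(snr_db: float) -> str:
    """Mirror the bundled dashboard's line-color thresholds for a link's SNR."""
    if snr_db < -13:
        return "#E74C3C"  # red: weak link
    if snr_db < -7:
        return "#F4D03F"  # yellow: marginal link
    return "#2ECC71"      # green: good link
```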
 ## Exported Metrics
@@ -42,65 +146,55 @@ Label Notation:
 - 🏷️ (source): Indicates that all common labels are used, prefixed with "source_" (e.g., source_node_id, source_short_name, etc.).
 - 🏷️ (destination): Indicates that all common labels are used, prefixed with "destination_" (e.g., destination_node_id, destination_short_name, etc.).
-The following is a list of metrics exported by the `meshtastic-metrics-exporter`:
+### Available Metrics
 | Metric Name | Description | Type | Labels |
 |-------------|-------------|------|--------|
 | text_message_app_length | Length of text messages processed by the app | Histogram | 🏷️ |
 | device_latitude | Device latitude | Gauge | 🏷️ |
 | device_longitude | Device longitude | Gauge | 🏷️ |
 | device_altitude | Device altitude | Gauge | 🏷️ |
 | device_position_precision | Device position precision | Gauge | 🏷️ |
-| telemetry_app_ch1_voltage | Voltage measured by the device on channel 1 | Gauge | 🏷️ |
-| telemetry_app_ch1_current | Current measured by the device on channel 1 | Gauge | 🏷️ |
-| telemetry_app_ch2_voltage | Voltage measured by the device on channel 2 | Gauge | 🏷️ |
-| telemetry_app_ch2_current | Current measured by the device on channel 2 | Gauge | 🏷️ |
-| telemetry_app_ch3_voltage | Voltage measured by the device on channel 3 | Gauge | 🏷️ |
-| telemetry_app_ch3_current | Current measured by the device on channel 3 | Gauge | 🏷️ |
-| telemetry_app_pm10_standard | Concentration Units Standard PM1.0 | Gauge | 🏷️ |
-| telemetry_app_pm25_standard | Concentration Units Standard PM2.5 | Gauge | 🏷️ |
-| telemetry_app_pm100_standard | Concentration Units Standard PM10.0 | Gauge | 🏷️ |
-| telemetry_app_pm10_environmental | Concentration Units Environmental PM1.0 | Gauge | 🏷️ |
-| telemetry_app_pm25_environmental | Concentration Units Environmental PM2.5 | Gauge | 🏷️ |
-| telemetry_app_pm100_environmental | Concentration Units Environmental PM10.0 | Gauge | 🏷️ |
-| telemetry_app_particles_03um | 0.3um Particle Count | Gauge | 🏷️ |
-| telemetry_app_particles_05um | 0.5um Particle Count | Gauge | 🏷️ |
-| telemetry_app_particles_10um | 1.0um Particle Count | Gauge | 🏷️ |
-| telemetry_app_particles_25um | 2.5um Particle Count | Gauge | 🏷️ |
-| telemetry_app_particles_50um | 5.0um Particle Count | Gauge | 🏷️ |
-| telemetry_app_particles_100um | 10.0um Particle Count | Gauge | 🏷️ |
+| telemetry_app_ch[1-3]_voltage | Voltage measured by the device on channels 1-3 | Gauge | 🏷️ |
+| telemetry_app_ch[1-3]_current | Current measured by the device on channels 1-3 | Gauge | 🏷️ |
+| telemetry_app_pm[10/25/100]_standard | Concentration Units Standard PM1.0/2.5/10.0 | Gauge | 🏷️ |
+| telemetry_app_pm[10/25/100]_environmental | Concentration Units Environmental PM1.0/2.5/10.0 | Gauge | 🏷️ |
+| telemetry_app_particles_[03/05/10/25/50/100]um | Particle Count for different sizes | Gauge | 🏷️ |
 | telemetry_app_temperature | Temperature measured by the device | Gauge | 🏷️ |
-| telemetry_app_relative_humidity | Relative humidity percent measured by the device | Gauge | 🏷️ |
-| telemetry_app_barometric_pressure | Barometric pressure in hPA measured by the device | Gauge | 🏷️ |
-| telemetry_app_gas_resistance | Gas resistance in MOhm measured by the device | Gauge | 🏷️ |
-| telemetry_app_iaq | IAQ value measured by the device (0-500) | Gauge | 🏷️ |
-| telemetry_app_distance | Distance measured by the device in mm | Gauge | 🏷️ |
-| telemetry_app_lux | Ambient light measured by the device in Lux | Gauge | 🏷️ |
-| telemetry_app_white_lux | White light measured by the device in Lux | Gauge | 🏷️ |
-| telemetry_app_ir_lux | Infrared light measured by the device in Lux | Gauge | 🏷️ |
-| telemetry_app_uv_lux | Ultraviolet light measured by the device in Lux | Gauge | 🏷️ |
-| telemetry_app_wind_direction | Wind direction in degrees measured by the device | Gauge | 🏷️ |
-| telemetry_app_wind_speed | Wind speed in m/s measured by the device | Gauge | 🏷️ |
-| telemetry_app_weight | Weight in KG measured by the device | Gauge | 🏷️ |
-| telemetry_app_battery_level | Battery level of the device (0-100, >100 means powered) | Gauge | 🏷️ |
-| telemetry_app_voltage | Voltage measured by the device | Gauge | 🏷️ |
-| telemetry_app_channel_utilization | Utilization for the current channel, including well-formed TX, RX, and noise | Gauge | 🏷️ |
-| telemetry_app_air_util_tx | Percent of airtime for transmission used within the last hour | Gauge | 🏷️ |
-| telemetry_app_uptime_seconds | How long the device has been running since the last reboot (in seconds) | Counter | 🏷️ |
-| route_length | Number of nodes in the route | Counter | 🏷️ |
-| route_response | Number of responses to route discovery | Counter | 🏷️, response_type |
-| mesh_packet_source_types | Types of mesh packets processed by source | Counter | 🏷️ (source), portnum |
-| mesh_packet_destination_types | Types of mesh packets processed by destination | Counter | 🏷️ (destination), portnum |
-| mesh_packet_total | Total number of mesh packets processed | Counter | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_rx_time | Receive time of mesh packets (seconds since 1970) | Histogram | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_rx_snr | Receive SNR of mesh packets | Gauge | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_hop_limit | Hop limit of mesh packets | Counter | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_want_ack | Occurrences of want ACK for mesh packets | Counter | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_via_mqtt | Occurrences of mesh packets sent via MQTT | Counter | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_hop_start | Hop start of mesh packets | Gauge | 🏷️ (source), 🏷️ (destination) |
-| mesh_packet_ids | Unique IDs for mesh packets | Counter | 🏷️ (source), 🏷️ (destination), packet_id |
-| mesh_packet_channel | Channel used for mesh packets | Counter | 🏷️ (source), 🏷️ (destination), channel |
-| mesh_packet_rx_rssi | Receive RSSI of mesh packets | Gauge | 🏷️ (source), 🏷️ (destination) |
+| telemetry_app_relative_humidity | Relative humidity percent | Gauge | 🏷️ |
+| telemetry_app_barometric_pressure | Barometric pressure in hPA | Gauge | 🏷️ |
+| telemetry_app_gas_resistance | Gas resistance in MOhm | Gauge | 🏷️ |
+| telemetry_app_iaq | IAQ value (0-500) | Gauge | 🏷️ |
+| telemetry_app_distance | Distance in mm | Gauge | 🏷️ |
+| telemetry_app_lux | Ambient light in Lux | Gauge | 🏷️ |
+| telemetry_app_white_lux | White light in Lux | Gauge | 🏷️ |
+| telemetry_app_ir_lux | Infrared light in Lux | Gauge | 🏷️ |
+| telemetry_app_uv_lux | Ultraviolet light in Lux | Gauge | 🏷️ |
+| telemetry_app_wind_direction | Wind direction in degrees | Gauge | 🏷️ |
+| telemetry_app_wind_speed | Wind speed in m/s | Gauge | 🏷️ |
+| telemetry_app_weight | Weight in KG | Gauge | 🏷️ |
+| telemetry_app_battery_level | Battery level (0-100, >100 means powered) | Gauge | 🏷️ |
+| telemetry_app_voltage | Voltage | Gauge | 🏷️ |
+| telemetry_app_channel_utilization | Channel utilization including TX, RX, and noise | Gauge | 🏷️ |
+| telemetry_app_air_util_tx | Airtime utilization for TX in last hour | Gauge | 🏷️ |
+| telemetry_app_uptime_seconds | Device uptime in seconds | Counter | 🏷️ |
+| route_length | Number of nodes in route | Counter | 🏷️ |
+| route_response | Number of route discovery responses | Counter | 🏷️, response_type |
+| mesh_packet_source_types | Mesh packet types by source | Counter | 🏷️ (source), portnum |
+| mesh_packet_destination_types | Mesh packet types by destination | Counter | 🏷️ (destination), portnum |
+| mesh_packet_total | Total mesh packets processed | Counter | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_rx_time | Packet receive time (seconds since 1970) | Histogram | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_rx_snr | Packet receive SNR | Gauge | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_hop_limit | Packet hop limit | Counter | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_want_ack | Want ACK occurrences | Counter | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_via_mqtt | MQTT transmission occurrences | Counter | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_hop_start | Packet hop start | Gauge | 🏷️ (source), 🏷️ (destination) |
+| mesh_packet_ids | Unique packet IDs | Counter | 🏷️ (source), 🏷️ (destination), packet_id |
+| mesh_packet_channel | Packet channel | Counter | 🏷️ (source), 🏷️ (destination), channel |
+| mesh_packet_rx_rssi | Packet receive RSSI | Gauge | 🏷️ (source), 🏷️ (destination) |
+| pax_wifi | Number of Wifi devices (PAX) | Gauge | 🏷 |
+| pax_ble | Number of Bluetooth devices (PAX) | Gauge | 🏷 |
+| pax_uptime | PAX device uptime | Gauge | 🏷 |
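For orientation, a minimal prometheus_client sketch of how one metric from this table could be registered and served. The common label names are an assumption based on the 🏷️ notation above, not the exporter's actual label set:

```python
from prometheus_client import Gauge, start_http_server

# Assumed common label set (the 🏷️ in the table above); illustrative only.
COMMON_LABELS = ["node_id", "short_name", "long_name", "hardware_model", "role"]

device_altitude = Gauge("device_altitude", "Device altitude", COMMON_LABELS)

if __name__ == "__main__":
    start_http_server(9464)  # PROMETHEUS_COLLECTOR_PORT from the .env above
    device_altitude.labels("!12345678", "NODE", "Example node", "TBEAM", "CLIENT").set(120)
```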
 ## Configuration
@@ -152,7 +246,7 @@ MQTT_CALLBACK_API_VERSION=VERSION2
 To run the project, simply use Docker Compose:
 ```bash
-docker-compose up --build
+docker compose up -d
 ```
 This command will build and start all the necessary services, including the exporter, Prometheus server, Postgres

docker-compose.yml

@@ -35,6 +35,9 @@ services:
       context: .
       dockerfile: ./docker/exporter/Dockerfile.exporter
     restart: unless-stopped
+    depends_on:
+      - prometheus
+      - postgres
     extra_hosts:
       - "host.docker.internal:host-gateway"
     env_file:
@@ -43,7 +46,7 @@ services:
       - mesh-bridge
   postgres:
-    image: postgres:13.3
+    image: postgres:16.3
     restart: unless-stopped
     networks:
       - mesh-bridge

docker/exporter/Dockerfile.exporter

@@ -1,7 +1,8 @@
-FROM python
+FROM python:3.9.20-alpine3.19
 LABEL author="Gleb Tcivie"
 WORKDIR /app
+RUN apk add --update --no-cache gcc libc-dev libffi-dev
 COPY ../../requirements.txt .
 COPY ../../.env .
 RUN pip3 install -r requirements.txt

docker/grafana/Dockerfile.grafana

@@ -1,4 +1,4 @@
-FROM grafana/grafana-oss:11.1.0
+FROM grafana/grafana-oss:11.2.2-security-01
 # Copy the datasource configuration
 COPY docker/grafana/provisioning/datasources/datasources.yml /etc/grafana/provisioning/datasources/datasources.yml

(new Grafana dashboard JSON: Investigation Board, 453 lines)

@@ -0,0 +1,453 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 13,
"links": [],
"panels": [
{
"datasource": {
"type": "datasource",
"uid": "-- Mixed --"
},
"description": "Information about the clients we have in the network and their relative packets sent",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "center",
"cellOptions": {
"type": "auto",
"wrapText": true
},
"filterable": true,
"inspect": true,
"minWidth": 180
},
"fieldMinMax": true,
"mappings": [
{
"options": {
"none": {
"color": "text",
"index": 2,
"text": "⚪️ Unknown"
},
"offline": {
"color": "red",
"index": 1,
"text": "🛑 offline"
},
"online": {
"color": "green",
"index": 0,
"text": "🟢 online"
}
},
"type": "value"
}
],
"thresholds": {
"mode": "percentage",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "#EAB839",
"value": 70
},
{
"color": "red",
"value": 90
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "MAP_REPORT_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"drawStyle": "line",
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "NEIGHBORINFO_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "NODEINFO_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"drawStyle": "line",
"hideValue": false,
"lineStyle": {
"dash": [
10,
10
],
"fill": "solid"
},
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "POSITION_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "RANGE_TEST_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "ROUTING_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "TELEMETRY_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "TEXT_MESSAGE_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "TRACEROUTE_APP"
},
"properties": [
{
"id": "custom.cellOptions",
"value": {
"hideValue": false,
"type": "sparkline"
}
},
{
"id": "unit",
"value": "packets"
}
]
},
{
"matcher": {
"id": "byName",
"options": "node_id"
},
"properties": [
{
"id": "unit",
"value": "hex"
}
]
}
]
},
"gridPos": {
"h": 18,
"w": 24,
"x": 0,
"y": 0
},
"id": 3,
"options": {
"cellHeight": "md",
"footer": {
"countRows": true,
"enablePagination": true,
"fields": [],
"reducer": [
"count"
],
"show": true
},
"frameIndex": 1,
"showHeader": true,
"sortBy": [
{
"desc": true,
"displayName": "TELEMETRY_APP"
}
]
},
"pluginVersion": "11.2.2+security-01",
"targets": [
{
"datasource": {
"type": "grafana-postgresql-datasource",
"uid": "PA942B37CCFAF5A81"
},
"editorMode": "code",
"format": "table",
"hide": false,
"rawQuery": true,
"rawSql": "SELECT * FROM node_details",
"refId": "Client Details",
"sql": {
"columns": [
{
"parameters": [
{
"name": "*",
"type": "functionParameter"
}
],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
},
"table": "client_details"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "sum by(source_id, portnum) (\n mesh_packet_source_types_total\nand\n(mesh_packet_source_types_total - mesh_packet_source_types_total offset $__range) != 0\n)",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": false,
"legendFormat": "__auto",
"range": true,
"refId": "Packet Types",
"useBackend": false
},
{
"datasource": {
"type": "grafana-postgresql-datasource",
"uid": "PA942B37CCFAF5A81"
},
"editorMode": "code",
"format": "table",
"hide": false,
"rawQuery": true,
"rawSql": "SELECT\n node_id,\n CASE\n WHEN EXTRACT(EPOCH FROM environment_update_interval) > 1\n THEN to_char(environment_update_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS environment_update_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM device_update_interval) > 1\n THEN to_char(device_update_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS device_update_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM air_quality_update_interval) > 1\n THEN to_char(air_quality_update_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS air_quality_update_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM power_update_interval) > 1\n THEN to_char(power_update_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS power_update_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM range_test_interval) > 1\n THEN to_char(range_test_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS range_test_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM pax_counter_interval) > 1\n THEN to_char(pax_counter_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS pax_counter_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM neighbor_info_interval) > 1\n THEN to_char(neighbor_info_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS neighbor_info_interval,\n CASE\n WHEN EXTRACT(EPOCH FROM map_broadcast_interval) > 1\n THEN to_char(map_broadcast_interval, 'HH24:MI:SS')\n ELSE NULL\n END AS map_broadcast_interval,\n mqtt_encryption_enabled,\n mqtt_json_enabled,\n mqtt_configured_root_topic,\n last_updated\nFROM node_configurations\nWHERE\n EXTRACT(EPOCH FROM environment_update_interval) > 1 OR\n EXTRACT(EPOCH FROM device_update_interval) > 1 OR\n EXTRACT(EPOCH FROM air_quality_update_interval) > 1 OR\n EXTRACT(EPOCH FROM power_update_interval) > 1 OR\n EXTRACT(EPOCH FROM range_test_interval) > 1 OR\n EXTRACT(EPOCH FROM pax_counter_interval) > 1 OR\n EXTRACT(EPOCH FROM neighbor_info_interval) > 1 OR\n EXTRACT(EPOCH FROM map_broadcast_interval) > 1",
"refId": "A",
"sql": {
"columns": [
{
"parameters": [],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
}
}
],
"title": "General Information",
"transformations": [
{
"filter": {
"id": "byRefId",
"options": "Packet Types"
},
"id": "timeSeriesTable",
"options": {
"Packet Types": {
"stat": "lastNotNull",
"timeField": "Time"
}
}
},
{
"filter": {
"id": "byRefId",
"options": "Packet Types"
},
"id": "groupingToMatrix",
"options": {
"columnField": "portnum",
"emptyValue": "null",
"rowField": "source_id",
"valueField": "Trend #Packet Types"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "(source_id\\\\portnum)",
"renamePattern": "node_id"
}
},
{
"id": "joinByField",
"options": {
"byField": "node_id",
"mode": "outer"
}
}
],
"transparent": true,
"type": "table"
}
],
"schemaVersion": 39,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "Investigation Board",
"uid": "adrqynul4j3eoa",
"version": 29,
"weekStart": ""
}

File diff suppressed because it is too large

File diff suppressed because it is too large

(new Grafana dashboard JSON: Node Graph (Map), 357 lines)

@@ -0,0 +1,357 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 17,
"links": [],
"panels": [
{
"datasource": {
"type": "grafana-postgresql-datasource",
"uid": "PA942B37CCFAF5A81"
},
"description": "Graph that is built from Neighbor Info reports and shows the signal strenth for each line",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "red",
"value": null
},
{
"color": "#EAB839",
"value": -13
},
{
"color": "green",
"value": -7
}
]
},
"unit": "dB"
},
"overrides": []
},
"gridPos": {
"h": 24,
"w": 24,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"basemap": {
"config": {},
"name": "Layer 0",
"opacity": 1,
"tooltip": true,
"type": "xyz"
},
"controls": {
"mouseWheelZoom": true,
"showAttribution": true,
"showDebug": false,
"showMeasure": true,
"showScale": true,
"showZoom": true
},
"layers": [
{
"config": {
"arrow": 1,
"edgeStyle": {
"color": {
"field": "mainstat",
"fixed": "dark-green"
},
"opacity": 1,
"rotation": {
"fixed": 0,
"max": 360,
"min": -360,
"mode": "mod"
},
"size": {
"field": "thickness",
"fixed": 5,
"max": 3,
"min": 1
},
"symbol": {
"fixed": "img/icons/marker/circle.svg",
"mode": "fixed"
},
"symbolAlign": {
"horizontal": "center",
"vertical": "center"
},
"text": {
"field": "mainstat",
"fixed": "",
"mode": "field"
},
"textConfig": {
"fontSize": 10,
"offsetX": 0,
"offsetY": 0,
"textAlign": "center",
"textBaseline": "top"
}
},
"showLegend": false,
"style": {
"color": {
"fixed": "dark-green"
},
"opacity": 1,
"rotation": {
"fixed": 5,
"max": 360,
"min": -360,
"mode": "mod"
},
"size": {
"fixed": 5,
"max": 15,
"min": 2
},
"symbol": {
"fixed": "img/icons/marker/circle.svg",
"mode": "fixed"
},
"symbolAlign": {
"horizontal": "center",
"vertical": "center"
},
"text": {
"field": "title",
"fixed": "",
"mode": "field"
},
"textConfig": {
"fontSize": 12,
"offsetX": 0,
"offsetY": 0,
"textAlign": "left",
"textBaseline": "top"
}
}
},
"name": "Layer 1",
"tooltip": true,
"type": "network"
}
],
"tooltip": {
"mode": "none"
},
"view": {
"allLayers": true,
"id": "coords",
"lat": 31.991767,
"lon": 34.703985,
"zoom": 7.65
}
},
"pluginVersion": "11.1.0",
"targets": [
{
"datasource": {
"type": "postgres",
"uid": "PA942B37CCFAF5A81"
},
"editorMode": "code",
"format": "table",
"rawQuery": true,
"rawSql": "SELECT DISTINCT\n cd.node_id AS \"id\",\n cd.long_name AS \"title\",\n cd.hardware_model AS \"detail__Hardware Detail\",\n cd.role AS \"detail__Client Role\",\n cd.mqtt_status AS \"detail__MQTT Status\",\n cd.short_name AS \"subtitle\",\n cd.longitude * 1e-7 AS \"longitude\",\n cd.latitude * 1e-7 AS \"latitude\"\nFROM\n node_details cd\nLEFT JOIN (\n SELECT node_id FROM node_neighbors\n UNION\n SELECT neighbor_id FROM node_neighbors\n) nn ON cd.node_id = nn.node_id\nWHERE nn.node_id IS NOT NULL",
"refId": "nodes",
"sql": {
"columns": [
{
"alias": "\"id\"",
"parameters": [
{
"name": "node_id",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"title\"",
"parameters": [
{
"name": "long_name",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"detail__Hardware Detail\"",
"parameters": [
{
"name": "hardware_model",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"detail__Client Role\"",
"parameters": [
{
"name": "role",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"detail__MQTT Status\"",
"parameters": [
{
"name": "mqtt_status",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"subtitle\"",
"parameters": [
{
"name": "short_name",
"type": "functionParameter"
}
],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
},
"table": "node_details"
},
{
"datasource": {
"type": "postgres",
"uid": "PA942B37CCFAF5A81"
},
"editorMode": "code",
"format": "table",
"hide": false,
"rawQuery": true,
"rawSql": "SELECT \n CONCAT(neighbor_id, '_', node_id) AS id,\n neighbor_id AS \"source\",\n node_id AS \"target\",\n snr AS \"mainstat\",\n CASE\n WHEN snr < -13 THEN '#E74C3C' -- Red for SNR < -13\n WHEN snr < -7 THEN '#F4D03F' -- Yellow for -13 ≤ SNR < -7\n ELSE '#2ECC71' -- Green for SNR ≥ -7\n END AS \"color\",\n GREATEST(0.1, LEAST(2, 1 + ((snr + 13) / 10))) AS \"thickness\"\nFROM \n node_neighbors",
"refId": "edges",
"sql": {
"columns": [
{
"parameters": [
{
"name": "id",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"source\"",
"parameters": [
{
"name": "neighbor_id",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"target\"",
"parameters": [
{
"name": "node_id",
"type": "functionParameter"
}
],
"type": "function"
},
{
"alias": "\"mainstat\"",
"parameters": [
{
"name": "snr",
"type": "functionParameter"
}
],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
},
"table": "node_neighbors"
}
],
"title": "Node Graph (Map)",
"type": "geomap"
}
],
"schemaVersion": 39,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "Node Graph (Map)",
"uid": "cdt467x8uwbgga",
"version": 5,
"weekStart": ""
}

(Grafana dashboard provisioning config)

@@ -3,7 +3,7 @@ apiVersion: 1
 providers:
   - name: 'default'
     orgId: 1
-    folder: ''
+    folder: 'Main Dashboards'
     type: file
     disableDeletion: false
     updateIntervalSeconds: 10

docker/postgres/init.sql

@@ -19,23 +19,23 @@ CREATE TRIGGER trigger_expire_old_messages
     FOR EACH ROW
 EXECUTE FUNCTION expire_old_messages();
-CREATE TABLE IF NOT EXISTS client_details
+CREATE TABLE IF NOT EXISTS node_details
 (
     node_id        VARCHAR PRIMARY KEY,
+    -- Base Data
     short_name     VARCHAR,
     long_name      VARCHAR,
     hardware_model VARCHAR,
     role           VARCHAR,
-    mqtt_status    VARCHAR default 'none'
-);
-
-CREATE TABLE IF NOT EXISTS node_graph
-(
-    node_id                 VARCHAR PRIMARY KEY,
-    last_sent_by_node_id    VARCHAR,
-    last_sent_at            TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    broadcast_interval_secs INTEGER,
-    FOREIGN KEY (node_id) REFERENCES client_details (node_id)
+    mqtt_status    VARCHAR default 'none',
+    -- Location Data
+    longitude      INT,
+    latitude       INT,
+    altitude       INT,
+    precision      INT,
+    -- SQL Data
+    created_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
+    updated_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
 );
 CREATE TABLE IF NOT EXISTS node_neighbors
@@ -44,9 +44,108 @@ CREATE TABLE IF NOT EXISTS node_neighbors
 (
     node_id     VARCHAR,
     neighbor_id VARCHAR,
     snr         FLOAT,
-    FOREIGN KEY (node_id) REFERENCES client_details (node_id),
-    FOREIGN KEY (neighbor_id) REFERENCES client_details (node_id),
+    FOREIGN KEY (node_id) REFERENCES node_details (node_id),
+    FOREIGN KEY (neighbor_id) REFERENCES node_details (node_id),
     UNIQUE (node_id, neighbor_id)
 );
 CREATE UNIQUE INDEX idx_unique_node_neighbor ON node_neighbors (node_id, neighbor_id);
CREATE TABLE IF NOT EXISTS node_configurations
(
node_id VARCHAR PRIMARY KEY,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Configuration (Telemetry)
environment_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
environment_update_last_timestamp TIMESTAMP DEFAULT NOW(),
device_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
device_update_last_timestamp TIMESTAMP DEFAULT NOW(),
air_quality_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
air_quality_update_last_timestamp TIMESTAMP DEFAULT NOW(),
power_update_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
power_update_last_timestamp TIMESTAMP DEFAULT NOW(),
-- Configuration (Range Test)
range_test_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
range_test_packets_total INT DEFAULT 0, -- in packets
range_test_first_packet_timestamp TIMESTAMP DEFAULT NOW(),
range_test_last_packet_timestamp TIMESTAMP DEFAULT NOW(),
-- Configuration (PAX Counter)
pax_counter_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
pax_counter_last_timestamp TIMESTAMP DEFAULT NOW(),
-- Configuration (Neighbor Info)
neighbor_info_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
neighbor_info_last_timestamp TIMESTAMP DEFAULT NOW(),
-- Configuration (MQTT)
mqtt_encryption_enabled BOOLEAN DEFAULT FALSE,
mqtt_json_enabled BOOLEAN DEFAULT FALSE,
mqtt_json_message_timestamp TIMESTAMP DEFAULT NOW(),
mqtt_configured_root_topic TEXT DEFAULT '',
mqtt_info_last_timestamp TIMESTAMP DEFAULT NOW(),
-- Configuration (Map)
map_broadcast_interval INTERVAL DEFAULT '0 seconds' NOT NULL,
map_broadcast_last_timestamp TIMESTAMP DEFAULT NOW(),
-- FOREIGN KEY (node_id) REFERENCES node_details (node_id),
UNIQUE (node_id)
);
-- -- Function to update old values
-- CREATE OR REPLACE FUNCTION update_old_node_configurations()
-- RETURNS TRIGGER AS $$
-- BEGIN
-- -- Update intervals to 0 if not updated in 24 hours
-- IF NEW.environment_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.environment_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.device_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.device_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.air_quality_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.air_quality_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.power_update_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.power_update_interval := '0 seconds';
-- END IF;
--
-- IF NEW.range_test_last_packet_timestamp < NOW() - INTERVAL '1 hours' THEN
-- NEW.range_test_interval := '0 seconds';
-- NEW.range_test_first_packet_timestamp := 0;
-- NEW.range_test_packets_total := 0;
-- END IF;
--
-- IF NEW.pax_counter_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.pax_counter_interval := '0 seconds';
-- END IF;
--
-- IF NEW.neighbor_info_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.neighbor_info_interval := '0 seconds';
-- END IF;
--
-- IF NEW.map_broadcast_last_timestamp < NOW() - INTERVAL '24 hours' THEN
-- NEW.map_broadcast_interval := '0 seconds';
-- END IF;
--
-- NEW.last_updated := CURRENT_TIMESTAMP;
--
-- RETURN NEW;
-- END;
-- $$ LANGUAGE plpgsql;
--
-- -- Create the trigger
-- CREATE TRIGGER update_node_configurations_trigger
-- BEFORE UPDATE ON node_configurations
-- FOR EACH ROW
-- EXECUTE FUNCTION update_old_node_configurations();

(exporter package __init__.py)

@@ -1 +1 @@
-from .processor_base import MessageProcessor
+from exporter.processor.processor_base import MessageProcessor

exporter/db_handler.py (new file, 17 lines)

@@ -0,0 +1,17 @@
from psycopg_pool import ConnectionPool


class DBHandler:
    def __init__(self, db_pool: ConnectionPool):
        self.db_pool = db_pool

    def get_connection(self):
        return self.db_pool.getconn()

    def release_connection(self, conn):
        self.db_pool.putconn(conn)

    def execute_db_operation(self, operation):
        with self.db_pool.connection() as conn:
            with conn.cursor() as cur:
                return operation(cur, conn)
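A short usage sketch for the handler above; the pool wiring is hypothetical, while DATABASE_URL and the node_details table come from elsewhere in this diff:

```python
import os
from psycopg_pool import ConnectionPool
from exporter.db_handler import DBHandler

db = DBHandler(ConnectionPool(os.getenv("DATABASE_URL")))

def count_nodes(cur, conn):
    # `operation(cur, conn)` receives a pooled cursor and its connection.
    cur.execute("SELECT COUNT(*) FROM node_details")
    return cur.fetchone()[0]

print(db.execute_db_operation(count_nodes))
```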


exporter/metric/node_configuration_metrics.py (new file, 234 lines)

@@ -0,0 +1,234 @@
import os

from psycopg_pool import ConnectionPool

from exporter.db_handler import DBHandler


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class NodeConfigurationMetrics(metaclass=Singleton):
    def __init__(self, connection_pool: ConnectionPool = None):
        self.db = DBHandler(connection_pool)
        self.report = os.getenv('REPORT_NODE_CONFIGURATIONS', True)

    def process_environment_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (node_id,
                                                 environment_update_interval,
                                                 environment_update_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    environment_update_interval = NOW() - node_configurations.environment_update_last_timestamp,
                    environment_update_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_device_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (node_id,
                                                 device_update_interval,
                                                 device_update_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    device_update_interval = NOW() - node_configurations.device_update_last_timestamp,
                    device_update_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_power_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (node_id,
                                                 power_update_interval,
                                                 power_update_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    power_update_interval = NOW() - node_configurations.power_update_last_timestamp,
                    power_update_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def map_broadcast_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (node_id,
                                                 map_broadcast_interval,
                                                 map_broadcast_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    map_broadcast_interval = NOW() - node_configurations.map_broadcast_last_timestamp,
                    map_broadcast_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_air_quality_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (node_id,
                                                 air_quality_update_interval,
                                                 air_quality_update_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    air_quality_update_interval = NOW() - node_configurations.air_quality_update_last_timestamp,
                    air_quality_update_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_range_test_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    range_test_interval,
                    range_test_packets_total,
                    range_test_first_packet_timestamp,
                    range_test_last_packet_timestamp
                ) VALUES (%s, %s, NOW(), NOW(), NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    range_test_interval = NOW() - node_configurations.range_test_last_packet_timestamp,
                    range_test_packets_total = CASE
                        WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour'
                            THEN 1
                        ELSE node_configurations.range_test_packets_total + 1
                    END,
                    range_test_first_packet_timestamp = CASE
                        WHEN EXCLUDED.range_test_last_packet_timestamp - node_configurations.range_test_first_packet_timestamp >= INTERVAL '1 hour'
                            THEN NOW()
                        ELSE node_configurations.range_test_first_packet_timestamp
                    END,
                    range_test_last_packet_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_pax_counter_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    pax_counter_interval,
                    pax_counter_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    pax_counter_interval = NOW() - node_configurations.pax_counter_last_timestamp,
                    pax_counter_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_neighbor_info_update(self, node_id: str):
        if not self.report:
            return

        def db_operation(cur, conn):
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    neighbor_info_interval,
                    neighbor_info_last_timestamp
                ) VALUES (%s, %s, NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    neighbor_info_interval = NOW() - node_configurations.neighbor_info_last_timestamp,
                    neighbor_info_last_timestamp = NOW()
            """, (node_id, '0 seconds'))
            conn.commit()

        self.db.execute_db_operation(db_operation)

    def process_mqtt_update(self, node_id: str, mqtt_encryption_enabled=None, mqtt_json_enabled=None,
                            mqtt_configured_root_topic=None):
        if not self.report:
            return

        def db_operation(cur, conn):
            # Update the last MQTT message timestamp for every message
            cur.execute("""
                UPDATE node_configurations
                SET mqtt_info_last_timestamp = NOW()
                WHERE node_id = %s
            """, (node_id,))

            # If it's a JSON message, update the JSON message timestamp
            if mqtt_json_enabled:
                cur.execute("""
                    UPDATE node_configurations
                    SET mqtt_json_message_timestamp = NOW(),
                        mqtt_json_enabled = TRUE
                    WHERE node_id = %s
                """, (node_id,))

            # Perform the main update
            cur.execute("""
                INSERT INTO node_configurations (
                    node_id,
                    mqtt_encryption_enabled,
                    mqtt_json_enabled,
                    mqtt_configured_root_topic,
                    mqtt_info_last_timestamp
                ) VALUES (%s, COALESCE(%s, FALSE), COALESCE(%s, FALSE), COALESCE(%s, ''), NOW())
                ON CONFLICT(node_id)
                DO UPDATE SET
                    mqtt_encryption_enabled = COALESCE(EXCLUDED.mqtt_encryption_enabled, node_configurations.mqtt_encryption_enabled),
                    mqtt_json_enabled = CASE
                        WHEN (node_configurations.mqtt_info_last_timestamp - node_configurations.mqtt_json_message_timestamp) > INTERVAL '1 hour'
                            THEN FALSE
                        ELSE COALESCE(EXCLUDED.mqtt_json_enabled, node_configurations.mqtt_json_enabled)
                    END,
                    mqtt_configured_root_topic = COALESCE(EXCLUDED.mqtt_configured_root_topic, node_configurations.mqtt_configured_root_topic),
                    mqtt_info_last_timestamp = NOW()
                RETURNING mqtt_json_enabled
            """, (node_id, mqtt_encryption_enabled, mqtt_json_enabled, mqtt_configured_root_topic))
            conn.commit()

        self.db.execute_db_operation(db_operation)

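Every `process_*_update` above follows the same upsert pattern: the first packet inserts a row with a zero interval, and each later packet rewrites `*_interval` to `NOW()` minus the stored timestamp, so the column converges on the node's actual reporting period. A sketch of how the singleton is meant to be driven (the `pool` variable and node id are illustrative assumptions):

```python
from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics

# First construction wires in the pool; later bare calls reuse the same instance.
metrics = NodeConfigurationMetrics(connection_pool=pool)  # `pool` assumed to exist
assert NodeConfigurationMetrics() is metrics  # Singleton metaclass at work

# Two calls roughly an hour apart would leave device_update_interval near '01:00:00'
metrics.process_device_update('123456789')  # illustrative node id
```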
exporter/metric/node_metrics.py

@@ -1,19 +1,28 @@
-from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
+from datetime import datetime
+
+from prometheus_client import CollectorRegistry, Counter, Gauge
+
+from exporter.client_details import ClientDetails
+from exporter.db_handler import DBHandler


-class _Metrics:
+class Metrics:
     _instance = None

     def __new__(cls, *args, **kwargs):
         if not cls._instance:
-            cls._instance = super(_Metrics, cls).__new__(cls)
+            cls._instance = super(Metrics, cls).__new__(cls)
         return cls._instance

-    def __init__(self, registry: CollectorRegistry):
+    def __init__(self, registry: CollectorRegistry, db: DBHandler):
         if not hasattr(self, 'initialized'):  # Ensuring __init__ runs only once
             self._registry = registry
             self._init_metrics()
             self.initialized = True  # Attribute to indicate initialization
+        self.db = db
+
+    def get_db(self):
+        return self.db

     @staticmethod
     def _get_common_labels():
@@ -22,47 +31,32 @@ class _Metrics:
         ]

     def _init_metrics(self):
-        self._init_metrics_text_message()
         self._init_metrics_telemetry_device()
         self._init_metrics_telemetry_environment()
         self._init_metrics_telemetry_air_quality()
         self._init_metrics_telemetry_power()
-        self._init_metrics_position()
         self._init_route_discovery_metrics()
+        self._init_pax_counter_metrics()

-    def _init_metrics_text_message(self):
-        self.message_length_histogram = Histogram(
-            'text_message_app_length',
-            'Length of text messages processed by the app',
-            self._get_common_labels(),
-            registry=self._registry
-        )
-
-    def _init_metrics_position(self):
-        self.device_latitude_gauge = Gauge(
-            'device_latitude',
-            'Device latitude',
-            self._get_common_labels(),
-            registry=self._registry
-        )
-        self.device_longitude_gauge = Gauge(
-            'device_longitude',
-            'Device longitude',
-            self._get_common_labels(),
-            registry=self._registry
-        )
-        self.device_altitude_gauge = Gauge(
-            'device_altitude',
-            'Device altitude',
-            self._get_common_labels(),
-            registry=self._registry
-        )
-        self.device_position_precision_gauge = Gauge(
-            'device_position_precision',
-            'Device position precision',
-            self._get_common_labels(),
-            registry=self._registry
-        )
+    def update_metrics_position(self, latitude, longitude, altitude, precision, client_details: ClientDetails):
+        # Could be used to calculate more complex data (Like distances etc..)
+        # point = geopy.point.Point(latitude, longitude, altitude) # Not used for now
+
+        if latitude != 0 and longitude != 0:
+            # location = RateLimiter(self.geolocator.reverse, min_delay_seconds=10, swallow_exceptions=False)((latitude, longitude), language='en', timeout=10)
+            # country = location.raw.get('address', {}).get('country', 'Unknown')
+            # city = location.raw.get('address', {}).get('city', 'Unknown')
+            # state = location.raw.get('address', {}).get('state', 'Unknown')
+
+            def db_operation(cur, conn):
+                cur.execute("""
+                    UPDATE node_details
+                    SET latitude = %s, longitude = %s, altitude = %s, precision = %s, updated_at = %s
+                    WHERE node_id = %s
+                """, (latitude, longitude, altitude, precision, datetime.now().isoformat(), client_details.node_id))
+                conn.commit()
+
+            self.db.execute_db_operation(db_operation)

     def _init_metrics_telemetry_power(self):
         self.ch1_voltage_gauge = Gauge(
@@ -337,4 +331,22 @@ class _Metrics:
             registry=self._registry
         )

+    def _init_pax_counter_metrics(self):
+        self.pax_wifi_gauge = Gauge(
+            'pax_wifi',
+            'Number of WiFi devices',
+            self._get_common_labels(),
+            registry=self._registry
+        )
+        self.pax_ble_gauge = Gauge(
+            'pax_ble',
+            'Number of BLE devices',
+            self._get_common_labels(),
+            registry=self._registry
+        )
+        self.pax_uptime_gauge = Gauge(
+            'pax_uptime',
+            'Uptime of the device',
+            self._get_common_labels(),
+            registry=self._registry
+        )

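With the per-coordinate gauges removed, `update_metrics_position` persists positions to `node_details` instead, and the `latitude != 0 and longitude != 0` guard skips packets without a GPS fix. A sketch of a call site, assuming `metrics` and a `ClientDetails` instance named `details` already exist (Meshtastic encodes coordinates as integers in units of 1e-7 degrees, and the values below are made up):

```python
# Positions at (0, 0) are treated as "no fix" and silently skipped.
metrics.update_metrics_position(
    latitude=324858000,   # ~32.4858 degrees in 1e-7 units (example value)
    longitude=349818000,  # ~34.9818 degrees (example value)
    altitude=25,
    precision=32,
    client_details=details
)
```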
exporter/metric_cleanup_job.py Normal file

@@ -0,0 +1,100 @@
from datetime import datetime, timedelta
from typing import Any, Dict, Tuple

from apscheduler.schedulers.background import BackgroundScheduler
from prometheus_client import CollectorRegistry


class TrackedMetricsDict(dict):
    """A dictionary that tracks updates for metrics"""

    def __init__(self, collector, metric_tracker, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._collector = collector
        self._metric_tracker = metric_tracker

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._metric_tracker.update_metric_timestamp(self._collector, key)


class MetricTrackingRegistry(CollectorRegistry):
    """Extended CollectorRegistry that tracks metric updates"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._metric_tracker = MetricCleanupJob(self)
        self._metric_tracker.start()

    def register(self, collector):
        """Override register to add update tracking to collectors"""
        super().register(collector)
        if hasattr(collector, '_metrics'):
            # Replace the metrics dict with our tracking version
            tracked_dict = TrackedMetricsDict(
                collector,
                self._metric_tracker,
                collector._metrics
            )
            collector._metrics = tracked_dict

    def __del__(self):
        """Ensure cleanup job is stopped when registry is destroyed"""
        if hasattr(self, '_metric_tracker'):
            self._metric_tracker.stop()


class MetricCleanupJob:
    def __init__(self, registry: CollectorRegistry):
        self.registry = registry
        self.scheduler = BackgroundScheduler()
        self.last_updates: Dict[Tuple[Any, Any], datetime] = {}

    def start(self):
        """Start the cleanup job to run every 10 minutes"""
        self.scheduler.add_job(
            self.cleanup_stale_metrics,
            'interval',
            minutes=10,
            next_run_time=datetime.now() + timedelta(minutes=1)
        )
        self.scheduler.start()

    def stop(self):
        """Stop the cleanup job"""
        self.scheduler.shutdown()

    def update_metric_timestamp(self, collector, labels):
        """Update the last modification time for a metric"""
        metric_key = (collector, labels)
        self.last_updates[metric_key] = datetime.now()

    def cleanup_stale_metrics(self):
        """Remove metric entries that haven't been updated in 24 hours"""
        try:
            current_time = datetime.now()
            stale_threshold = current_time - timedelta(hours=24)
            for collector, _ in list(self.registry._collector_to_names.items()):
                if hasattr(collector, '_metrics'):
                    labels_to_remove = []
                    for labels, _ in list(collector._metrics.items()):
                        metric_key = (collector, labels)
                        last_update = self.last_updates.get(metric_key)
                        if last_update is None or last_update < stale_threshold:
                            labels_to_remove.append(labels)
                    for labels in labels_to_remove:
                        try:
                            del collector._metrics[labels]
                            metric_key = (collector, labels)
                            self.last_updates.pop(metric_key, None)
                            print(f"Removed stale metric entry with labels: {labels}")
                        except KeyError:
                            pass
                        except Exception as e:
                            print(f"Error removing metric entry: {e}")
        except Exception as e:
            print(f"Error during metric cleanup: {e}")

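`MetricTrackingRegistry` is a drop-in replacement for `CollectorRegistry`: registering a labeled collector swaps its internal `_metrics` dict for a `TrackedMetricsDict`, so every `labels(...)` call stamps a timestamp, and the APScheduler job then drops label sets untouched for 24 hours. A minimal sketch, assuming only the code above:

```python
from prometheus_client import Gauge, start_http_server

from exporter.metric_cleanup_job import MetricTrackingRegistry

registry = MetricTrackingRegistry()  # starts the cleanup job in the background
demo = Gauge('demo_metric', 'Example gauge', ['node_id'], registry=registry)
demo.labels(node_id='42').set(1.0)  # timestamp recorded via TrackedMetricsDict

# Serve metrics; the node_id='42' series is removed if idle for 24 hours.
start_http_server(9464, registry=registry)
```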
exporter/processor/processor_base.py

@@ -1,25 +1,32 @@
 import base64
+import json
 import os
+import sys

 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

+from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
+
 try:
     from meshtastic.mesh_pb2 import MeshPacket, Data, HardwareModel
     from meshtastic.portnums_pb2 import PortNum
+    from meshtastic.mqtt_pb2 import ServiceEnvelope
 except ImportError:
     from meshtastic.protobuf.mesh_pb2 import MeshPacket, Data, HardwareModel
     from meshtastic.protobuf.portnums_pb2 import PortNum
+    from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope

-from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
+from prometheus_client import CollectorRegistry, Counter, Gauge
 from psycopg_pool import ConnectionPool

 from exporter.client_details import ClientDetails
-from exporter.processors import ProcessorRegistry
+from exporter.processor.processors import ProcessorRegistry


 class MessageProcessor:
     def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
+        self.message_size_in_bytes = None
         self.rx_rssi_gauge = None
         self.channel_counter = None
         self.packet_id_counter = None
@@ -44,6 +51,17 @@ class MessageProcessor:
             'destination_role'
         ]

+        reduced_labels = [
+            'source_id', 'destination_id'
+        ]
+
+        self.message_size_in_bytes = Gauge(
+            'text_message_app_size_in_bytes',
+            'Size of text messages processed by the app in Bytes',
+            reduced_labels + ['portnum'],
+            registry=self.registry
+        )
+
         self.source_message_type_counter = Counter(
             'mesh_packet_source_types',
             'Types of mesh packets processed by source',
@@ -65,7 +83,7 @@ class MessageProcessor:
             registry=self.registry
         )

         # Histogram for the rx_time (time in seconds)
-        self.rx_time_histogram = Histogram(
+        self.rx_time_histogram = Gauge(
             'mesh_packet_rx_time',
             'Receive time of mesh packets (seconds since 1970)',
             common_labels,
@@ -128,6 +146,35 @@ class MessageProcessor:
             registry=self.registry
         )

+    @staticmethod
+    def process_json_mqtt(message):
+        topic = message.topic
+        json_packet = json.loads(message.payload)
+        if 'sender' in json_packet:
+            if json_packet['sender'][0] == '!':
+                gateway_node_id = str(int(json_packet['sender'][1:], 16))
+                NodeConfigurationMetrics().process_mqtt_update(
+                    node_id=gateway_node_id,
+                    mqtt_json_enabled=True,
+                    mqtt_encryption_enabled=json_packet.get('encrypted', False),
+                    mqtt_configured_root_topic=topic
+                )
+
+    @staticmethod
+    def process_mqtt(topic: str, service_envelope: ServiceEnvelope, mesh_packet: MeshPacket):
+        is_encrypted = False
+        if getattr(mesh_packet, 'encrypted'):
+            is_encrypted = True
+        if getattr(service_envelope, 'gateway_id'):
+            if service_envelope.gateway_id[0] == '!':
+                gateway_node_id = str(int(service_envelope.gateway_id[1:], 16))
+                NodeConfigurationMetrics().process_mqtt_update(
+                    node_id=gateway_node_id,
+                    mqtt_json_enabled=False,
+                    mqtt_encryption_enabled=is_encrypted,
+                    mqtt_configured_root_topic=topic
+                )
+
     def process(self, mesh_packet: MeshPacket):
         try:
             if getattr(mesh_packet, 'encrypted'):
@@ -165,9 +212,6 @@ class MessageProcessor:
                     short_name='Hidden',
                     long_name='Hidden')

-            if port_num in map(int, os.getenv('FILTERED_PORTS', '1').split(',')):  # Filter out ports
-                return None  # Ignore this packet
-
             self.process_simple_packet_details(destination_client_details, mesh_packet, port_num, source_client_details)

             processor = ProcessorRegistry.get_processor(port_num)(self.registry, self.db_pool)
@@ -184,7 +228,8 @@ class MessageProcessor:
             return enum_value.name
         return 'UNKNOWN_PORT'

-    def process_simple_packet_details(self, destination_client_details, mesh_packet, port_num, source_client_details):
+    def process_simple_packet_details(self, destination_client_details, mesh_packet: MeshPacket, port_num,
+                                      source_client_details):
         common_labels = {
             'source_id': source_client_details.node_id,
             'source_short_name': source_client_details.short_name,
@@ -198,6 +243,16 @@ class MessageProcessor:
             'destination_role': destination_client_details.role,
         }

+        reduced_labels = {
+            'source_id': source_client_details.node_id,
+            'destination_id': destination_client_details.node_id
+        }
+
+        self.message_size_in_bytes.labels(
+            **reduced_labels,
+            portnum=self.get_port_name_from_portnum(port_num)
+        ).set(sys.getsizeof(mesh_packet))
+
         self.source_message_type_counter.labels(
             **common_labels,
             portnum=self.get_port_name_from_portnum(port_num)
@@ -214,7 +269,7 @@ class MessageProcessor:
         self.rx_time_histogram.labels(
             **common_labels
-        ).observe(mesh_packet.rx_time)
+        ).set(mesh_packet.rx_time)

         self.rx_snr_gauge.labels(
             **common_labels
@@ -255,13 +310,15 @@ class MessageProcessor:
         ).set(mesh_packet.rx_rssi)

     def _get_client_details(self, node_id: int) -> ClientDetails:
+        if node_id == 4294967295 or node_id == 1:  # FFFFFFFF or 1 (Broadcast)
+            return ClientDetails(node_id=node_id, short_name='Broadcast', long_name='Broadcast')
+
         node_id_str = str(node_id)  # Convert the integer to a string
         with self.db_pool.connection() as conn:
             with conn.cursor() as cur:
                 # First, try to select the existing record
                 cur.execute("""
                     SELECT node_id, short_name, long_name, hardware_model, role
-                    FROM client_details
+                    FROM node_details
                     WHERE node_id = %s;
                 """, (node_id_str,))
                 result = cur.fetchone()
@@ -269,7 +326,7 @@ class MessageProcessor:
                 if not result:
                     # If the client is not found, insert a new record
                     cur.execute("""
-                        INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
+                        INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
                         VALUES (%s, %s, %s, %s, %s)
                         RETURNING node_id, short_name, long_name, hardware_model, role;
                     """, (node_id_str, 'Unknown', 'Unknown', HardwareModel.UNSET, None))

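Both MQTT hooks above convert a Meshtastic gateway id (a `!`-prefixed hex string) into the decimal node id used as the database key. A worked example of that conversion (the sample id is made up):

```python
sender = '!a1b2c3d4'  # example gateway id, not taken from real traffic
if sender[0] == '!':
    gateway_node_id = str(int(sender[1:], 16))  # hex -> decimal string
    print(gateway_node_id)  # '2712847316'
```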
exporter/processor/processors.py

@@ -1,10 +1,14 @@
 import os
 from abc import ABC, abstractmethod
+from datetime import datetime
 from venv import logger

 import psycopg
 import unishox2

+from exporter.db_handler import DBHandler
+from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
+
 try:
     from meshtastic.admin_pb2 import AdminMessage
     from meshtastic.mesh_pb2 import Position, User, HardwareModel, Routing, Waypoint, RouteDiscovery, NeighborInfo
@@ -30,23 +34,18 @@ from prometheus_client import CollectorRegistry
 from psycopg_pool import ConnectionPool

 from exporter.client_details import ClientDetails
-from exporter.registry import _Metrics
+from exporter.metric.node_metrics import Metrics


 class Processor(ABC):
     def __init__(self, registry: CollectorRegistry, db_pool: ConnectionPool):
         self.db_pool = db_pool
-        self.metrics = _Metrics(registry)
+        self.metrics = Metrics(registry, DBHandler(db_pool))

     @abstractmethod
     def process(self, payload: bytes, client_details: ClientDetails):
         pass

-    def execute_db_operation(self, operation):
-        with self.db_pool.connection() as conn:
-            with conn.cursor() as cur:
-                return operation(cur, conn)
-

 class ProcessorRegistry:
     _registry = {}
@@ -54,6 +53,11 @@ class ProcessorRegistry:
     @classmethod
     def register_processor(cls, port_num):
         def inner_wrapper(wrapped_class):
+            if PortNum.DESCRIPTOR.values_by_number[port_num].name in os.getenv('EXPORTER_MESSAGE_TYPES_TO_FILTER',
+                                                                               '').split(','):
+                logger.info(f"Processor for port_num {port_num} is filtered out")
+                return wrapped_class
+
             cls._registry[port_num] = wrapped_class
             return wrapped_class
@@ -71,7 +75,6 @@ class ProcessorRegistry:
 @ProcessorRegistry.register_processor(PortNum.UNKNOWN_APP)
 class UnknownAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
-        logger.debug("Received UNKNOWN_APP packet")
         return None
@@ -79,12 +82,7 @@ class UnknownAppProcessor(Processor):
 class TextMessageAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received TEXT_MESSAGE_APP packet")
-        message = payload.decode('utf-8')
-        if os.getenv('HIDE_MESSAGE', 'true') == 'true':
-            message = 'Hidden'
-        self.metrics.message_length_histogram.labels(
-            **client_details.to_dict()
-        ).observe(len(message))
+        pass

 @ProcessorRegistry.register_processor(PortNum.REMOTE_HARDWARE_APP)
@@ -109,18 +107,10 @@ class PositionAppProcessor(Processor):
         except Exception as e:
             logger.error(f"Failed to parse POSITION_APP packet: {e}")
             return
-        self.metrics.device_latitude_gauge.labels(
-            **client_details.to_dict()
-        ).set(position.latitude_i)
-        self.metrics.device_longitude_gauge.labels(
-            **client_details.to_dict()
-        ).set(position.longitude_i)
-        self.metrics.device_altitude_gauge.labels(
-            **client_details.to_dict()
-        ).set(position.altitude)
-        self.metrics.device_position_precision_gauge.labels(
-            **client_details.to_dict()
-        ).set(position.precision_bits)
+
+        self.metrics.update_metrics_position(
+            position.latitude_i, position.longitude_i, position.altitude,
+            position.precision_bits, client_details)
         pass
@@ -139,7 +129,7 @@ class NodeInfoAppProcessor(Processor):
                 # First, try to select the existing record
                 cur.execute("""
                     SELECT short_name, long_name, hardware_model, role
-                    FROM client_details
+                    FROM node_details
                     WHERE node_id = %s;
                 """, (client_details.node_id,))
                 existing_record = cur.fetchone()
@@ -162,16 +152,17 @@ class NodeInfoAppProcessor(Processor):
                     update_values.append(ClientDetails.get_role_name_from_role(user.role))

                 if update_fields:
+                    update_fields.append("updated_at = %s")
                     update_query = f"""
-                        UPDATE client_details
+                        UPDATE node_details
                         SET {", ".join(update_fields)}
                         WHERE node_id = %s
                     """
-                    cur.execute(update_query, update_values + [client_details.node_id])
+                    cur.execute(update_query, update_values + [datetime.now().isoformat(), client_details.node_id])
                 else:
                     # If record doesn't exist, insert a new one
                     cur.execute("""
-                        INSERT INTO client_details (node_id, short_name, long_name, hardware_model, role)
+                        INSERT INTO node_details (node_id, short_name, long_name, hardware_model, role)
                         VALUES (%s, %s, %s, %s, %s)
                     """, (client_details.node_id, user.short_name, user.long_name,
                           ClientDetails.get_hardware_model_name_from_code(user.hw_model),
@@ -179,7 +170,7 @@ class NodeInfoAppProcessor(Processor):
             conn.commit()

-        self.execute_db_operation(db_operation)
+        self.metrics.get_db().execute_db_operation(db_operation)

 @ProcessorRegistry.register_processor(PortNum.ROUTING_APP)
@@ -269,12 +260,23 @@ class IpTunnelAppProcessor(Processor):
 class PaxCounterAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received PAXCOUNTER_APP packet")
+        NodeConfigurationMetrics().process_pax_counter_update(client_details.node_id)
         paxcounter = Paxcount()
         try:
             paxcounter.ParseFromString(payload)
         except Exception as e:
             logger.error(f"Failed to parse PAXCOUNTER_APP packet: {e}")
             return
+
+        self.metrics.pax_wifi_gauge.labels(
+            **client_details.to_dict()
+        ).set(paxcounter.wifi)
+
+        self.metrics.pax_ble_gauge.labels(
+            **client_details.to_dict()
+        ).set(paxcounter.ble)
+
+        self.metrics.pax_uptime_gauge.labels(
+            **client_details.to_dict()
+        ).set(paxcounter.uptime)

 @ProcessorRegistry.register_processor(PortNum.SERIAL_APP)
@@ -300,6 +302,7 @@ class StoreForwardAppProcessor(Processor):
 class RangeTestAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received RANGE_TEST_APP packet")
+        NodeConfigurationMetrics().process_range_test_update(client_details.node_id)
         pass  # NOTE: This portnum traffic is not sent to the public MQTT starting at firmware version 2.2.9
@@ -318,6 +321,7 @@ class TelemetryAppProcessor(Processor):
             return

         if telemetry.HasField('device_metrics'):
+            NodeConfigurationMetrics().process_device_update(client_details.node_id)
             device_metrics: DeviceMetrics = telemetry.device_metrics
             self.metrics.battery_level_gauge.labels(
                 **client_details.to_dict()
@@ -340,6 +344,7 @@ class TelemetryAppProcessor(Processor):
             ).inc(getattr(device_metrics, 'uptime_seconds', 0))

         if telemetry.HasField('environment_metrics'):
+            NodeConfigurationMetrics().process_environment_update(client_details.node_id)
             environment_metrics: EnvironmentMetrics = telemetry.environment_metrics
             self.metrics.temperature_gauge.labels(
                 **client_details.to_dict()
@@ -394,6 +399,7 @@ class TelemetryAppProcessor(Processor):
             ).set(getattr(environment_metrics, 'weight', 0))

         if telemetry.HasField('air_quality_metrics'):
+            NodeConfigurationMetrics().process_air_quality_update(client_details.node_id)
             air_quality_metrics: AirQualityMetrics = telemetry.air_quality_metrics
             self.metrics.pm10_standard_gauge.labels(
                 **client_details.to_dict()
@@ -444,6 +450,7 @@ class TelemetryAppProcessor(Processor):
             ).set(getattr(air_quality_metrics, 'particles_100um', 0))

         if telemetry.HasField('power_metrics'):
+            NodeConfigurationMetrics().process_power_update(client_details.node_id)
             power_metrics: PowerMetrics = telemetry.power_metrics
             self.metrics.ch1_voltage_gauge.labels(
                 **client_details.to_dict()
@@ -505,30 +512,15 @@ class TraceRouteAppProcessor(Processor):
 class NeighborInfoAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received NEIGHBORINFO_APP packet")
+        NodeConfigurationMetrics().process_neighbor_info_update(client_details.node_id)
         neighbor_info = NeighborInfo()
         try:
             neighbor_info.ParseFromString(payload)
         except Exception as e:
             logger.error(f"Failed to parse NEIGHBORINFO_APP packet: {e}")
             return
-        self.update_node_graph(neighbor_info, client_details)
         self.update_node_neighbors(neighbor_info, client_details)

-    def update_node_graph(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
-        def operation(cur, conn):
-            cur.execute("""
-                INSERT INTO node_graph (node_id, last_sent_by_node_id, broadcast_interval_secs)
-                VALUES (%s, %s, %s)
-                ON CONFLICT (node_id)
-                DO UPDATE SET
-                    last_sent_by_node_id = EXCLUDED.last_sent_by_node_id,
-                    broadcast_interval_secs = EXCLUDED.broadcast_interval_secs,
-                    last_sent_at = CURRENT_TIMESTAMP
-            """, (client_details.node_id, neighbor_info.last_sent_by_id, neighbor_info.node_broadcast_interval_secs))
-            conn.commit()
-
-        self.execute_db_operation(operation)
-
     def update_node_neighbors(self, neighbor_info: NeighborInfo, client_details: ClientDetails):
         def operation(cur, conn):
             new_neighbor_ids = [str(neighbor.node_id) for neighbor in neighbor_info.neighbors]
@@ -550,18 +542,18 @@ class NeighborInfoAppProcessor(Processor):
                     DO UPDATE SET snr = EXCLUDED.snr
                     RETURNING node_id, neighbor_id
                 )
-                INSERT INTO client_details (node_id)
+                INSERT INTO node_details (node_id)
                 SELECT node_id FROM upsert
-                WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.node_id)
+                WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.node_id)
                 UNION
                 SELECT neighbor_id FROM upsert
-                WHERE NOT EXISTS (SELECT 1 FROM client_details WHERE node_id = upsert.neighbor_id)
+                WHERE NOT EXISTS (SELECT 1 FROM node_details WHERE node_id = upsert.neighbor_id)
                 ON CONFLICT (node_id) DO NOTHING;
             """, (str(client_details.node_id), str(neighbor.node_id), float(neighbor.snr)))
             conn.commit()

-        self.execute_db_operation(operation)
+        self.metrics.get_db().execute_db_operation(operation)

 @ProcessorRegistry.register_processor(PortNum.ATAK_PLUGIN)
@@ -575,6 +567,7 @@ class AtakPluginProcessor(Processor):
 class MapReportAppProcessor(Processor):
     def process(self, payload: bytes, client_details: ClientDetails):
         logger.debug("Received MAP_REPORT_APP packet")
+        NodeConfigurationMetrics().map_broadcast_update(client_details.node_id)
         map_report = MapReport()
         try:
             map_report.ParseFromString(payload)

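Port filtering moved from the per-packet `FILTERED_PORTS` check to registration time: if a processor's `PortNum` name appears in `EXPORTER_MESSAGE_TYPES_TO_FILTER`, it is never added to the registry, so matching packets cost nothing at runtime. This is also why `main.py` below defers the `MessageProcessor` import until after `load_dotenv()`. A sketch of the lookup the decorator performs (the env value is an example):

```python
import os

os.environ['EXPORTER_MESSAGE_TYPES_TO_FILTER'] = 'TEXT_MESSAGE_APP,RANGE_TEST_APP'  # example

filtered = os.getenv('EXPORTER_MESSAGE_TYPES_TO_FILTER', '').split(',')
print('TEXT_MESSAGE_APP' in filtered)  # True -> TextMessageAppProcessor is skipped
```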
main.py

@@ -6,6 +6,8 @@ import paho.mqtt.client as mqtt
 from dotenv import load_dotenv

 from constants import callback_api_version_map, protocol_map
+from exporter.metric.node_configuration_metrics import NodeConfigurationMetrics
+from exporter.metric_cleanup_job import MetricTrackingRegistry

 try:
     from meshtastic.mesh_pb2 import MeshPacket
@@ -14,12 +16,9 @@ except ImportError:
     from meshtastic.protobuf.mesh_pb2 import MeshPacket
     from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope

-from prometheus_client import CollectorRegistry, start_http_server
+from prometheus_client import start_http_server
 from psycopg_pool import ConnectionPool

-from exporter.processor_base import MessageProcessor
-
-# Global connection pool
 connection_pool = None
@@ -33,15 +32,18 @@ def release_connection(conn):
 def handle_connect(client, userdata, flags, reason_code, properties):
     print(f"Connected with result code {reason_code}")
-    client.subscribe(os.getenv('MQTT_TOPIC', 'msh/israel/#'))
+    topics = os.getenv('MQTT_TOPIC', 'msh/israel/#').split(',')
+    topics_tuples = [(topic, 0) for topic in topics]
+    client.subscribe(topics_tuples)


 def update_node_status(node_number, status):
     with connection_pool.connection() as conn:
         with conn.cursor() as cur:
-            cur.execute("INSERT INTO client_details (node_id, mqtt_status) VALUES (%s, %s)"
+            cur.execute("INSERT INTO node_details (node_id, mqtt_status, short_name, long_name) VALUES (%s, %s, %s, %s)"
                         "ON CONFLICT(node_id)"
-                        "DO UPDATE SET mqtt_status = %s", (node_number, status, status))
+                        "DO UPDATE SET mqtt_status = %s",
+                        (node_number, status, 'Unknown (MQTT)', 'Unknown (MQTT)', status))
             conn.commit()
@@ -49,6 +51,7 @@ def handle_message(client, userdata, message):
     current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     print(f"Received message on topic '{message.topic}' at {current_timestamp}")

     if '/json/' in message.topic:
+        processor.process_json_mqtt(message)
         # Ignore JSON messages as there are also protobuf messages sent on other topic
         # Source: https://github.com/meshtastic/firmware/blob/master/src/mqtt/MQTT.cpp#L448
         return
@@ -79,7 +82,7 @@ def handle_message(client, userdata, message):
             cur.execute("INSERT INTO messages (id, received_at) VALUES (%s, NOW()) ON CONFLICT (id) DO NOTHING",
                         (str(packet.id),))
             conn.commit()

+        processor.process_mqtt(message.topic, envelope, packet)
         processor.process(packet)
     except Exception as e:
         logging.error(f"Failed to handle message: {e}")
@@ -89,16 +92,20 @@ def handle_message(client, userdata, message):
 if __name__ == "__main__":
     load_dotenv()
+    # We have to load_dotenv before we can import MessageProcessor to allow filtering of message types
+    from exporter.processor.processor_base import MessageProcessor

     # Setup a connection pool
     connection_pool = ConnectionPool(
         os.getenv('DATABASE_URL'),
-        min_size=1,
-        max_size=10
+        max_size=100
     )

+    # Configure node configuration metrics
+    node_conf_metrics = NodeConfigurationMetrics(connection_pool)
+
     # Configure Prometheus exporter
-    registry = CollectorRegistry()
-    start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 8000)), registry=registry)
+    registry = MetricTrackingRegistry()
+    start_http_server(int(os.getenv('PROMETHEUS_COLLECTOR_PORT', 9464)), registry=registry)

     # Create an MQTT client
     mqtt_protocol = os.getenv('MQTT_PROTOCOL', 'MQTTv5')

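`MQTT_TOPIC` now accepts a comma-separated list; `handle_connect` turns each entry into a `(topic, qos)` tuple, the form `paho-mqtt` expects for multi-topic subscriptions. A sketch (the second topic is an invented example; only `msh/israel/#` is the repo default):

```python
raw = 'msh/israel/#,msh/portugal/#'

topics = raw.split(',')
print([(topic, 0) for topic in topics])
# [('msh/israel/#', 0), ('msh/portugal/#', 0)]
```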
requirements.txt

@@ -1,9 +1,15 @@
-paho-mqtt~=2.1.0
+paho-mqtt>=2.1.0
-python-dotenv~=1.0.1
+python-dotenv>=1.0.1
-prometheus_client~=0.20.0
+prometheus_client>=0.21.1
 unishox2-py3~=1.0.0
-cryptography~=42.0.8
+cryptography>=44.0.1
-psycopg~=3.1.19
+psycopg>=3.2.5
 psycopg_pool~=3.2.2
-meshtastic~=2.3.13
-psycopg-binary~=3.1.20
+psycopg-binary>=3.2.5
+geopy>=2.4.1
+psycopg-pool>=3.2.5
+APScheduler>=3.11.0
+
+# Meshtastic Protocol Buffers
+meshtastic-protobufs-protocolbuffers-python==29.3.0.1.20241006120827+cc36fd21e859
+--extra-index-url https://buf.build/gen/python
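Since the Meshtastic protobufs now come from Buf's generated-SDK registry instead of the `meshtastic` package, a plain `pip install -r requirements.txt` still works: the trailing `--extra-index-url https://buf.build/gen/python` line inside the file points pip at the registry hosting the pinned `meshtastic-protobufs-protocolbuffers-python` build.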