meshmap.net/cmd/meshobserv/meshobserv.go

222 lines
6.7 KiB
Go
Raw Normal View History

2024-05-06 15:47:19 -07:00
package main
import (
"errors"
"flag"
"io/fs"
"log"
"os"
"os/signal"
"regexp"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/brianshea2/meshmap.net/internal/meshtastic"
"github.com/brianshea2/meshmap.net/internal/meshtastic/generated"
"google.golang.org/protobuf/proto"
)
const (
	// NodeExpiration is the age limit handed to Nodes.Prune for each of its
	// four arguments: 2 days, written out as a number of seconds (172800).
	NodeExpiration = 2 * 24 * 60 * 60

	// PruneWriteInterval is how long main's background loop sleeps between
	// prune/write passes over the node database.
	PruneWriteInterval = time.Minute
)
// Shared state touched by both the MQTT message handler and main's
// background prune/write loop.
var (
	// NodesMutex must be held while reading or modifying Nodes.
	NodesMutex sync.Mutex

	// Nodes is the in-memory node database; handleMessage indexes it by the
	// uint32 node number a message came from. It is loaded from disk or
	// created empty in main.
	Nodes meshtastic.NodeDB

	// Receiving is set to true by handleMessage for every message and reset
	// to false by main's prune loop, which exits the process if it was not
	// set during the preceding interval.
	Receiving atomic.Bool
)
// ensureNode makes sure Nodes has an entry for node number from, creating one
// with meshtastic.NewNode(topic) when it is missing.
//
// The caller must hold NodesMutex.
func ensureNode(from uint32, topic string) {
	if Nodes[from] == nil {
		Nodes[from] = meshtastic.NewNode(topic)
	}
}

// handleMessage processes a single message delivered by the MQTT client; it
// is installed as the client's MessageHandler in main.
//
// Parameters:
//   - from:    node number the message is attributed to (key into Nodes)
//   - topic:   MQTT topic the message arrived on
//   - portNum: application port, which selects how payload is decoded
//   - payload: application payload bytes (protobuf-encoded for every port
//     handled here except text messages, which are only logged)
//
// Every message is logged. Position, node info, device telemetry, neighbor
// info and map report messages additionally update the sender's entry in
// Nodes (created on demand) while holding NodesMutex. Messages that fail to
// decode, or that carry empty names / a (0, 0) position / no neighbors, are
// logged and then ignored.
//
// Free-text fields that are logged inside quotes (text payload, long and
// short names) are formatted with %q so that embedded quotes, newlines or
// control bytes cannot break or forge log lines; well-formed text is printed
// exactly as before.
func handleMessage(from uint32, topic string, portNum generated.PortNum, payload []byte) {
	// Tell the watchdog in main's prune loop that traffic is still flowing.
	Receiving.Store(true)

	switch portNum {
	case generated.PortNum_TEXT_MESSAGE_APP:
		log.Printf("[msg] %v (%v) %s: %q", from, topic, portNum, payload)

	case generated.PortNum_POSITION_APP:
		var position generated.Position
		if err := proto.Unmarshal(payload, &position); err != nil {
			log.Printf("[warn] could not parse Position payload from %v on %v: %v", from, topic, err)
			return
		}
		latitude := position.GetLatitudeI()
		longitude := position.GetLongitudeI()
		precision := position.GetPrecisionBits()
		log.Printf("[msg] %v (%v) %s: (%v, %v) %v/32", from, topic, portNum, latitude, longitude, precision)
		// (0, 0) is treated as "no position" and never stored.
		if latitude == 0 && longitude == 0 {
			return
		}
		NodesMutex.Lock()
		defer NodesMutex.Unlock()
		ensureNode(from, topic)
		node := Nodes[from]
		node.UpdatePosition(latitude, longitude, precision)
		node.UpdateSeenBy(topic)

	case generated.PortNum_NODEINFO_APP:
		var user generated.User
		if err := proto.Unmarshal(payload, &user); err != nil {
			log.Printf("[warn] could not parse User payload from %v on %v: %v", from, topic, err)
			return
		}
		longName := user.GetLongName()
		shortName := user.GetShortName()
		hwModel := user.GetHwModel().String()
		role := user.GetRole().String()
		log.Printf("[msg] %v (%v) %s: {%q %q %v %v}", from, topic, portNum, longName, shortName, hwModel, role)
		if len(longName) == 0 {
			return
		}
		NodesMutex.Lock()
		defer NodesMutex.Unlock()
		ensureNode(from, topic)
		Nodes[from].UpdateUser(longName, shortName, hwModel, role)

	case generated.PortNum_TELEMETRY_APP:
		var telemetry generated.Telemetry
		if err := proto.Unmarshal(payload, &telemetry); err != nil {
			log.Printf("[warn] could not parse Telemetry payload from %v on %v: %v", from, topic, err)
			return
		}
		// Only the device-metrics variant is recorded; other telemetry
		// variants are dropped without logging.
		deviceMetrics := telemetry.GetDeviceMetrics()
		if deviceMetrics == nil {
			return
		}
		batteryLevel := deviceMetrics.GetBatteryLevel()
		voltage := deviceMetrics.GetVoltage()
		chUtil := deviceMetrics.GetChannelUtilization()
		airUtilTx := deviceMetrics.GetAirUtilTx()
		uptime := deviceMetrics.GetUptimeSeconds()
		log.Printf(
			"[msg] %v (%v) %s: DeviceMetrics{power: %v%% (%vV); chUtil: %v%%; airUtilTx: %v%%; uptime: %vs}",
			from, topic, portNum, batteryLevel, voltage, chUtil, airUtilTx, uptime,
		)
		NodesMutex.Lock()
		defer NodesMutex.Unlock()
		ensureNode(from, topic)
		Nodes[from].UpdateDeviceMetrics(batteryLevel, voltage, chUtil, airUtilTx, uptime)

	case generated.PortNum_NEIGHBORINFO_APP:
		var neighborInfo generated.NeighborInfo
		if err := proto.Unmarshal(payload, &neighborInfo); err != nil {
			log.Printf("[warn] could not parse NeighborInfo payload from %v on %v: %v", from, topic, err)
			return
		}
		nodeNum := neighborInfo.GetNodeId()
		neighbors := neighborInfo.GetNeighbors()
		log.Printf("[msg] %v (%v) %s: %v <-> %v neighbors", from, topic, portNum, nodeNum, len(neighbors))
		// Accept only reports a node makes about itself, and only when they
		// actually list neighbors.
		if nodeNum != from || len(neighbors) == 0 {
			return
		}
		NodesMutex.Lock()
		defer NodesMutex.Unlock()
		ensureNode(from, topic)
		node := Nodes[from]
		for _, neighbor := range neighbors {
			neighborNum := neighbor.GetNodeId()
			if neighborNum == 0 {
				continue
			}
			node.UpdateNeighborInfo(neighborNum, neighbor.GetSnr())
		}

	case generated.PortNum_MAP_REPORT_APP:
		var mapReport generated.MapReport
		if err := proto.Unmarshal(payload, &mapReport); err != nil {
			log.Printf("[warn] could not parse MapReport payload from %v on %v: %v", from, topic, err)
			return
		}
		longName := mapReport.GetLongName()
		shortName := mapReport.GetShortName()
		hwModel := mapReport.GetHwModel().String()
		role := mapReport.GetRole().String()
		fwVersion := mapReport.GetFirmwareVersion()
		region := mapReport.GetRegion().String()
		modemPreset := mapReport.GetModemPreset().String()
		hasDefaultCh := mapReport.GetHasDefaultChannel()
		latitude := mapReport.GetLatitudeI()
		longitude := mapReport.GetLongitudeI()
		precision := mapReport.GetPositionPrecision()
		log.Printf(
			"[msg] %v (%v) %s: {%q %q %v %v %v %v %v %v} (%v, %v) %v/32",
			from, topic, portNum,
			longName, shortName, hwModel, role, fwVersion, region, modemPreset, hasDefaultCh,
			latitude, longitude, precision,
		)
		// A map report is stored only when it has both a name and a
		// non-(0, 0) position.
		if len(longName) == 0 {
			return
		}
		if latitude == 0 && longitude == 0 {
			return
		}
		NodesMutex.Lock()
		defer NodesMutex.Unlock()
		ensureNode(from, topic)
		node := Nodes[from]
		node.UpdateUser(longName, shortName, hwModel, role)
		node.UpdateMapReport(fwVersion, region, modemPreset, hasDefaultCh)
		node.UpdatePosition(latitude, longitude, precision)
		node.UpdateSeenBy(topic)

	default:
		log.Printf("[msg] %v (%v) %s", from, topic, portNum)
	}
}
// main runs the observer: it optionally loads the node database from the file
// given with -f, connects to MQTT with handleMessage as the message handler,
// starts a background loop that prunes and persists the database every
// PruneWriteInterval, and then blocks until SIGINT or SIGTERM.
//
// On shutdown the database is written one final time (when -f is set) so that
// updates received since the last periodic write are not lost.
//
// The process exits via log.Fatal when the database cannot be loaded (for any
// reason other than the file not existing), when the MQTT connection fails,
// when a periodic write fails, or when no message was received during a whole
// PruneWriteInterval.
func main() {
	var dbPath string
	flag.StringVar(&dbPath, "f", "", "node database `file`")
	flag.Parse()

	// load or make NodeDB
	if len(dbPath) > 0 {
		err := Nodes.LoadFile(dbPath)
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			log.Fatalf("[error] load nodes: %v", err)
		}
		log.Printf("[info] loaded %v nodes from disk", len(Nodes))
	}
	if Nodes == nil {
		Nodes = make(meshtastic.NodeDB)
	}

	// writeNodes persists the valid subset of Nodes to dbPath and is a no-op
	// when no database file was requested. The caller must hold NodesMutex.
	writeNodes := func() error {
		if len(dbPath) == 0 {
			return nil
		}
		valid := Nodes.GetValid()
		if err := valid.WriteFile(dbPath); err != nil {
			return err
		}
		log.Printf("[info] wrote %v nodes to disk", len(valid))
		return nil
	}

	// connect to MQTT
	client := &meshtastic.MQTTClient{
		TopicRegex:     regexp.MustCompile(`/2/[ce]/[^/]+/![0-9a-f]+$|/2/map/$`),
		BlockCipher:    meshtastic.NewBlockCipher(meshtastic.DefaultKey),
		MessageHandler: handleMessage,
	}
	err := client.Connect()
	if err != nil {
		log.Fatalf("[error] connect: %v", err)
	}

	// start NodeDB prune and write loop
	go func() {
		for {
			time.Sleep(PruneWriteInterval)
			NodesMutex.Lock()
			Nodes.Prune(NodeExpiration, NodeExpiration, NodeExpiration, NodeExpiration)
			if err := writeNodes(); err != nil {
				log.Fatalf("[error] write nodes: %v", err)
			}
			NodesMutex.Unlock()
			// Watchdog: handleMessage sets Receiving on every message. If it
			// is still false after a full interval, nothing arrived, so bail
			// out; otherwise reset it for the next interval.
			if !Receiving.CompareAndSwap(true, false) {
				log.Fatal("[crit] no messages received")
			}
		}
	}()

	// wait until exit
	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, syscall.SIGINT, syscall.SIGTERM)
	<-terminate
	log.Print("[info] exiting")
	client.Disconnect()

	// Final flush. Taking the mutex also waits out a periodic write that may
	// be in progress and excludes any handler that is still running.
	NodesMutex.Lock()
	defer NodesMutex.Unlock()
	if err := writeNodes(); err != nil {
		log.Printf("[error] write nodes: %v", err)
	}
}