From 1dc18920fd3fa031955ce1d73819c5fda345d202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kadan=C4=9B?= Date: Tue, 12 Nov 2024 20:21:40 +0100 Subject: [PATCH] Add tcpstat connection states per port metric MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tomáš Kadaně --- collector/tcpstat_linux.go | 95 ++++++++++++++++++++++++++++++--- collector/tcpstat_linux_test.go | 80 ++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 7 deletions(-) diff --git a/collector/tcpstat_linux.go b/collector/tcpstat_linux.go index 476a9b47..1e49d9f1 100644 --- a/collector/tcpstat_linux.go +++ b/collector/tcpstat_linux.go @@ -17,12 +17,15 @@ package collector import ( + "encoding/binary" "fmt" "log/slog" "os" + "strconv" "syscall" "unsafe" + "github.com/alecthomas/kingpin/v2" "github.com/mdlayher/netlink" "github.com/prometheus/client_golang/prometheus" ) @@ -58,6 +61,11 @@ const ( tcpTxQueuedBytes ) +var ( + tcpstatSourcePorts = kingpin.Flag("collector.tcpstat.port.source", "List of tcpstat source ports").Strings() + tcpstatDestPorts = kingpin.Flag("collector.tcpstat.port.dest", "List of tcpstat destination ports").Strings() +) + type tcpStatCollector struct { desc typedDesc logger *slog.Logger @@ -73,7 +81,7 @@ func NewTCPStatCollector(logger *slog.Logger) (Collector, error) { desc: typedDesc{prometheus.NewDesc( prometheus.BuildFQName(namespace, "tcp", "connection_states"), "Number of connection states.", - []string{"state"}, nil, + []string{"state", "port", "type"}, nil, ), prometheus.GaugeValue}, logger: logger, }, nil @@ -129,31 +137,59 @@ func parseInetDiagMsg(b []byte) *InetDiagMsg { } func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error { - tcpStats, err := getTCPStats(syscall.AF_INET) + messages, err := getMessagesFromSocket(syscall.AF_INET) if err != nil { return fmt.Errorf("couldn't get tcpstats: %w", err) } + tcpStats, err := parseTCPStats(messages) + if err != nil { + return fmt.Errorf("couldn't parse tcpstats: %w", err) + } + // if enabled ipv6 system if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil { - tcp6Stats, err := getTCPStats(syscall.AF_INET6) + messagesIPv6, err := getMessagesFromSocket(syscall.AF_INET6) if err != nil { return fmt.Errorf("couldn't get tcp6stats: %w", err) } + tcp6Stats, err := parseTCPStats(messagesIPv6) + if err != nil { + return fmt.Errorf("couldn't parse tcp6stats: %w", err) + } + for st, value := range tcp6Stats { tcpStats[st] += value } } for st, value := range tcpStats { - ch <- c.desc.mustNewConstMetric(value, st.String()) + ch <- c.desc.mustNewConstMetric(value, st.String(), "0", "total") + } + + statsPerSourcePorts, err := parseTCPStatsPerSourcePort(messages, *tcpstatSourcePorts) + if err != nil { + return fmt.Errorf("couldn't get tcpstats per source port: %w", err) + } + + for statePort, value := range statsPerSourcePorts { + ch <- c.desc.mustNewConstMetric(value, statePort.state.String(), strconv.Itoa(statePort.port), "source") + } + + statsPerDestPorts, err := parseTCPStatsPerDestPort(messages, *tcpstatDestPorts) + if err != nil { + return fmt.Errorf("couldn't get tcpstats per dest port: %w", err) + } + + for statePort, value := range statsPerDestPorts { + ch <- c.desc.mustNewConstMetric(value, statePort.state.String(), strconv.Itoa(statePort.port), "dest") } return nil } -func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) { +func getMessagesFromSocket(family uint8) ([]netlink.Message, error) { const TCPFAll = 0xFFF const InetDiagInfo = 2 const SockDiagByFamily = 20 @@ -182,7 +218,7 @@ func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) { return nil, err } - return parseTCPStats(messages) + return messages, nil } func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) { @@ -199,6 +235,53 @@ func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, erro return tcpStats, nil } +type statePortPair struct { + state tcpConnectionState + port int +} + +func parseTCPStatsPerSourcePort(msgs []netlink.Message, sourcePorts []string) (map[statePortPair]float64, error) { + tcpStatsPerStatePerPort := map[statePortPair]float64{} + + for _, m := range msgs { + msg := parseInetDiagMsg(m.Data) + + for _, sourcePort := range sourcePorts { + sourcePortInt := int(binary.BigEndian.Uint16(msg.ID.SourcePort[:])) + + if sourcePort == strconv.Itoa(sourcePortInt) { + tcpStatsPerStatePerPort[statePortPair{ + state: tcpConnectionState(msg.State), + port: sourcePortInt, + }]++ + } + } + } + + return tcpStatsPerStatePerPort, nil +} + +func parseTCPStatsPerDestPort(msgs []netlink.Message, destPorts []string) (map[statePortPair]float64, error) { + tcpStatsPerStatePerPort := map[statePortPair]float64{} + + for _, m := range msgs { + msg := parseInetDiagMsg(m.Data) + + for _, destPort := range destPorts { + destPortInt := int(binary.BigEndian.Uint16(msg.ID.DestPort[:])) + + if destPort == strconv.Itoa(destPortInt) { + tcpStatsPerStatePerPort[statePortPair{ + state: tcpConnectionState(msg.State), + port: destPortInt, + }]++ + } + } + } + + return tcpStatsPerStatePerPort, nil +} + func (st tcpConnectionState) String() string { switch st { case tcpEstablished: diff --git a/collector/tcpstat_linux_test.go b/collector/tcpstat_linux_test.go index e1bd090a..5a18e3ec 100644 --- a/collector/tcpstat_linux_test.go +++ b/collector/tcpstat_linux_test.go @@ -86,5 +86,83 @@ func Test_parseTCPStats(t *testing.T) { if want, got := 22, int(tcpStats[tcpRxQueuedBytes]); want != got { t.Errorf("want tcpstat number of bytes in rx queue %d, got %d", want, got) } - +} + +func Test_parseTCPStatsPerPort(t *testing.T) { + encode := func(m InetDiagMsg) []byte { + var buf bytes.Buffer + err := binary.Write(&buf, native.Endian, m) + if err != nil { + panic(err) + } + return buf.Bytes() + } + + msg := []netlink.Message{ + { + Data: encode(InetDiagMsg{ + Family: syscall.AF_INET, + State: uint8(tcpEstablished), + ID: InetDiagSockID{ + DestPort: [2]byte{0, 22}, + }, + }), + }, + { + Data: encode(InetDiagMsg{ + Family: syscall.AF_INET, + State: uint8(tcpEstablished), + ID: InetDiagSockID{ + DestPort: [2]byte{0, 22}, + SourcePort: [2]byte{0, 23}, + }, + }), + }, + { + Data: encode(InetDiagMsg{ + Family: syscall.AF_INET6, + State: uint8(tcpEstablished), + ID: InetDiagSockID{ + SourcePort: [2]byte{0, 23}, + }, + }), + }, + } + + tcpStatsPerDestPort, err := parseTCPStatsPerDestPort(msg, []string{"22"}) + if err != nil { + t.Fatal(err) + } + + sp1 := statePortPair{ + state: tcpEstablished, + port: 22, + } + + if want, got := 2, int(tcpStatsPerDestPort[sp1]); want != got { + t.Errorf("tcpstat connection per %s states per dest port %d. want %d, got %d", sp1.state.String(), sp1.port, want, got) + } + + tcpStatsPerSourcePort, err := parseTCPStatsPerSourcePort(msg, []string{"23"}) + if err != nil { + t.Fatal(err) + } + + sp2 := statePortPair{ + state: tcpEstablished, + port: 23, + } + + if want, got := 2, int(tcpStatsPerSourcePort[sp2]); want != got { + t.Errorf("tcpstat connection per %s states per source port %d. want %d, got %d", sp2.state.String(), sp2.port, want, got) + } + + emptyDestPortStats, err := parseTCPStatsPerDestPort(msg, []string{"80"}) + if err != nil { + t.Fatal(err) + } + + if want, got := 0, int(emptyDestPortStats[sp1]); want != got { + t.Errorf("expected 0 connections for unmatched port 80, got %d", got) + } }