mirror of
https://github.com/prometheus/node_exporter.git
synced 2025-08-20 18:33:52 -07:00
Merge 0c6c188bf1
into be19d537cd
This commit is contained in:
commit
b59ceccbb7
|
@ -17,12 +17,15 @@
|
||||||
package collector
|
package collector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"syscall"
|
"syscall"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/alecthomas/kingpin/v2"
|
||||||
"github.com/mdlayher/netlink"
|
"github.com/mdlayher/netlink"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
@ -58,6 +61,11 @@ const (
|
||||||
tcpTxQueuedBytes
|
tcpTxQueuedBytes
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
tcpstatSourcePorts = kingpin.Flag("collector.tcpstat.port.source", "List of tcpstat source ports").Strings()
|
||||||
|
tcpstatDestPorts = kingpin.Flag("collector.tcpstat.port.dest", "List of tcpstat destination ports").Strings()
|
||||||
|
)
|
||||||
|
|
||||||
type tcpStatCollector struct {
|
type tcpStatCollector struct {
|
||||||
desc typedDesc
|
desc typedDesc
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
@ -73,7 +81,7 @@ func NewTCPStatCollector(logger *slog.Logger) (Collector, error) {
|
||||||
desc: typedDesc{prometheus.NewDesc(
|
desc: typedDesc{prometheus.NewDesc(
|
||||||
prometheus.BuildFQName(namespace, "tcp", "connection_states"),
|
prometheus.BuildFQName(namespace, "tcp", "connection_states"),
|
||||||
"Number of connection states.",
|
"Number of connection states.",
|
||||||
[]string{"state"}, nil,
|
[]string{"state", "port", "direction"}, nil,
|
||||||
), prometheus.GaugeValue},
|
), prometheus.GaugeValue},
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -129,31 +137,97 @@ func parseInetDiagMsg(b []byte) *InetDiagMsg {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
|
func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
|
||||||
tcpStats, err := getTCPStats(syscall.AF_INET)
|
messages, err := getMessagesFromSocket(syscall.AF_INET)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't get tcpstats: %w", err)
|
return fmt.Errorf("couldn't get tcpstats: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if enabled ipv6 system
|
tcpStats, err := parseTCPStats(messages)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't parse tcpstats: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil {
|
if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil {
|
||||||
tcp6Stats, err := getTCPStats(syscall.AF_INET6)
|
messagesIPv6, err := getMessagesFromSocket(syscall.AF_INET6)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't get tcp6stats: %w", err)
|
return fmt.Errorf("couldn't get tcp6stats: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tcp6Stats, err := parseTCPStats(messagesIPv6)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't parse tcp6stats: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for st, value := range tcp6Stats {
|
for st, value := range tcp6Stats {
|
||||||
tcpStats[st] += value
|
tcpStats[st] += value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
messages = append(messages, messagesIPv6...)
|
||||||
}
|
}
|
||||||
|
|
||||||
for st, value := range tcpStats {
|
emitTotalTCPStats(c, ch, tcpStats)
|
||||||
ch <- c.desc.mustNewConstMetric(value, st.String())
|
emitTCPStatsPerPort(c, ch, messages, *tcpstatSourcePorts, "source", true)
|
||||||
}
|
emitTCPStatsPerPort(c, ch, messages, *tcpstatDestPorts, "dest", false)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) {
|
func emitTotalTCPStats(c *tcpStatCollector, ch chan<- prometheus.Metric, stats map[tcpConnectionState]float64) {
|
||||||
|
for st, value := range stats {
|
||||||
|
ch <- c.desc.mustNewConstMetric(value, st.String(), "0", "total")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func emitTCPStatsPerPort(
|
||||||
|
c *tcpStatCollector,
|
||||||
|
ch chan<- prometheus.Metric,
|
||||||
|
messages []netlink.Message,
|
||||||
|
ports []string,
|
||||||
|
direction string,
|
||||||
|
isSource bool,
|
||||||
|
) {
|
||||||
|
if len(ports) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
portSet := map[string]struct{}{}
|
||||||
|
for _, p := range ports {
|
||||||
|
portSet[p] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
counts := map[string]map[string]float64{}
|
||||||
|
|
||||||
|
for _, m := range messages {
|
||||||
|
msg := parseInetDiagMsg(m.Data)
|
||||||
|
|
||||||
|
state := tcpConnectionState(msg.State).String()
|
||||||
|
|
||||||
|
var rawPort uint16
|
||||||
|
if isSource {
|
||||||
|
rawPort = binary.BigEndian.Uint16(msg.ID.SourcePort[:])
|
||||||
|
} else {
|
||||||
|
rawPort = binary.BigEndian.Uint16(msg.ID.DestPort[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
portStr := strconv.Itoa(int(rawPort))
|
||||||
|
|
||||||
|
if _, ok := portSet[portStr]; ok {
|
||||||
|
if _, ok := counts[state]; !ok {
|
||||||
|
counts[state] = make(map[string]float64)
|
||||||
|
}
|
||||||
|
|
||||||
|
counts[state][portStr]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for state, portMap := range counts {
|
||||||
|
for port, count := range portMap {
|
||||||
|
ch <- c.desc.mustNewConstMetric(count, state, port, direction)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMessagesFromSocket(family uint8) ([]netlink.Message, error) {
|
||||||
const TCPFAll = 0xFFF
|
const TCPFAll = 0xFFF
|
||||||
const InetDiagInfo = 2
|
const InetDiagInfo = 2
|
||||||
const SockDiagByFamily = 20
|
const SockDiagByFamily = 20
|
||||||
|
@ -177,26 +251,20 @@ func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) {
|
||||||
}).Serialize(),
|
}).Serialize(),
|
||||||
}
|
}
|
||||||
|
|
||||||
messages, err := conn.Execute(msg)
|
return conn.Execute(msg)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return parseTCPStats(messages)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) {
|
func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) {
|
||||||
tcpStats := map[tcpConnectionState]float64{}
|
stats := make(map[tcpConnectionState]float64)
|
||||||
|
|
||||||
for _, m := range msgs {
|
for _, m := range msgs {
|
||||||
msg := parseInetDiagMsg(m.Data)
|
msg := parseInetDiagMsg(m.Data)
|
||||||
|
stats[tcpTxQueuedBytes] += float64(msg.WQueue)
|
||||||
tcpStats[tcpTxQueuedBytes] += float64(msg.WQueue)
|
stats[tcpRxQueuedBytes] += float64(msg.RQueue)
|
||||||
tcpStats[tcpRxQueuedBytes] += float64(msg.RQueue)
|
stats[tcpConnectionState(msg.State)]++
|
||||||
tcpStats[tcpConnectionState(msg.State)]++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return tcpStats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st tcpConnectionState) String() string {
|
func (st tcpConnectionState) String() string {
|
||||||
|
|
|
@ -24,21 +24,22 @@ import (
|
||||||
|
|
||||||
"github.com/josharian/native"
|
"github.com/josharian/native"
|
||||||
"github.com/mdlayher/netlink"
|
"github.com/mdlayher/netlink"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_parseTCPStats(t *testing.T) {
|
func encodeDiagMsg(m InetDiagMsg) []byte {
|
||||||
encode := func(m InetDiagMsg) []byte {
|
var buf bytes.Buffer
|
||||||
var buf bytes.Buffer
|
if err := binary.Write(&buf, native.Endian, m); err != nil {
|
||||||
err := binary.Write(&buf, native.Endian, m)
|
panic(err)
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return buf.Bytes()
|
|
||||||
}
|
}
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_parseTCPStats(t *testing.T) {
|
||||||
msg := []netlink.Message{
|
msg := []netlink.Message{
|
||||||
{
|
{
|
||||||
Data: encode(InetDiagMsg{
|
Data: encodeDiagMsg(InetDiagMsg{
|
||||||
Family: syscall.AF_INET,
|
Family: syscall.AF_INET,
|
||||||
State: uint8(tcpEstablished),
|
State: uint8(tcpEstablished),
|
||||||
Timer: 0,
|
Timer: 0,
|
||||||
|
@ -52,7 +53,7 @@ func Test_parseTCPStats(t *testing.T) {
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Data: encode(InetDiagMsg{
|
Data: encodeDiagMsg(InetDiagMsg{
|
||||||
Family: syscall.AF_INET,
|
Family: syscall.AF_INET,
|
||||||
State: uint8(tcpListen),
|
State: uint8(tcpListen),
|
||||||
Timer: 0,
|
Timer: 0,
|
||||||
|
@ -67,24 +68,96 @@ func Test_parseTCPStats(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
tcpStats, err := parseTCPStats(msg)
|
stats, err := parseTCPStats(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if want, got := 1, int(tcpStats[tcpEstablished]); want != got {
|
assertStat(t, stats, tcpEstablished, 1)
|
||||||
t.Errorf("want tcpstat number of established state %d, got %d", want, got)
|
assertStat(t, stats, tcpListen, 1)
|
||||||
}
|
assertStat(t, stats, tcpTxQueuedBytes, 42)
|
||||||
|
assertStat(t, stats, tcpRxQueuedBytes, 22)
|
||||||
if want, got := 1, int(tcpStats[tcpListen]); want != got {
|
}
|
||||||
t.Errorf("want tcpstat number of listen state %d, got %d", want, got)
|
|
||||||
}
|
func assertStat(t *testing.T, stats map[tcpConnectionState]float64, state tcpConnectionState, expected int) {
|
||||||
|
t.Helper()
|
||||||
if want, got := 42, int(tcpStats[tcpTxQueuedBytes]); want != got {
|
if got := int(stats[state]); got != expected {
|
||||||
t.Errorf("want tcpstat number of bytes in tx queue %d, got %d", want, got)
|
t.Errorf("expected %s = %d, got %d", state.String(), expected, got)
|
||||||
}
|
}
|
||||||
if want, got := 22, int(tcpStats[tcpRxQueuedBytes]); want != got {
|
}
|
||||||
t.Errorf("want tcpstat number of bytes in rx queue %d, got %d", want, got)
|
|
||||||
}
|
func Test_emitTCPStatsPerPort(t *testing.T) {
|
||||||
|
msg := []netlink.Message{
|
||||||
|
{
|
||||||
|
Data: encodeDiagMsg(InetDiagMsg{
|
||||||
|
State: uint8(tcpEstablished),
|
||||||
|
ID: InetDiagSockID{SourcePort: [2]byte{0, 80}},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Data: encodeDiagMsg(InetDiagMsg{
|
||||||
|
State: uint8(tcpListen),
|
||||||
|
ID: InetDiagSockID{DestPort: [2]byte{0, 123}},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Data: encodeDiagMsg(InetDiagMsg{
|
||||||
|
State: uint8(tcpTimeWait),
|
||||||
|
ID: InetDiagSockID{DestPort: [2]byte{0, 123}},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var metrics []string
|
||||||
|
|
||||||
|
collector := &tcpStatCollector{
|
||||||
|
desc: typedDesc{
|
||||||
|
desc: prometheus.NewDesc("test_tcp_stat", "Test metric", []string{"state", "port", "direction"}, nil),
|
||||||
|
valueType: prometheus.GaugeValue,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan prometheus.Metric, 10)
|
||||||
|
|
||||||
|
emitTCPStatsPerPort(collector, ch, msg, []string{"80"}, "source", true)
|
||||||
|
emitTCPStatsPerPort(collector, ch, msg, []string{"123"}, "dest", false)
|
||||||
|
|
||||||
|
close(ch)
|
||||||
|
for m := range ch {
|
||||||
|
d := &dto.Metric{}
|
||||||
|
if err := m.Write(d); err != nil {
|
||||||
|
t.Fatalf("failed to write metric: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var state, port, direction string
|
||||||
|
for _, label := range d.Label {
|
||||||
|
switch label.GetName() {
|
||||||
|
case "state":
|
||||||
|
state = label.GetValue()
|
||||||
|
case "port":
|
||||||
|
port = label.GetValue()
|
||||||
|
case "direction":
|
||||||
|
direction = label.GetValue()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics = append(metrics, state+"_"+port+"_"+direction)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := map[string]bool{
|
||||||
|
"established_80_source": true,
|
||||||
|
"listen_123_dest": true,
|
||||||
|
"time_wait_123_dest": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, metric := range metrics {
|
||||||
|
if !expected[metric] {
|
||||||
|
t.Errorf("unexpected metric emitted: %s", metric)
|
||||||
|
}
|
||||||
|
delete(expected, metric)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k := range expected {
|
||||||
|
t.Errorf("expected metric missing: %s", k)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue