From d48297c90414f231cd7b396f55692bc7c2edce5f Mon Sep 17 00:00:00 2001 From: Jimmi Dyson Date: Tue, 7 Jun 2016 11:31:29 +0100 Subject: [PATCH] Kubernetes SD: Add labels for all node addresses and discover node port if available --- retrieval/discovery/kubernetes/discovery.go | 62 +++++++++++++++++---- retrieval/discovery/kubernetes/types.go | 20 +++++++ 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index 2be61c7ed..48fb25fc0 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -26,6 +26,7 @@ import ( "strings" "sync" "time" + "unicode" "github.com/prometheus/common/log" "github.com/prometheus/common/model" @@ -83,6 +84,10 @@ const ( nodesTargetGroupName = "nodes" // nodeLabelPrefix is the prefix for the node labels. nodeLabelPrefix = metaLabelPrefix + "node_label_" + // nodeAddressPrefix is the prefix for the node addresses. + nodeAddressPrefix = metaLabelPrefix + "node_address_" + // nodePortLabel is the name of the label for the node port. + nodePortLabel = metaLabelPrefix + "node_port" // apiServersTargetGroupName is the name given to the target group for API servers. apiServersTargetGroupName = "apiServers" @@ -292,18 +297,31 @@ func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup { // Now let's loop through the nodes & add them to the target group with appropriate labels. for nodeName, node := range kd.nodes { - nodeAddress, err := nodeHostIP(node) + defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node) if err != nil { log.Debugf("Skipping node %s: %s", node.Name, err) continue } - address := fmt.Sprintf("%s:%d", nodeAddress.String(), kd.Conf.KubeletPort) + kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) + if kubeletPort == 0 { + kubeletPort = kd.Conf.KubeletPort + } + + address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort) t := model.LabelSet{ model.AddressLabel: model.LabelValue(address), model.InstanceLabel: model.LabelValue(nodeName), } + + for addrType, ip := range nodeAddressMap { + labelName := strutil.SanitizeLabelName(nodeAddressPrefix + toSnake(string(addrType))) + t[model.LabelName(labelName)] = model.LabelValue(ip[0].String()) + } + + t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort)) + for k, v := range node.ObjectMeta.Labels { labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k) t[model.LabelName(labelName)] = model.LabelValue(v) @@ -778,28 +796,33 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) { } } -// nodeHostIP returns the provided node's address, based on the priority: +// nodeAddresses returns the provided node's address, based on the priority: // 1. NodeInternalIP // 2. NodeExternalIP // 3. NodeLegacyHostIP // // Copied from k8s.io/kubernetes/pkg/util/node/node.go -func nodeHostIP(node *Node) (net.IP, error) { +func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { addresses := node.Status.Addresses - addressMap := make(map[NodeAddressType][]NodeAddress) - for i := range addresses { - addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i]) + addressMap := map[NodeAddressType][]net.IP{} + for _, addr := range addresses { + ip := net.ParseIP(addr.Address) + // All addresses should be valid IPs. + if ip == nil { + continue + } + addressMap[addr.Type] = append(addressMap[addr.Type], ip) } if addresses, ok := addressMap[NodeInternalIP]; ok { - return net.ParseIP(addresses[0].Address), nil + return addresses[0], addressMap, nil } if addresses, ok := addressMap[NodeExternalIP]; ok { - return net.ParseIP(addresses[0].Address), nil + return addresses[0], addressMap, nil } if addresses, ok := addressMap[NodeLegacyHostIP]; ok { - return net.ParseIP(addresses[0].Address), nil + return addresses[0], addressMap, nil } - return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) + return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) } func (kd *Discovery) updatePod(pod *Pod, eventType EventType) { @@ -1041,3 +1064,20 @@ func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup { return tg } + +// toSnake convert the given string to snake case following the Golang format: +// acronyms are converted to lower-case and preceded by an underscore. +func toSnake(in string) string { + runes := []rune(in) + length := len(runes) + + var out []rune + for i := 0; i < length; i++ { + if i > 0 && unicode.IsUpper(runes[i]) && ((i+1 < length && unicode.IsLower(runes[i+1])) || unicode.IsLower(runes[i-1])) { + out = append(out, '_') + } + out = append(out, unicode.ToLower(runes[i])) + } + + return string(out) +} diff --git a/retrieval/discovery/kubernetes/types.go b/retrieval/discovery/kubernetes/types.go index ba4479c7d..3abc51857 100644 --- a/retrieval/discovery/kubernetes/types.go +++ b/retrieval/discovery/kubernetes/types.go @@ -208,6 +208,26 @@ type EndpointsList struct { type NodeStatus struct { // Queried from cloud provider, if available. Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"` + // Endpoints of daemons running on the Node. + DaemonEndpoints NodeDaemonEndpoints `json:"daemonEndpoints,omitempty"` +} + +// NodeDaemonEndpoints lists ports opened by daemons running on the Node. +type NodeDaemonEndpoints struct { + // Endpoint on which Kubelet is listening. + KubeletEndpoint DaemonEndpoint `json:"kubeletEndpoint,omitempty"` +} + +// DaemonEndpoint contains information about a single Daemon endpoint. +type DaemonEndpoint struct { + /* + The port tag was not properly in quotes in earlier releases, so it must be + uppercased for backwards compat (since it was falling back to var name of + 'Port'). + */ + + // Port number of the given endpoint. + Port int32 `json:"Port"` } // NodeAddressType can legally only have the values defined as constants below.