discovery/kubernetes: extract node discovery

This change extracts node discovery into its own type.
Fabian Reinartz 2016-07-01 16:55:37 +02:00
parent 8a97c211a8
commit fdbe28df85
2 changed files with 252 additions and 179 deletions
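
For orientation, Discovery.Run is driven with a cancellable context and a channel on which it delivers target groups. A minimal, hypothetical driver looks like the sketch below; the real caller is Prometheus's target manager, the construction of kd is elided, and since Run does not close the channel, cancelling the context is what ends the loop.

	// Hypothetical driver for an initialized *Discovery value kd; illustrative only.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan []*config.TargetGroup)
	go kd.Run(ctx, ch)

	for tgs := range ch { // stops via cancel(); Run leaves ch open
		for _, tg := range tgs {
			fmt.Printf("source %q: %d targets\n", tg.Source, len(tg.Targets))
		}
	}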

@@ -20,7 +20,6 @@ import (
 	"net"
 	"net/http"
 	"os"
-	"strconv"
 	"sync"
 	"time"
@@ -107,10 +106,8 @@ type Discovery struct {
 	apiServers   []config.URL
 	apiServersMu sync.RWMutex
-	nodes        map[string]*Node
 	services     map[string]map[string]*Service
 	// map of namespace to (map of pod name to pod)
-	nodesMu    sync.RWMutex
 	servicesMu sync.RWMutex
 	runDone    chan struct{}
 }
@@ -139,7 +136,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	pd := &podDiscovery{
 		retryInterval: time.Duration(kd.Conf.RetryInterval),
-		pods:          map[string]map[string]*Pod{},
 		kd:            kd,
 	}
 	wg.Add(1)
@@ -148,13 +144,22 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		wg.Done()
 	}()
 
+	nd := &nodeDiscovery{
+		retryInterval: time.Duration(kd.Conf.RetryInterval),
+		kd:            kd,
+	}
+	wg.Add(1)
+	go func() {
+		nd.run(ctx, ch)
+		wg.Done()
+	}()
+
 	// Send an initial full view.
 	// TODO(fabxc): this does not include all available services and service
 	// endpoints yet. Service endpoints were also missing in the previous Sources() method.
 	var all []*config.TargetGroup
 	all = append(all, kd.updateAPIServersTargetGroup())
-	all = append(all, kd.updateNodesTargetGroup())
 
 	select {
 	case ch <- all:
@@ -166,7 +171,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	update := make(chan interface{}, 10)
-	go kd.watchNodes(update, ctx.Done(), retryInterval)
 	go kd.startServiceWatch(update, ctx.Done(), retryInterval)
 
 	for {
@@ -176,10 +180,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 			return
 		case event := <-update:
 			switch obj := event.(type) {
-			case *nodeEvent:
-				log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", obj.EventType, obj.Node.ObjectMeta.Name)
-				kd.updateNode(obj.Node, obj.EventType)
-				tg = append(tg, kd.updateNodesTargetGroup())
 			case *serviceEvent:
 				log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
 				tg = append(tg, kd.updateService(obj.Service, obj.EventType))
@@ -272,91 +272,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
 	return tg
 }
 
-func (kd *Discovery) updateNodesTargetGroup() *config.TargetGroup {
-	kd.nodesMu.RLock()
-	defer kd.nodesMu.RUnlock()
-
-	tg := &config.TargetGroup{
-		Source: nodesTargetGroupName,
-		Labels: model.LabelSet{
-			roleLabel: model.LabelValue("node"),
-		},
-	}
-
-	// Now let's loop through the nodes & add them to the target group with appropriate labels.
-	for nodeName, node := range kd.nodes {
-		defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
-		if err != nil {
-			log.Debugf("Skipping node %s: %s", node.Name, err)
-			continue
-		}
-
-		kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
-		address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
-
-		t := model.LabelSet{
-			model.AddressLabel:  model.LabelValue(address),
-			model.InstanceLabel: model.LabelValue(nodeName),
-		}
-
-		for addrType, ip := range nodeAddressMap {
-			labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
-			t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
-		}
-		t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
-
-		for k, v := range node.ObjectMeta.Labels {
-			labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
-			t[model.LabelName(labelName)] = model.LabelValue(v)
-		}
-		tg.Targets = append(tg.Targets, t)
-	}
-
-	return tg
-}
-
-func (kd *Discovery) updateNode(node *Node, eventType EventType) {
-	kd.nodesMu.Lock()
-	defer kd.nodesMu.Unlock()
-
-	updatedNodeName := node.ObjectMeta.Name
-	switch eventType {
-	case Deleted:
-		// Deleted - remove from nodes map.
-		delete(kd.nodes, updatedNodeName)
-	case Added, Modified:
-		// Added/Modified - update the node in the nodes map.
-		kd.nodes[updatedNodeName] = node
-	}
-}
-
-func (kd *Discovery) getNodes() (map[string]*Node, string, error) {
-	res, err := kd.queryAPIServerPath(nodesURL)
-	if err != nil {
-		// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
-		// & return error.
-		return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
-	}
-	defer res.Body.Close()
-	if res.StatusCode != http.StatusOK {
-		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
-	}
-
-	var nodes NodeList
-	if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
-		body, _ := ioutil.ReadAll(res.Body)
-		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
-	}
-
-	nodeMap := map[string]*Node{}
-	for idx, node := range nodes.Items {
-		nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
-	}
-	return nodeMap, nodes.ResourceVersion, nil
-}
-
 func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
 	res, err := kd.queryAPIServerPath(servicesURL)
 	if err != nil {
@@ -387,61 +302,6 @@ func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
 	return serviceMap, services.ResourceVersion, nil
 }
 
-// watchNodes watches nodes as they come & go.
-func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
-	until(func() {
-		nodes, resourceVersion, err := kd.getNodes()
-		if err != nil {
-			log.Errorf("Cannot initialize nodes collection: %s", err)
-			return
-		}
-
-		// Reset the known nodes.
-		kd.nodesMu.Lock()
-		kd.nodes = map[string]*Node{}
-		kd.nodesMu.Unlock()
-
-		for _, node := range nodes {
-			events <- &nodeEvent{Added, node}
-		}
-
-		req, err := http.NewRequest("GET", nodesURL, nil)
-		if err != nil {
-			log.Errorf("Cannot create nodes request: %s", err)
-			return
-		}
-		values := req.URL.Query()
-		values.Add("watch", "true")
-		values.Add("resourceVersion", resourceVersion)
-		req.URL.RawQuery = values.Encode()
-
-		res, err := kd.queryAPIServerReq(req)
-		if err != nil {
-			log.Errorf("Failed to watch nodes: %s", err)
-			return
-		}
-		defer res.Body.Close()
-		if res.StatusCode != http.StatusOK {
-			log.Errorf("Failed to watch nodes: %d", res.StatusCode)
-			return
-		}
-
-		d := json.NewDecoder(res.Body)
-		for {
-			var event nodeEvent
-			if err := d.Decode(&event); err != nil {
-				log.Errorf("Watch nodes unexpectedly closed: %s", err)
-				return
-			}
-
-			select {
-			case events <- &event:
-			case <-done:
-			}
-		}
-	}, retryInterval, done)
-}
-
 // watchServices watches services as they come & go.
 func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
 	until(func() {
@@ -784,32 +644,3 @@ func until(f func(), period time.Duration, stopCh <-chan struct{}) {
 		}
 	}
 }
-
-// nodeAddresses returns the provided node's address, based on the priority:
-// 1. NodeInternalIP
-// 2. NodeExternalIP
-// 3. NodeLegacyHostIP
-//
-// Copied from k8s.io/kubernetes/pkg/util/node/node.go
-func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
-	addresses := node.Status.Addresses
-	addressMap := map[NodeAddressType][]net.IP{}
-	for _, addr := range addresses {
-		ip := net.ParseIP(addr.Address)
-		// All addresses should be valid IPs.
-		if ip == nil {
-			continue
-		}
-		addressMap[addr.Type] = append(addressMap[addr.Type], ip)
-	}
-	if addresses, ok := addressMap[NodeInternalIP]; ok {
-		return addresses[0], addressMap, nil
-	}
-	if addresses, ok := addressMap[NodeExternalIP]; ok {
-		return addresses[0], addressMap, nil
-	}
-	if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
-		return addresses[0], addressMap, nil
-	}
-	return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
-}

@@ -0,0 +1,242 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/prometheus/common/log"
+	"github.com/prometheus/common/model"
+
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/util/strutil"
+
+	"golang.org/x/net/context"
+)
+
+type nodeDiscovery struct {
+	mtx   sync.RWMutex
+	nodes map[string]*Node
+
+	retryInterval time.Duration
+	kd            *Discovery
+}
+
+func (d *nodeDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
+	select {
+	case ch <- []*config.TargetGroup{d.updateNodesTargetGroup()}:
+	case <-ctx.Done():
+		return
+	}
+
+	update := make(chan *nodeEvent, 10)
+	go d.watchNodes(update, ctx.Done(), d.retryInterval)
+
+	for {
+		tgs := []*config.TargetGroup{}
+		select {
+		case <-ctx.Done():
+			return
+		case e := <-update:
+			log.Debugf("k8s discovery received node event (EventType=%s, Node Name=%s)", e.EventType, e.Node.ObjectMeta.Name)
+			d.updateNode(e.Node, e.EventType)
+			tgs = append(tgs, d.updateNodesTargetGroup())
+		}
+		if tgs == nil {
+			continue
+		}
+
+		for _, tg := range tgs {
+			select {
+			case ch <- []*config.TargetGroup{tg}:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+func (d *nodeDiscovery) updateNodesTargetGroup() *config.TargetGroup {
+	d.mtx.RLock()
+	defer d.mtx.RUnlock()
+
+	tg := &config.TargetGroup{
+		Source: nodesTargetGroupName,
+		Labels: model.LabelSet{
+			roleLabel: model.LabelValue("node"),
+		},
+	}
+
+	// Now let's loop through the nodes & add them to the target group with appropriate labels.
+	for nodeName, node := range d.nodes {
+		defaultNodeAddress, nodeAddressMap, err := nodeAddresses(node)
+		if err != nil {
+			log.Debugf("Skipping node %s: %s", node.Name, err)
+			continue
+		}
+
+		kubeletPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
+		address := fmt.Sprintf("%s:%d", defaultNodeAddress.String(), kubeletPort)
+
+		t := model.LabelSet{
+			model.AddressLabel:  model.LabelValue(address),
+			model.InstanceLabel: model.LabelValue(nodeName),
+		}
+
+		for addrType, ip := range nodeAddressMap {
+			labelName := strutil.SanitizeLabelName(nodeAddressPrefix + string(addrType))
+			t[model.LabelName(labelName)] = model.LabelValue(ip[0].String())
+		}
+		t[model.LabelName(nodePortLabel)] = model.LabelValue(strconv.Itoa(kubeletPort))
+
+		for k, v := range node.ObjectMeta.Labels {
+			labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
+			t[model.LabelName(labelName)] = model.LabelValue(v)
+		}
+		tg.Targets = append(tg.Targets, t)
+	}
+
+	return tg
+}
+
+// watchNodes watches nodes as they come & go.
+func (d *nodeDiscovery) watchNodes(events chan *nodeEvent, done <-chan struct{}, retryInterval time.Duration) {
+	until(func() {
+		nodes, resourceVersion, err := d.getNodes()
+		if err != nil {
+			log.Errorf("Cannot initialize nodes collection: %s", err)
+			return
+		}
+
+		// Reset the known nodes.
+		d.mtx.Lock()
+		d.nodes = map[string]*Node{}
+		d.mtx.Unlock()
+
+		for _, node := range nodes {
+			events <- &nodeEvent{Added, node}
+		}
+
+		req, err := http.NewRequest("GET", nodesURL, nil)
+		if err != nil {
+			log.Errorf("Cannot create nodes request: %s", err)
+			return
+		}
+		values := req.URL.Query()
+		values.Add("watch", "true")
+		values.Add("resourceVersion", resourceVersion)
+		req.URL.RawQuery = values.Encode()
+
+		res, err := d.kd.queryAPIServerReq(req)
+		if err != nil {
+			log.Errorf("Failed to watch nodes: %s", err)
+			return
+		}
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			log.Errorf("Failed to watch nodes: %d", res.StatusCode)
+			return
+		}
+
+		dec := json.NewDecoder(res.Body) // dec, not d: avoid shadowing the receiver
+		for {
+			var event nodeEvent
+			if err := dec.Decode(&event); err != nil {
+				log.Errorf("Watch nodes unexpectedly closed: %s", err)
+				return
+			}
+
+			select {
+			case events <- &event:
+			case <-done:
+			}
+		}
+	}, retryInterval, done)
+}
+
+func (d *nodeDiscovery) updateNode(node *Node, eventType EventType) {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+
+	updatedNodeName := node.ObjectMeta.Name
+	switch eventType {
+	case Deleted:
+		// Deleted - remove from nodes map.
+		delete(d.nodes, updatedNodeName)
+	case Added, Modified:
+		// Added/Modified - update the node in the nodes map.
+		d.nodes[updatedNodeName] = node
+	}
+}
+
+func (d *nodeDiscovery) getNodes() (map[string]*Node, string, error) {
+	res, err := d.kd.queryAPIServerPath(nodesURL)
+	if err != nil {
+		// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
+		// & return error.
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes: %s", err)
+	}
+	defer res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response: %d %s", res.StatusCode, res.Status)
+	}
+
+	var nodes NodeList
+	if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
+		body, _ := ioutil.ReadAll(res.Body)
+		return nil, "", fmt.Errorf("unable to list Kubernetes nodes; unexpected response body: %s", string(body))
+	}
+
+	nodeMap := map[string]*Node{}
+	for idx, node := range nodes.Items {
+		nodeMap[node.ObjectMeta.Name] = &nodes.Items[idx]
+	}
+	return nodeMap, nodes.ResourceVersion, nil
+}
+
+// nodeAddresses returns the provided node's address, based on the priority:
+// 1. NodeInternalIP
+// 2. NodeExternalIP
+// 3. NodeLegacyHostIP
+//
+// Copied from k8s.io/kubernetes/pkg/util/node/node.go
+func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
+	addresses := node.Status.Addresses
+	addressMap := map[NodeAddressType][]net.IP{}
+	for _, addr := range addresses {
+		ip := net.ParseIP(addr.Address)
+		// All addresses should be valid IPs.
+		if ip == nil {
+			continue
+		}
+		addressMap[addr.Type] = append(addressMap[addr.Type], ip)
+	}
+	if addresses, ok := addressMap[NodeInternalIP]; ok {
+		return addresses[0], addressMap, nil
+	}
+	if addresses, ok := addressMap[NodeExternalIP]; ok {
+		return addresses[0], addressMap, nil
+	}
+	if addresses, ok := addressMap[NodeLegacyHostIP]; ok {
+		return addresses[0], addressMap, nil
+	}
+	return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
+}
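
A note on updateNodesTargetGroup: every node event rebuilds the complete node target group under the single source nodesTargetGroupName, so consumers always receive a consistent full view rather than incremental patches. Per node, the label set combines the kubelet address with per-address-type and per-node-label meta labels. Assuming the usual "__meta_kubernetes_node_..." values for nodeAddressPrefix, nodePortLabel, and nodeLabelPrefix (those constants are defined elsewhere in this package), a single node might contribute roughly:

	// Illustrative only; the meta label names depend on the package's prefix constants.
	t := model.LabelSet{
		model.AddressLabel:  "10.0.0.5:10250", // default node address + kubelet port
		model.InstanceLabel: "node-a",
		"__meta_kubernetes_node_address_InternalIP": "10.0.0.5",
		"__meta_kubernetes_node_port":               "10250",
		"__meta_kubernetes_node_label_zone":         "eu-west-1a", // from the node's labels
	}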
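
watchNodes follows the standard Kubernetes list-then-watch sequence: getNodes lists once to seed the node map and obtain a resourceVersion, then a watch is held open from that version (in effect GET <nodesURL>?watch=true&resourceVersion=<version from the list>) so that no events are missed in between. The response body is a stream of JSON watch events that decode into nodeEvent values; when the stream drops, until() re-runs the whole sequence after retryInterval, which is why the node map is reset at the top of each attempt.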
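
Finally, a hedged usage sketch of nodeAddresses' priority order; the Node and NodeAddress literals are illustrative, with field names mirroring the copied Kubernetes API types used above:

	node := &Node{}
	node.Status.Addresses = []NodeAddress{
		{Type: NodeExternalIP, Address: "203.0.113.7"},
		{Type: NodeInternalIP, Address: "10.0.0.5"},
	}
	ip, byType, err := nodeAddresses(node)
	// ip is 10.0.0.5: NodeInternalIP outranks NodeExternalIP and NodeLegacyHostIP.
	// byType maps each address type to its parsed IPs. err is non-nil only when
	// none of the three preferred types yields a parseable IP.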