discovery/kubernetes: extract pod discovery

This change extracts pod discovery into its own type.
This commit is contained in:
Fabian Reinartz 2016-07-01 12:17:17 +02:00
parent e03e138d34
commit 8a97c211a8
2 changed files with 350 additions and 269 deletions

View file

@ -14,16 +14,13 @@
package kubernetes
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -113,10 +110,8 @@ type Discovery struct {
nodes map[string]*Node
services map[string]map[string]*Service
// map of namespace to (map of pod name to pod)
pods map[string]map[string]*Pod
nodesMu sync.RWMutex
servicesMu sync.RWMutex
podsMu sync.RWMutex
runDone chan struct{}
}
@ -140,6 +135,19 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
log.Debugf("Kubernetes Discovery.Run beginning")
defer close(ch)
var wg sync.WaitGroup
pd := &podDiscovery{
retryInterval: time.Duration(kd.Conf.RetryInterval),
pods: map[string]map[string]*Pod{},
kd: kd,
}
wg.Add(1)
go func() {
pd.run(ctx, ch)
wg.Done()
}()
// Send an initial full view.
// TODO(fabxc): this does not include all available services and service
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
@ -148,22 +156,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
all = append(all, kd.updateAPIServersTargetGroup())
all = append(all, kd.updateNodesTargetGroup())
pods, _, err := kd.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
kd.podsMu.Lock()
kd.pods = pods
kd.podsMu.Unlock()
all = append(all, kd.updatePodsTargetGroup())
for _, ns := range kd.pods {
for _, pod := range ns {
all = append(all, kd.updatePodTargetGroup(pod))
}
}
select {
case ch <- all:
case <-ctx.Done():
@ -176,7 +168,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
go kd.watchNodes(update, ctx.Done(), retryInterval)
go kd.startServiceWatch(update, ctx.Done(), retryInterval)
go kd.watchPods(update, ctx.Done(), retryInterval)
for {
tg := []*config.TargetGroup{}
@ -195,13 +186,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case *endpointsEvent:
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
case *podEvent:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name)
// Update the per-pod target group
kd.updatePod(obj.Pod, obj.EventType)
tg = append(tg, kd.updatePodTargetGroup(obj.Pod))
// ...and update the all pods target group
tg = append(tg, kd.updatePodsTargetGroup())
}
}
@ -217,6 +201,8 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
}
}
wg.Wait()
}
func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) {
@ -227,6 +213,9 @@ func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) {
return kd.queryAPIServerReq(req)
}
type client struct {
}
func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) {
// Lock in case we need to rotate API servers to request.
kd.apiServersMu.Lock()
@ -824,243 +813,3 @@ func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) {
}
return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
}
func (kd *Discovery) updatePod(pod *Pod, eventType EventType) {
kd.podsMu.Lock()
defer kd.podsMu.Unlock()
switch eventType {
case Deleted:
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok {
delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(kd.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
}
}
func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) {
res, err := kd.queryAPIServerPath(podsURL)
if err != nil {
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
}
var pods PodList
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
}
podMap := map[string]map[string]*Pod{}
for idx, pod := range pods.Items {
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
}
return podMap, pods.ResourceVersion, nil
}
func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
pods, resourceVersion, err := kd.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
kd.podsMu.Lock()
kd.pods = pods
kd.podsMu.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
type ByContainerPort []ContainerPort
func (a ByContainerPort) Len() int { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type ByContainerName []Container
func (a ByContainerName) Len() int { return len(a) }
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue
}
sort.Sort(ByContainerPort(tcpPorts))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
}
}
if len(targets) == 0 {
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
}
func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
kd.podsMu.RLock()
defer kd.podsMu.RUnlock()
tg := &config.TargetGroup{
Source: podSource(pod),
}
// If this pod doesn't exist, return an empty target group
if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok {
return tg
}
if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
return tg
}
tg.Labels = model.LabelSet{
roleLabel: model.LabelValue("container"),
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range kd.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}

View file

@ -0,0 +1,332 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
)
type podDiscovery struct {
mtx sync.RWMutex
pods map[string]map[string]*Pod
retryInterval time.Duration
kd *Discovery
}
func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pods, _, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.pods = pods
initial := []*config.TargetGroup{d.updatePodsTargetGroup()}
for _, ns := range d.pods {
for _, pod := range ns {
initial = append(initial, d.updatePodTargetGroup(pod))
}
}
select {
case ch <- initial:
case <-ctx.Done():
return
}
update := make(chan *podEvent, 10)
go d.watchPods(update, ctx.Done(), d.retryInterval)
for {
tgs := []*config.TargetGroup{}
select {
case <-ctx.Done():
return
case e := <-update:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name)
// Update the per-pod target group
d.updatePod(e.Pod, e.EventType)
tgs = append(tgs, d.updatePodTargetGroup(e.Pod))
// ...and update the all pods target group
tgs = append(tgs, d.updatePodsTargetGroup())
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
}
}
}
func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) {
res, err := d.kd.queryAPIServerPath(podsURL)
if err != nil {
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
}
var pods PodList
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
}
podMap := map[string]map[string]*Pod{}
for idx, pod := range pods.Items {
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
}
return podMap, pods.ResourceVersion, nil
}
func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
pods, resourceVersion, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.mtx.Lock()
d.pods = pods
d.mtx.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok {
delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(d.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(d.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
}
}
func (d *podDiscovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup {
d.mtx.RLock()
defer d.mtx.RUnlock()
tg := &config.TargetGroup{
Source: podSource(pod),
}
// If this pod doesn't exist, return an empty target group
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
return tg
}
if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
return tg
}
tg.Labels = model.LabelSet{
roleLabel: model.LabelValue("container"),
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range d.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue
}
sort.Sort(ByContainerPort(tcpPorts))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
}
}
if len(targets) == 0 {
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
}
type ByContainerPort []ContainerPort
func (a ByContainerPort) Len() int { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type ByContainerName []Container
func (a ByContainerName) Len() int { return len(a) }
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }