prometheus/retrieval/discovery/kubernetes/pod.go

343 lines
9.3 KiB
Go
Raw Normal View History

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
)
type podDiscovery struct {
mtx sync.RWMutex
pods map[string]map[string]*Pod
retryInterval time.Duration
kd *Discovery
}
func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pods, _, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.pods = pods
initial := []*config.TargetGroup{}
switch d.kd.Conf.Role {
case config.KubernetesRolePod:
initial = append(initial, d.updatePodsTargetGroup())
case config.KubernetesRoleContainer:
for _, ns := range d.pods {
for _, pod := range ns {
initial = append(initial, d.updateContainerTargetGroup(pod))
}
}
}
select {
case ch <- initial:
case <-ctx.Done():
return
}
update := make(chan *podEvent, 10)
go d.watchPods(update, ctx.Done(), d.retryInterval)
for {
tgs := []*config.TargetGroup{}
select {
case <-ctx.Done():
return
case e := <-update:
log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name)
d.updatePod(e.Pod, e.EventType)
switch d.kd.Conf.Role {
case config.KubernetesRoleContainer:
// Update the per-pod target group
tgs = append(tgs, d.updateContainerTargetGroup(e.Pod))
case config.KubernetesRolePod:
// Update the all pods target group
tgs = append(tgs, d.updatePodsTargetGroup())
}
}
if tgs == nil {
continue
}
for _, tg := range tgs {
select {
case ch <- []*config.TargetGroup{tg}:
case <-ctx.Done():
return
}
}
}
}
func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) {
res, err := d.kd.queryAPIServerPath(podsURL)
if err != nil {
return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status)
}
var pods PodList
if err := json.NewDecoder(res.Body).Decode(&pods); err != nil {
body, _ := ioutil.ReadAll(res.Body)
return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body))
}
podMap := map[string]map[string]*Pod{}
for idx, pod := range pods.Items {
if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok {
podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx]
}
return podMap, pods.ResourceVersion, nil
}
func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
pods, resourceVersion, err := d.getPods()
if err != nil {
log.Errorf("Cannot initialize pods collection: %s", err)
return
}
d.mtx.Lock()
d.pods = pods
d.mtx.Unlock()
req, err := http.NewRequest("GET", podsURL, nil)
if err != nil {
log.Errorf("Cannot create pods request: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", resourceVersion)
req.URL.RawQuery = values.Encode()
res, err := d.kd.queryAPIServerReq(req)
if err != nil {
log.Errorf("Failed to watch pods: %s", err)
return
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch pods: %d", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event podEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Watch pods unexpectedly closed: %s", err)
return
}
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) {
d.mtx.Lock()
defer d.mtx.Unlock()
switch eventType {
case Deleted:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok {
delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name)
if len(d.pods[pod.ObjectMeta.Namespace]) == 0 {
delete(d.pods, pod.ObjectMeta.Namespace)
}
}
case Added, Modified:
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{}
}
d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod
}
}
func (d *podDiscovery) updateContainerTargetGroup(pod *Pod) *config.TargetGroup {
d.mtx.RLock()
defer d.mtx.RUnlock()
tg := &config.TargetGroup{
Source: podSource(pod),
}
// If this pod doesn't exist, return an empty target group
if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok {
return tg
}
if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok {
return tg
}
tg.Labels = model.LabelSet{
roleLabel: model.LabelValue("container"),
}
tg.Targets = updatePodTargets(pod, true)
return tg
}
func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup {
tg := &config.TargetGroup{
Source: podsTargetGroupName,
Labels: model.LabelSet{
roleLabel: model.LabelValue("pod"),
},
}
for _, namespace := range d.pods {
for _, pod := range namespace {
tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...)
}
}
return tg
}
func podSource(pod *Pod) string {
return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name
}
func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet {
var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers))
if pod.PodStatus.PodIP == "" {
log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name)
return targets
}
if pod.PodStatus.Phase != "Running" {
log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name)
return targets
}
ready := "unknown"
for _, cond := range pod.PodStatus.Conditions {
if strings.ToLower(cond.Type) == "ready" {
ready = strings.ToLower(cond.Status)
}
}
sort.Sort(ByContainerName(pod.PodSpec.Containers))
for _, container := range pod.PodSpec.Containers {
// Collect a list of TCP ports
// Sort by port number, ascending
// Product a target pointed at the first port
// Include a label containing all ports (portName=port,PortName=port,...,)
var tcpPorts []ContainerPort
var portLabel *bytes.Buffer = bytes.NewBufferString(",")
for _, port := range container.Ports {
if port.Protocol == "TCP" {
tcpPorts = append(tcpPorts, port)
}
}
if len(tcpPorts) == 0 {
log.Debugf("skipping container %s with no TCP ports", container.Name)
continue
}
sort.Sort(ByContainerPort(tcpPorts))
t := model.LabelSet{
model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))),
podNameLabel: model.LabelValue(pod.ObjectMeta.Name),
podAddressLabel: model.LabelValue(pod.PodStatus.PodIP),
podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace),
podContainerNameLabel: model.LabelValue(container.Name),
podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name),
podReadyLabel: model.LabelValue(ready),
}
for _, port := range tcpPorts {
portLabel.WriteString(port.Name)
portLabel.WriteString("=")
portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10))
portLabel.WriteString(",")
t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10))
}
t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String())
for k, v := range pod.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(podLabelPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
for k, v := range pod.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k)
t[model.LabelName(labelName)] = model.LabelValue(v)
}
targets = append(targets, t)
if !allContainers {
break
}
}
if len(targets) == 0 {
log.Debugf("no targets for pod %s", pod.ObjectMeta.Name)
}
return targets
}
type ByContainerPort []ContainerPort
func (a ByContainerPort) Len() int { return len(a) }
func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort }
func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
type ByContainerName []Container
func (a ByContainerName) Len() int { return len(a) }
func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }