mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
discovery/kubernetes: extract service endpoint discovery
This extract discovery of services and their endpoints into its own type.
This commit is contained in:
parent
fdbe28df85
commit
e0f8caacd7
|
@ -14,7 +14,6 @@
|
||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
@ -29,7 +28,6 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/util/httputil"
|
"github.com/prometheus/prometheus/util/httputil"
|
||||||
"github.com/prometheus/prometheus/util/strutil"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -106,10 +104,7 @@ type Discovery struct {
|
||||||
|
|
||||||
apiServers []config.URL
|
apiServers []config.URL
|
||||||
apiServersMu sync.RWMutex
|
apiServersMu sync.RWMutex
|
||||||
services map[string]map[string]*Service
|
runDone chan struct{}
|
||||||
// map of namespace to (map of pod name to pod)
|
|
||||||
servicesMu sync.RWMutex
|
|
||||||
runDone chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize sets up the discovery for usage.
|
// Initialize sets up the discovery for usage.
|
||||||
|
@ -154,6 +149,16 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
sd := &serviceDiscovery{
|
||||||
|
retryInterval: time.Duration(kd.Conf.RetryInterval),
|
||||||
|
kd: kd,
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
sd.run(ctx, ch)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
// Send an initial full view.
|
// Send an initial full view.
|
||||||
// TODO(fabxc): this does not include all available services and service
|
// TODO(fabxc): this does not include all available services and service
|
||||||
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
|
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
|
||||||
|
@ -167,41 +172,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
retryInterval := time.Duration(kd.Conf.RetryInterval)
|
|
||||||
|
|
||||||
update := make(chan interface{}, 10)
|
|
||||||
|
|
||||||
go kd.startServiceWatch(update, ctx.Done(), retryInterval)
|
|
||||||
|
|
||||||
for {
|
|
||||||
tg := []*config.TargetGroup{}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case event := <-update:
|
|
||||||
switch obj := event.(type) {
|
|
||||||
case *serviceEvent:
|
|
||||||
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", obj.EventType, obj.Service.ObjectMeta.Name)
|
|
||||||
tg = append(tg, kd.updateService(obj.Service, obj.EventType))
|
|
||||||
case *endpointsEvent:
|
|
||||||
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name)
|
|
||||||
tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tg == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, t := range tg {
|
|
||||||
select {
|
|
||||||
case ch <- []*config.TargetGroup{t}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,9 +183,6 @@ func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) {
|
||||||
return kd.queryAPIServerReq(req)
|
return kd.queryAPIServerReq(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
type client struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) {
|
func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) {
|
||||||
// Lock in case we need to rotate API servers to request.
|
// Lock in case we need to rotate API servers to request.
|
||||||
kd.apiServersMu.Lock()
|
kd.apiServersMu.Lock()
|
||||||
|
@ -272,297 +239,6 @@ func (kd *Discovery) updateAPIServersTargetGroup() *config.TargetGroup {
|
||||||
return tg
|
return tg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kd *Discovery) getServices() (map[string]map[string]*Service, string, error) {
|
|
||||||
res, err := kd.queryAPIServerPath(servicesURL)
|
|
||||||
if err != nil {
|
|
||||||
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
|
||||||
// & return error.
|
|
||||||
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
|
|
||||||
}
|
|
||||||
var services ServiceList
|
|
||||||
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
|
||||||
body, _ := ioutil.ReadAll(res.Body)
|
|
||||||
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceMap := map[string]map[string]*Service{}
|
|
||||||
for idx, service := range services.Items {
|
|
||||||
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
|
|
||||||
if !ok {
|
|
||||||
namespace = map[string]*Service{}
|
|
||||||
serviceMap[service.ObjectMeta.Namespace] = namespace
|
|
||||||
}
|
|
||||||
namespace[service.ObjectMeta.Name] = &services.Items[idx]
|
|
||||||
}
|
|
||||||
|
|
||||||
return serviceMap, services.ResourceVersion, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// watchServices watches services as they come & go.
|
|
||||||
func (kd *Discovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
|
||||||
until(func() {
|
|
||||||
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
|
|
||||||
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
|
|
||||||
existingServices := kd.services
|
|
||||||
|
|
||||||
// Reset the known services.
|
|
||||||
kd.servicesMu.Lock()
|
|
||||||
kd.services = map[string]map[string]*Service{}
|
|
||||||
kd.servicesMu.Unlock()
|
|
||||||
|
|
||||||
services, resourceVersion, err := kd.getServices()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot initialize services collection: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now let's loop through the old services & see if they still exist in here
|
|
||||||
for oldNSName, oldNS := range existingServices {
|
|
||||||
if ns, ok := services[oldNSName]; !ok {
|
|
||||||
for _, service := range existingServices[oldNSName] {
|
|
||||||
events <- &serviceEvent{Deleted, service}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for oldServiceName, oldService := range oldNS {
|
|
||||||
if _, ok := ns[oldServiceName]; !ok {
|
|
||||||
events <- &serviceEvent{Deleted, oldService}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Discard the existing services map for GC.
|
|
||||||
existingServices = nil
|
|
||||||
|
|
||||||
for _, ns := range services {
|
|
||||||
for _, service := range ns {
|
|
||||||
events <- &serviceEvent{Added, service}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(2)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
kd.watchServices(resourceVersion, events, done)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
kd.watchServiceEndpoints(resourceVersion, events, done)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}, retryInterval, done)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
|
||||||
req, err := http.NewRequest("GET", servicesURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed to create services request: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
values := req.URL.Query()
|
|
||||||
values.Add("watch", "true")
|
|
||||||
values.Add("resourceVersion", resourceVersion)
|
|
||||||
req.URL.RawQuery = values.Encode()
|
|
||||||
|
|
||||||
res, err := kd.queryAPIServerReq(req)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed to watch services: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
log.Errorf("Failed to watch services: %d", res.StatusCode)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
d := json.NewDecoder(res.Body)
|
|
||||||
|
|
||||||
for {
|
|
||||||
var event serviceEvent
|
|
||||||
if err := d.Decode(&event); err != nil {
|
|
||||||
log.Errorf("Watch services unexpectedly closed: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case events <- &event:
|
|
||||||
case <-done:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// watchServiceEndpoints watches service endpoints as they come & go.
|
|
||||||
func (kd *Discovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
|
||||||
req, err := http.NewRequest("GET", endpointsURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed to create service endpoints request: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
values := req.URL.Query()
|
|
||||||
values.Add("watch", "true")
|
|
||||||
values.Add("resourceVersion", resourceVersion)
|
|
||||||
req.URL.RawQuery = values.Encode()
|
|
||||||
|
|
||||||
res, err := kd.queryAPIServerReq(req)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed to watch service endpoints: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
d := json.NewDecoder(res.Body)
|
|
||||||
|
|
||||||
for {
|
|
||||||
var event endpointsEvent
|
|
||||||
if err := d.Decode(&event); err != nil {
|
|
||||||
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case events <- &event:
|
|
||||||
case <-done:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
|
|
||||||
kd.servicesMu.Lock()
|
|
||||||
defer kd.servicesMu.Unlock()
|
|
||||||
|
|
||||||
switch eventType {
|
|
||||||
case Deleted:
|
|
||||||
return kd.deleteService(service)
|
|
||||||
case Added, Modified:
|
|
||||||
return kd.addService(service)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) deleteService(service *Service) *config.TargetGroup {
|
|
||||||
tg := &config.TargetGroup{Source: serviceSource(service)}
|
|
||||||
|
|
||||||
delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
|
|
||||||
if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
|
|
||||||
delete(kd.services, service.ObjectMeta.Namespace)
|
|
||||||
}
|
|
||||||
|
|
||||||
return tg
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) addService(service *Service) *config.TargetGroup {
|
|
||||||
namespace, ok := kd.services[service.ObjectMeta.Namespace]
|
|
||||||
if !ok {
|
|
||||||
namespace = map[string]*Service{}
|
|
||||||
kd.services[service.ObjectMeta.Namespace] = namespace
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace[service.ObjectMeta.Name] = service
|
|
||||||
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
|
|
||||||
|
|
||||||
res, err := kd.queryAPIServerPath(endpointURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Error getting service endpoints: %s", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
if res.StatusCode != http.StatusOK {
|
|
||||||
log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var eps Endpoints
|
|
||||||
if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
|
|
||||||
log.Errorf("Error getting service endpoints: %s", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return kd.updateServiceTargetGroup(service, &eps)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
|
|
||||||
tg := &config.TargetGroup{
|
|
||||||
Source: serviceSource(service),
|
|
||||||
Labels: model.LabelSet{
|
|
||||||
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
|
|
||||||
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range service.ObjectMeta.Labels {
|
|
||||||
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
|
|
||||||
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range service.ObjectMeta.Annotations {
|
|
||||||
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
|
|
||||||
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
|
|
||||||
|
|
||||||
// Append the first TCP service port if one exists.
|
|
||||||
for _, port := range service.Spec.Ports {
|
|
||||||
if port.Protocol == ProtocolTCP {
|
|
||||||
serviceAddress += fmt.Sprintf(":%d", port.Port)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t := model.LabelSet{
|
|
||||||
model.AddressLabel: model.LabelValue(serviceAddress),
|
|
||||||
roleLabel: model.LabelValue("service"),
|
|
||||||
}
|
|
||||||
tg.Targets = append(tg.Targets, t)
|
|
||||||
|
|
||||||
// Now let's loop through the endpoints & add them to the target group with appropriate labels.
|
|
||||||
for _, ss := range eps.Subsets {
|
|
||||||
epPort := ss.Ports[0].Port
|
|
||||||
|
|
||||||
for _, addr := range ss.Addresses {
|
|
||||||
ipAddr := addr.IP
|
|
||||||
if len(ipAddr) == net.IPv6len {
|
|
||||||
ipAddr = "[" + ipAddr + "]"
|
|
||||||
}
|
|
||||||
address := fmt.Sprintf("%s:%d", ipAddr, epPort)
|
|
||||||
|
|
||||||
t := model.LabelSet{
|
|
||||||
model.AddressLabel: model.LabelValue(address),
|
|
||||||
roleLabel: model.LabelValue("endpoint"),
|
|
||||||
}
|
|
||||||
|
|
||||||
tg.Targets = append(tg.Targets, t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tg
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kd *Discovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
|
|
||||||
kd.servicesMu.Lock()
|
|
||||||
defer kd.servicesMu.Unlock()
|
|
||||||
|
|
||||||
serviceNamespace := endpoints.ObjectMeta.Namespace
|
|
||||||
serviceName := endpoints.ObjectMeta.Name
|
|
||||||
|
|
||||||
if service, ok := kd.services[serviceNamespace][serviceName]; ok {
|
|
||||||
return kd.updateServiceTargetGroup(service, endpoints)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
|
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
|
||||||
bearerTokenFile := conf.BearerTokenFile
|
bearerTokenFile := conf.BearerTokenFile
|
||||||
caFile := conf.TLSConfig.CAFile
|
caFile := conf.TLSConfig.CAFile
|
||||||
|
@ -622,10 +298,6 @@ func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, err
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func serviceSource(service *Service) string {
|
|
||||||
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
// Until loops until stop channel is closed, running f every period.
|
// Until loops until stop channel is closed, running f every period.
|
||||||
// f may not be invoked if stop channel is already closed.
|
// f may not be invoked if stop channel is already closed.
|
||||||
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
|
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
|
||||||
|
|
366
retrieval/discovery/kubernetes/service.go
Normal file
366
retrieval/discovery/kubernetes/service.go
Normal file
|
@ -0,0 +1,366 @@
|
||||||
|
// Copyright 2016 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kubernetes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/common/log"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/util/strutil"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type serviceDiscovery struct {
|
||||||
|
mtx sync.RWMutex
|
||||||
|
services map[string]map[string]*Service
|
||||||
|
retryInterval time.Duration
|
||||||
|
kd *Discovery
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
|
update := make(chan interface{}, 10)
|
||||||
|
go d.startServiceWatch(update, ctx.Done(), d.retryInterval)
|
||||||
|
|
||||||
|
for {
|
||||||
|
tgs := []*config.TargetGroup{}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case event := <-update:
|
||||||
|
switch e := event.(type) {
|
||||||
|
case *endpointsEvent:
|
||||||
|
log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", e.EventType, e.Endpoints.ObjectMeta.Name)
|
||||||
|
tgs = append(tgs, d.updateServiceEndpoints(e.Endpoints, e.EventType))
|
||||||
|
case *serviceEvent:
|
||||||
|
log.Debugf("k8s discovery received service event (EventType=%s, Service Name=%s)", e.EventType, e.Service.ObjectMeta.Name)
|
||||||
|
tgs = append(tgs, d.updateService(e.Service, e.EventType))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if tgs == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tg := range tgs {
|
||||||
|
select {
|
||||||
|
case ch <- []*config.TargetGroup{tg}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) getServices() (map[string]map[string]*Service, string, error) {
|
||||||
|
res, err := d.kd.queryAPIServerPath(servicesURL)
|
||||||
|
if err != nil {
|
||||||
|
// If we can't list services then we can't watch them. Assume this is a misconfiguration
|
||||||
|
// & return error.
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes services: %s", err)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response: %d %s", res.StatusCode, res.Status)
|
||||||
|
}
|
||||||
|
var services ServiceList
|
||||||
|
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
|
||||||
|
body, _ := ioutil.ReadAll(res.Body)
|
||||||
|
return nil, "", fmt.Errorf("unable to list Kubernetes services; unexpected response body: %s", string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceMap := map[string]map[string]*Service{}
|
||||||
|
for idx, service := range services.Items {
|
||||||
|
namespace, ok := serviceMap[service.ObjectMeta.Namespace]
|
||||||
|
if !ok {
|
||||||
|
namespace = map[string]*Service{}
|
||||||
|
serviceMap[service.ObjectMeta.Namespace] = namespace
|
||||||
|
}
|
||||||
|
namespace[service.ObjectMeta.Name] = &services.Items[idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceMap, services.ResourceVersion, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// watchServices watches services as they come & go.
|
||||||
|
func (d *serviceDiscovery) startServiceWatch(events chan<- interface{}, done <-chan struct{}, retryInterval time.Duration) {
|
||||||
|
until(func() {
|
||||||
|
// We use separate target groups for each discovered service so we'll need to clean up any if they've been deleted
|
||||||
|
// in Kubernetes while we couldn't connect - small chance of this, but worth dealing with.
|
||||||
|
d.mtx.Lock()
|
||||||
|
existingServices := d.services
|
||||||
|
|
||||||
|
// Reset the known services.
|
||||||
|
d.services = map[string]map[string]*Service{}
|
||||||
|
d.mtx.Unlock()
|
||||||
|
|
||||||
|
services, resourceVersion, err := d.getServices()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot initialize services collection: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now let's loop through the old services & see if they still exist in here
|
||||||
|
for oldNSName, oldNS := range existingServices {
|
||||||
|
if ns, ok := services[oldNSName]; !ok {
|
||||||
|
for _, service := range existingServices[oldNSName] {
|
||||||
|
events <- &serviceEvent{Deleted, service}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for oldServiceName, oldService := range oldNS {
|
||||||
|
if _, ok := ns[oldServiceName]; !ok {
|
||||||
|
events <- &serviceEvent{Deleted, oldService}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Discard the existing services map for GC.
|
||||||
|
existingServices = nil
|
||||||
|
|
||||||
|
for _, ns := range services {
|
||||||
|
for _, service := range ns {
|
||||||
|
events <- &serviceEvent{Added, service}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
d.watchServices(resourceVersion, events, done)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
d.watchServiceEndpoints(resourceVersion, events, done)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}, retryInterval, done)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) watchServices(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
||||||
|
req, err := http.NewRequest("GET", servicesURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to create services request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
values := req.URL.Query()
|
||||||
|
values.Add("watch", "true")
|
||||||
|
values.Add("resourceVersion", resourceVersion)
|
||||||
|
req.URL.RawQuery = values.Encode()
|
||||||
|
|
||||||
|
res, err := d.kd.queryAPIServerReq(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to watch services: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
log.Errorf("Failed to watch services: %d", res.StatusCode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(res.Body)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var event serviceEvent
|
||||||
|
if err := dec.Decode(&event); err != nil {
|
||||||
|
log.Errorf("Watch services unexpectedly closed: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case events <- &event:
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// watchServiceEndpoints watches service endpoints as they come & go.
|
||||||
|
func (d *serviceDiscovery) watchServiceEndpoints(resourceVersion string, events chan<- interface{}, done <-chan struct{}) {
|
||||||
|
req, err := http.NewRequest("GET", endpointsURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to create service endpoints request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
values := req.URL.Query()
|
||||||
|
values.Add("watch", "true")
|
||||||
|
values.Add("resourceVersion", resourceVersion)
|
||||||
|
req.URL.RawQuery = values.Encode()
|
||||||
|
|
||||||
|
res, err := d.kd.queryAPIServerReq(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to watch service endpoints: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(res.Body)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var event endpointsEvent
|
||||||
|
if err := dec.Decode(&event); err != nil {
|
||||||
|
log.Errorf("Watch service endpoints unexpectedly closed: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case events <- &event:
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
|
||||||
|
d.mtx.Lock()
|
||||||
|
defer d.mtx.Unlock()
|
||||||
|
|
||||||
|
switch eventType {
|
||||||
|
case Deleted:
|
||||||
|
return d.deleteService(service)
|
||||||
|
case Added, Modified:
|
||||||
|
return d.addService(service)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) deleteService(service *Service) *config.TargetGroup {
|
||||||
|
tg := &config.TargetGroup{Source: serviceSource(service)}
|
||||||
|
|
||||||
|
delete(d.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
|
||||||
|
if len(d.services[service.ObjectMeta.Namespace]) == 0 {
|
||||||
|
delete(d.services, service.ObjectMeta.Namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) addService(service *Service) *config.TargetGroup {
|
||||||
|
namespace, ok := d.services[service.ObjectMeta.Namespace]
|
||||||
|
if !ok {
|
||||||
|
namespace = map[string]*Service{}
|
||||||
|
d.services[service.ObjectMeta.Namespace] = namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace[service.ObjectMeta.Name] = service
|
||||||
|
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
|
||||||
|
|
||||||
|
res, err := d.kd.queryAPIServerPath(endpointURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error getting service endpoints: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var eps Endpoints
|
||||||
|
if err := json.NewDecoder(res.Body).Decode(&eps); err != nil {
|
||||||
|
log.Errorf("Error getting service endpoints: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.updateServiceTargetGroup(service, &eps)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) updateServiceTargetGroup(service *Service, eps *Endpoints) *config.TargetGroup {
|
||||||
|
tg := &config.TargetGroup{
|
||||||
|
Source: serviceSource(service),
|
||||||
|
Labels: model.LabelSet{
|
||||||
|
serviceNamespaceLabel: model.LabelValue(service.ObjectMeta.Namespace),
|
||||||
|
serviceNameLabel: model.LabelValue(service.ObjectMeta.Name),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range service.ObjectMeta.Labels {
|
||||||
|
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
|
||||||
|
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range service.ObjectMeta.Annotations {
|
||||||
|
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
|
||||||
|
tg.Labels[model.LabelName(labelName)] = model.LabelValue(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceAddress := service.ObjectMeta.Name + "." + service.ObjectMeta.Namespace + ".svc"
|
||||||
|
|
||||||
|
// Append the first TCP service port if one exists.
|
||||||
|
for _, port := range service.Spec.Ports {
|
||||||
|
if port.Protocol == ProtocolTCP {
|
||||||
|
serviceAddress += fmt.Sprintf(":%d", port.Port)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t := model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(serviceAddress),
|
||||||
|
roleLabel: model.LabelValue("service"),
|
||||||
|
}
|
||||||
|
tg.Targets = append(tg.Targets, t)
|
||||||
|
|
||||||
|
// Now let's loop through the endpoints & add them to the target group with appropriate labels.
|
||||||
|
for _, ss := range eps.Subsets {
|
||||||
|
epPort := ss.Ports[0].Port
|
||||||
|
|
||||||
|
for _, addr := range ss.Addresses {
|
||||||
|
ipAddr := addr.IP
|
||||||
|
if len(ipAddr) == net.IPv6len {
|
||||||
|
ipAddr = "[" + ipAddr + "]"
|
||||||
|
}
|
||||||
|
address := fmt.Sprintf("%s:%d", ipAddr, epPort)
|
||||||
|
|
||||||
|
t := model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(address),
|
||||||
|
roleLabel: model.LabelValue("endpoint"),
|
||||||
|
}
|
||||||
|
|
||||||
|
tg.Targets = append(tg.Targets, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *serviceDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
|
||||||
|
d.mtx.Lock()
|
||||||
|
defer d.mtx.Unlock()
|
||||||
|
|
||||||
|
serviceNamespace := endpoints.ObjectMeta.Namespace
|
||||||
|
serviceName := endpoints.ObjectMeta.Name
|
||||||
|
|
||||||
|
if service, ok := d.services[serviceNamespace][serviceName]; ok {
|
||||||
|
return d.updateServiceTargetGroup(service, endpoints)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceSource(service *Service) string {
|
||||||
|
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
|
||||||
|
}
|
Loading…
Reference in a new issue