mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Merge pull request #6838 from brancz/endpointslice
discovery/kubernetes: EndpointSlice discovery
This commit is contained in:
commit
06e2c2f804
407
discovery/kubernetes/endpointslice.go
Normal file
407
discovery/kubernetes/endpointslice.go
Normal file
|
@ -0,0 +1,407 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/common/model"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
disv1beta1 "k8s.io/api/discovery/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
var (
|
||||
epslAddCount = eventCount.WithLabelValues("endpointslice", "add")
|
||||
epslUpdateCount = eventCount.WithLabelValues("endpointslice", "update")
|
||||
epslDeleteCount = eventCount.WithLabelValues("endpointslice", "delete")
|
||||
)
|
||||
|
||||
// EndpointSlice discovers new endpoint targets.
|
||||
type EndpointSlice struct {
|
||||
logger log.Logger
|
||||
|
||||
endpointSliceInf cache.SharedInformer
|
||||
serviceInf cache.SharedInformer
|
||||
podInf cache.SharedInformer
|
||||
|
||||
podStore cache.Store
|
||||
endpointSliceStore cache.Store
|
||||
serviceStore cache.Store
|
||||
|
||||
queue *workqueue.Type
|
||||
}
|
||||
|
||||
// NewEndpointSlice returns a new endpointslice discovery.
|
||||
func NewEndpointSlice(l log.Logger, svc, eps, pod cache.SharedInformer) *EndpointSlice {
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
e := &EndpointSlice{
|
||||
logger: l,
|
||||
endpointSliceInf: eps,
|
||||
endpointSliceStore: eps.GetStore(),
|
||||
serviceInf: svc,
|
||||
serviceStore: svc.GetStore(),
|
||||
podInf: pod,
|
||||
podStore: pod.GetStore(),
|
||||
queue: workqueue.NewNamed("endpointSlice"),
|
||||
}
|
||||
|
||||
e.endpointSliceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(o interface{}) {
|
||||
epslAddCount.Inc()
|
||||
e.enqueue(o)
|
||||
},
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
epslUpdateCount.Inc()
|
||||
e.enqueue(o)
|
||||
},
|
||||
DeleteFunc: func(o interface{}) {
|
||||
epslDeleteCount.Inc()
|
||||
e.enqueue(o)
|
||||
},
|
||||
})
|
||||
|
||||
serviceUpdate := func(o interface{}) {
|
||||
svc, err := convertToService(o)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(brancz): use cache.Indexer to index endpoints by
|
||||
// disv1beta1.LabelServiceName so this operation doesn't have to
|
||||
// iterate over all endpoint objects.
|
||||
for _, obj := range e.endpointSliceStore.List() {
|
||||
ep := obj.(*disv1beta1.EndpointSlice)
|
||||
if lv, exists := ep.Labels[disv1beta1.LabelServiceName]; exists && lv == svc.Name {
|
||||
e.enqueue(ep)
|
||||
}
|
||||
}
|
||||
}
|
||||
e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(o interface{}) {
|
||||
svcAddCount.Inc()
|
||||
serviceUpdate(o)
|
||||
},
|
||||
UpdateFunc: func(_, o interface{}) {
|
||||
svcUpdateCount.Inc()
|
||||
serviceUpdate(o)
|
||||
},
|
||||
DeleteFunc: func(o interface{}) {
|
||||
svcDeleteCount.Inc()
|
||||
serviceUpdate(o)
|
||||
},
|
||||
})
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) enqueue(obj interface{}) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
e.queue.Add(key)
|
||||
}
|
||||
|
||||
// Run implements the Discoverer interface.
|
||||
func (e *EndpointSlice) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||
defer e.queue.ShutDown()
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), e.endpointSliceInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) {
|
||||
if ctx.Err() != context.Canceled {
|
||||
level.Error(e.logger).Log("msg", "endpointslice informer unable to sync cache")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
for e.process(ctx, ch) {
|
||||
}
|
||||
}()
|
||||
|
||||
// Block until the target provider is explicitly canceled.
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool {
|
||||
keyObj, quit := e.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer e.queue.Done(keyObj)
|
||||
key := keyObj.(string)
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "splitting key failed", "key", key)
|
||||
return true
|
||||
}
|
||||
|
||||
o, exists, err := e.endpointSliceStore.GetByKey(key)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
|
||||
return true
|
||||
}
|
||||
if !exists {
|
||||
send(ctx, ch, &targetgroup.Group{Source: endpointSliceSourceFromNamespaceAndName(namespace, name)})
|
||||
return true
|
||||
}
|
||||
eps, err := convertToEndpointSlice(o)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "converting to EndpointSlice object failed", "err", err)
|
||||
return true
|
||||
}
|
||||
send(ctx, ch, e.buildEndpointSlice(eps))
|
||||
return true
|
||||
}
|
||||
|
||||
func convertToEndpointSlice(o interface{}) (*disv1beta1.EndpointSlice, error) {
|
||||
endpoints, ok := o.(*disv1beta1.EndpointSlice)
|
||||
if ok {
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("received unexpected object: %v", o)
|
||||
}
|
||||
|
||||
func endpointSliceSource(ep *disv1beta1.EndpointSlice) string {
|
||||
return endpointSliceSourceFromNamespaceAndName(ep.Namespace, ep.Name)
|
||||
}
|
||||
|
||||
func endpointSliceSourceFromNamespaceAndName(namespace, name string) string {
|
||||
return "endpointslice/" + namespace + "/" + name
|
||||
}
|
||||
|
||||
const (
|
||||
endpointSliceNameLabel = metaLabelPrefix + "endpointslice_name"
|
||||
endpointSliceAddressTypeLabel = metaLabelPrefix + "endpointslice_address_type"
|
||||
endpointSlicePortNameLabel = metaLabelPrefix + "endpointslice_port_name"
|
||||
endpointSlicePortProtocolLabel = metaLabelPrefix + "endpointslice_port_protocol"
|
||||
endpointSlicePortLabel = metaLabelPrefix + "endpointslice_port"
|
||||
endpointSlicePortAppProtocol = metaLabelPrefix + "endpointslice_port_app_protocol"
|
||||
endpointSliceEndpointConditionsReadyLabel = metaLabelPrefix + "endpointslice_endpoint_conditions_ready"
|
||||
endpointSliceEndpointHostnameLabel = metaLabelPrefix + "endpointslice_endpoint_hostname"
|
||||
endpointSliceAddressTargetKindLabel = metaLabelPrefix + "endpointslice_address_target_kind"
|
||||
endpointSliceAddressTargetNameLabel = metaLabelPrefix + "endpointslice_address_target_name"
|
||||
endpointSliceEndpointTopologyLabelPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_"
|
||||
endpointSliceEndpointTopologyLabelPresentPrefix = metaLabelPrefix + "endpointslice_endpoint_topology_present_"
|
||||
)
|
||||
|
||||
func (e *EndpointSlice) buildEndpointSlice(eps *disv1beta1.EndpointSlice) *targetgroup.Group {
|
||||
tg := &targetgroup.Group{
|
||||
Source: endpointSliceSource(eps),
|
||||
}
|
||||
tg.Labels = model.LabelSet{
|
||||
namespaceLabel: lv(eps.Namespace),
|
||||
endpointSliceNameLabel: lv(eps.Name),
|
||||
endpointSliceAddressTypeLabel: lv(string(eps.AddressType)),
|
||||
}
|
||||
e.addServiceLabels(eps, tg)
|
||||
|
||||
type podEntry struct {
|
||||
pod *apiv1.Pod
|
||||
servicePorts []disv1beta1.EndpointPort
|
||||
}
|
||||
seenPods := map[string]*podEntry{}
|
||||
|
||||
add := func(addr string, ep disv1beta1.Endpoint, port disv1beta1.EndpointPort) {
|
||||
a := addr
|
||||
if port.Port != nil {
|
||||
a = net.JoinHostPort(addr, strconv.FormatUint(uint64(*port.Port), 10))
|
||||
}
|
||||
|
||||
target := model.LabelSet{
|
||||
model.AddressLabel: lv(a),
|
||||
}
|
||||
|
||||
if port.Name != nil {
|
||||
target[endpointSlicePortNameLabel] = lv(*port.Name)
|
||||
}
|
||||
|
||||
if port.Protocol != nil {
|
||||
target[endpointSlicePortProtocolLabel] = lv(string(*port.Protocol))
|
||||
}
|
||||
|
||||
if port.Port != nil {
|
||||
target[endpointSlicePortLabel] = lv(strconv.FormatUint(uint64(*port.Port), 10))
|
||||
}
|
||||
|
||||
if port.AppProtocol != nil {
|
||||
target[endpointSlicePortAppProtocol] = lv(*port.AppProtocol)
|
||||
}
|
||||
|
||||
if ep.Conditions.Ready != nil {
|
||||
target[endpointSliceEndpointConditionsReadyLabel] = lv(strconv.FormatBool(*ep.Conditions.Ready))
|
||||
}
|
||||
|
||||
if ep.Hostname != nil {
|
||||
target[endpointSliceEndpointHostnameLabel] = lv(*ep.Hostname)
|
||||
}
|
||||
|
||||
if ep.TargetRef != nil {
|
||||
target[model.LabelName(endpointSliceAddressTargetKindLabel)] = lv(ep.TargetRef.Kind)
|
||||
target[model.LabelName(endpointSliceAddressTargetNameLabel)] = lv(ep.TargetRef.Name)
|
||||
}
|
||||
|
||||
for k, v := range ep.Topology {
|
||||
ln := strutil.SanitizeLabelName(k)
|
||||
target[model.LabelName(endpointSliceEndpointTopologyLabelPrefix+ln)] = lv(v)
|
||||
target[model.LabelName(endpointSliceEndpointTopologyLabelPresentPrefix+ln)] = presentValue
|
||||
}
|
||||
|
||||
pod := e.resolvePodRef(ep.TargetRef)
|
||||
if pod == nil {
|
||||
// This target is not a Pod, so don't continue with Pod specific logic.
|
||||
tg.Targets = append(tg.Targets, target)
|
||||
return
|
||||
}
|
||||
s := pod.Namespace + "/" + pod.Name
|
||||
|
||||
sp, ok := seenPods[s]
|
||||
if !ok {
|
||||
sp = &podEntry{pod: pod}
|
||||
seenPods[s] = sp
|
||||
}
|
||||
|
||||
// Attach standard pod labels.
|
||||
target = target.Merge(podLabels(pod))
|
||||
|
||||
// Attach potential container port labels matching the endpoint port.
|
||||
for _, c := range pod.Spec.Containers {
|
||||
for _, cport := range c.Ports {
|
||||
if port.Port == nil {
|
||||
continue
|
||||
}
|
||||
if *port.Port == cport.ContainerPort {
|
||||
ports := strconv.FormatUint(uint64(*port.Port), 10)
|
||||
|
||||
target[podContainerNameLabel] = lv(c.Name)
|
||||
target[podContainerPortNameLabel] = lv(cport.Name)
|
||||
target[podContainerPortNumberLabel] = lv(ports)
|
||||
target[podContainerPortProtocolLabel] = lv(string(cport.Protocol))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add service port so we know that we have already generated a target
|
||||
// for it.
|
||||
sp.servicePorts = append(sp.servicePorts, port)
|
||||
tg.Targets = append(tg.Targets, target)
|
||||
}
|
||||
|
||||
for _, ep := range eps.Endpoints {
|
||||
for _, port := range eps.Ports {
|
||||
for _, addr := range ep.Addresses {
|
||||
add(addr, ep, port)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For all seen pods, check all container ports. If they were not covered
|
||||
// by one of the service endpoints, generate targets for them.
|
||||
for _, pe := range seenPods {
|
||||
for _, c := range pe.pod.Spec.Containers {
|
||||
for _, cport := range c.Ports {
|
||||
hasSeenPort := func() bool {
|
||||
for _, eport := range pe.servicePorts {
|
||||
if eport.Port == nil {
|
||||
continue
|
||||
}
|
||||
if cport.ContainerPort == *eport.Port {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
if hasSeenPort() {
|
||||
continue
|
||||
}
|
||||
|
||||
a := net.JoinHostPort(pe.pod.Status.PodIP, strconv.FormatUint(uint64(cport.ContainerPort), 10))
|
||||
ports := strconv.FormatUint(uint64(cport.ContainerPort), 10)
|
||||
|
||||
target := model.LabelSet{
|
||||
model.AddressLabel: lv(a),
|
||||
podContainerNameLabel: lv(c.Name),
|
||||
podContainerPortNameLabel: lv(cport.Name),
|
||||
podContainerPortNumberLabel: lv(ports),
|
||||
podContainerPortProtocolLabel: lv(string(cport.Protocol)),
|
||||
}
|
||||
tg.Targets = append(tg.Targets, target.Merge(podLabels(pe.pod)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return tg
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
|
||||
if ref == nil || ref.Kind != "Pod" {
|
||||
return nil
|
||||
}
|
||||
p := &apiv1.Pod{}
|
||||
p.Namespace = ref.Namespace
|
||||
p.Name = ref.Name
|
||||
|
||||
obj, exists, err := e.podStore.Get(p)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err)
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
return obj.(*apiv1.Pod)
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) addServiceLabels(eps *disv1beta1.EndpointSlice, tg *targetgroup.Group) {
|
||||
var (
|
||||
svc = &apiv1.Service{}
|
||||
found bool
|
||||
)
|
||||
svc.Namespace = eps.Namespace
|
||||
|
||||
// Every EndpointSlice object has the Service they belong to in the
|
||||
// kubernetes.io/service-name label.
|
||||
svc.Name, found = eps.Labels[disv1beta1.LabelServiceName]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
|
||||
obj, exists, err := e.serviceStore.Get(svc)
|
||||
if err != nil {
|
||||
level.Error(e.logger).Log("msg", "retrieving service failed", "err", err)
|
||||
return
|
||||
}
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
svc = obj.(*apiv1.Service)
|
||||
|
||||
tg.Labels = tg.Labels.Merge(serviceLabels(svc))
|
||||
}
|
631
discovery/kubernetes/endpointslice_test.go
Normal file
631
discovery/kubernetes/endpointslice_test.go
Normal file
|
@ -0,0 +1,631 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
disv1beta1 "k8s.io/api/discovery/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
func strptr(str string) *string {
|
||||
return &str
|
||||
}
|
||||
|
||||
func boolptr(b bool) *bool {
|
||||
return &b
|
||||
}
|
||||
|
||||
func int32ptr(i int32) *int32 {
|
||||
return &i
|
||||
}
|
||||
|
||||
func protocolptr(p v1.Protocol) *v1.Protocol {
|
||||
return &p
|
||||
}
|
||||
|
||||
func makeEndpointSlice() *disv1beta1.EndpointSlice {
|
||||
return &disv1beta1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
disv1beta1.LabelServiceName: "testendpoints",
|
||||
},
|
||||
},
|
||||
AddressType: disv1beta1.AddressTypeIPv4,
|
||||
Ports: []disv1beta1.EndpointPort{
|
||||
{
|
||||
Name: strptr("testport"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(v1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []disv1beta1.Endpoint{
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Hostname: strptr("testendpoint1"),
|
||||
}, {
|
||||
Addresses: []string{"2.3.4.5"},
|
||||
Conditions: disv1beta1.EndpointConditions{
|
||||
Ready: boolptr(true),
|
||||
},
|
||||
}, {
|
||||
Addresses: []string{"3.4.5.6"},
|
||||
Conditions: disv1beta1.EndpointConditions{
|
||||
Ready: boolptr(false),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryBeforeRun(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{})
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
beforeRun: func() {
|
||||
obj := makeEndpointSlice()
|
||||
c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "1.2.3.4:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "2.3.4.5:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "3.4.5.6:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryAdd(t *testing.T) {
|
||||
obj := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testpod",
|
||||
Namespace: "default",
|
||||
UID: types.UID("deadbeef"),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "testnode",
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "c1",
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "mainport",
|
||||
ContainerPort: 9000,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "c2",
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "sideport",
|
||||
ContainerPort: 9001,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
HostIP: "2.3.4.5",
|
||||
PodIP: "1.2.3.4",
|
||||
},
|
||||
}
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, obj)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := &disv1beta1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
},
|
||||
AddressType: disv1beta1.AddressTypeIPv4,
|
||||
Ports: []disv1beta1.EndpointPort{
|
||||
{
|
||||
Name: strptr("testport"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(v1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []disv1beta1.Endpoint{
|
||||
{
|
||||
Addresses: []string{"4.3.2.1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "testpod",
|
||||
Namespace: "default",
|
||||
},
|
||||
Conditions: disv1beta1.EndpointConditions{
|
||||
Ready: boolptr(false),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "4.3.2.1:9000",
|
||||
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
|
||||
"__meta_kubernetes_endpointslice_address_target_name": "testpod",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_container_name": "c1",
|
||||
"__meta_kubernetes_pod_container_port_name": "mainport",
|
||||
"__meta_kubernetes_pod_container_port_number": "9000",
|
||||
"__meta_kubernetes_pod_container_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
|
||||
"__meta_kubernetes_pod_ip": "1.2.3.4",
|
||||
"__meta_kubernetes_pod_name": "testpod",
|
||||
"__meta_kubernetes_pod_node_name": "testnode",
|
||||
"__meta_kubernetes_pod_phase": "",
|
||||
"__meta_kubernetes_pod_ready": "unknown",
|
||||
"__meta_kubernetes_pod_uid": "deadbeef",
|
||||
},
|
||||
{
|
||||
"__address__": "1.2.3.4:9001",
|
||||
"__meta_kubernetes_pod_container_name": "c2",
|
||||
"__meta_kubernetes_pod_container_port_name": "sideport",
|
||||
"__meta_kubernetes_pod_container_port_number": "9001",
|
||||
"__meta_kubernetes_pod_container_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
|
||||
"__meta_kubernetes_pod_ip": "1.2.3.4",
|
||||
"__meta_kubernetes_pod_name": "testpod",
|
||||
"__meta_kubernetes_pod_node_name": "testnode",
|
||||
"__meta_kubernetes_pod_phase": "",
|
||||
"__meta_kubernetes_pod_ready": "unknown",
|
||||
"__meta_kubernetes_pod_uid": "deadbeef",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryDelete(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice())
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := makeEndpointSlice()
|
||||
c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Delete(context.Background(), obj.Name, metav1.DeleteOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryUpdate(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice())
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := &disv1beta1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
},
|
||||
AddressType: disv1beta1.AddressTypeIPv4,
|
||||
Ports: []disv1beta1.EndpointPort{
|
||||
{
|
||||
Name: strptr("testport"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(v1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []disv1beta1.Endpoint{
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Hostname: strptr("testendpoint1"),
|
||||
}, {
|
||||
Addresses: []string{"2.3.4.5"},
|
||||
Conditions: disv1beta1.EndpointConditions{
|
||||
Ready: boolptr(true),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "1.2.3.4:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "2.3.4.5:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryEmptyEndpoints(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice())
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
afterStart: func() {
|
||||
obj := &disv1beta1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
},
|
||||
AddressType: disv1beta1.AddressTypeIPv4,
|
||||
Ports: []disv1beta1.EndpointPort{
|
||||
{
|
||||
Name: strptr("testport"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(v1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []disv1beta1.Endpoint{},
|
||||
}
|
||||
c.DiscoveryV1beta1().EndpointSlices(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryWithService(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice())
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
beforeRun: func() {
|
||||
obj := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"app/name": "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "1.2.3.4:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "2.3.4.5:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "3.4.5.6:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
"__meta_kubernetes_service_label_app_name": "test",
|
||||
"__meta_kubernetes_service_labelpresent_app_name": "true",
|
||||
"__meta_kubernetes_service_name": "testendpoints",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryWithServiceUpdate(t *testing.T) {
|
||||
n, c := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{}, makeEndpointSlice())
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
beforeRun: func() {
|
||||
obj := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"app/name": "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
c.CoreV1().Services(obj.Namespace).Create(context.Background(), obj, metav1.CreateOptions{})
|
||||
},
|
||||
afterStart: func() {
|
||||
obj := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "default",
|
||||
Labels: map[string]string{
|
||||
"app/name": "svc",
|
||||
"component": "testing",
|
||||
},
|
||||
},
|
||||
}
|
||||
c.CoreV1().Services(obj.Namespace).Update(context.Background(), obj, metav1.UpdateOptions{})
|
||||
},
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "1.2.3.4:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "2.3.4.5:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "3.4.5.6:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "default",
|
||||
"__meta_kubernetes_service_label_app_name": "svc",
|
||||
"__meta_kubernetes_service_label_component": "testing",
|
||||
"__meta_kubernetes_service_labelpresent_app_name": "true",
|
||||
"__meta_kubernetes_service_labelpresent_component": "true",
|
||||
"__meta_kubernetes_service_name": "testendpoints",
|
||||
},
|
||||
Source: "endpointslice/default/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoveryNamespaces(t *testing.T) {
|
||||
epOne := makeEndpointSlice()
|
||||
epOne.Namespace = "ns1"
|
||||
objs := []runtime.Object{
|
||||
epOne,
|
||||
&disv1beta1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "ns2",
|
||||
},
|
||||
AddressType: disv1beta1.AddressTypeIPv4,
|
||||
Ports: []disv1beta1.EndpointPort{
|
||||
{
|
||||
Name: strptr("testport"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(v1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []disv1beta1.Endpoint{
|
||||
{
|
||||
Addresses: []string{"4.3.2.1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "testpod",
|
||||
Namespace: "ns2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testendpoints",
|
||||
Namespace: "ns1",
|
||||
Labels: map[string]string{
|
||||
"app": "app1",
|
||||
},
|
||||
},
|
||||
},
|
||||
&v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "testpod",
|
||||
Namespace: "ns2",
|
||||
UID: types.UID("deadbeef"),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "testnode",
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "c1",
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "mainport",
|
||||
ContainerPort: 9000,
|
||||
Protocol: v1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
HostIP: "2.3.4.5",
|
||||
PodIP: "4.3.2.1",
|
||||
},
|
||||
},
|
||||
}
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}, objs...)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/ns1/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "1.2.3.4:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_hostname": "testendpoint1",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "2.3.4.5:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
{
|
||||
"__address__": "3.4.5.6:9000",
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "false",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "ns1",
|
||||
"__meta_kubernetes_service_label_app": "app1",
|
||||
"__meta_kubernetes_service_labelpresent_app": "true",
|
||||
"__meta_kubernetes_service_name": "testendpoints",
|
||||
},
|
||||
Source: "endpointslice/ns1/testendpoints",
|
||||
},
|
||||
"endpointslice/ns2/testendpoints": {
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
"__address__": "4.3.2.1:9000",
|
||||
"__meta_kubernetes_endpointslice_address_target_kind": "Pod",
|
||||
"__meta_kubernetes_endpointslice_address_target_name": "testpod",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "testport",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_container_name": "c1",
|
||||
"__meta_kubernetes_pod_container_port_name": "mainport",
|
||||
"__meta_kubernetes_pod_container_port_number": "9000",
|
||||
"__meta_kubernetes_pod_container_port_protocol": "TCP",
|
||||
"__meta_kubernetes_pod_host_ip": "2.3.4.5",
|
||||
"__meta_kubernetes_pod_ip": "4.3.2.1",
|
||||
"__meta_kubernetes_pod_name": "testpod",
|
||||
"__meta_kubernetes_pod_node_name": "testnode",
|
||||
"__meta_kubernetes_pod_phase": "",
|
||||
"__meta_kubernetes_pod_ready": "unknown",
|
||||
"__meta_kubernetes_pod_uid": "deadbeef",
|
||||
},
|
||||
},
|
||||
Labels: model.LabelSet{
|
||||
"__meta_kubernetes_endpointslice_address_type": "IPv4",
|
||||
"__meta_kubernetes_endpointslice_name": "testendpoints",
|
||||
"__meta_kubernetes_namespace": "ns2",
|
||||
},
|
||||
Source: "endpointslice/ns2/testendpoints",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
|
@ -28,6 +28,7 @@ import (
|
|||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
disv1beta1 "k8s.io/api/discovery/v1beta1"
|
||||
"k8s.io/api/networking/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
|
@ -71,11 +72,12 @@ type Role string
|
|||
|
||||
// The valid options for Role.
|
||||
const (
|
||||
RoleNode Role = "node"
|
||||
RolePod Role = "pod"
|
||||
RoleService Role = "service"
|
||||
RoleEndpoint Role = "endpoints"
|
||||
RoleIngress Role = "ingress"
|
||||
RoleNode Role = "node"
|
||||
RolePod Role = "pod"
|
||||
RoleService Role = "service"
|
||||
RoleEndpoint Role = "endpoints"
|
||||
RoleEndpointSlice Role = "endpointslice"
|
||||
RoleIngress Role = "ingress"
|
||||
)
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
|
@ -84,7 +86,7 @@ func (c *Role) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
return err
|
||||
}
|
||||
switch *c {
|
||||
case RoleNode, RolePod, RoleService, RoleEndpoint, RoleIngress:
|
||||
case RoleNode, RolePod, RoleService, RoleEndpoint, RoleEndpointSlice, RoleIngress:
|
||||
return nil
|
||||
default:
|
||||
return errors.Errorf("unknown Kubernetes SD role %q", *c)
|
||||
|
@ -101,11 +103,12 @@ type SDConfig struct {
|
|||
}
|
||||
|
||||
type roleSelector struct {
|
||||
node resourceSelector
|
||||
pod resourceSelector
|
||||
service resourceSelector
|
||||
endpoints resourceSelector
|
||||
ingress resourceSelector
|
||||
node resourceSelector
|
||||
pod resourceSelector
|
||||
service resourceSelector
|
||||
endpoints resourceSelector
|
||||
endpointslice resourceSelector
|
||||
ingress resourceSelector
|
||||
}
|
||||
|
||||
type SelectorConfig struct {
|
||||
|
@ -128,7 +131,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
return err
|
||||
}
|
||||
if c.Role == "" {
|
||||
return errors.Errorf("role missing (one of: pod, service, endpoints, node, ingress)")
|
||||
return errors.Errorf("role missing (one of: pod, service, endpoints, endpointslice, node, ingress)")
|
||||
}
|
||||
err = c.HTTPClientConfig.Validate()
|
||||
if err != nil {
|
||||
|
@ -140,11 +143,12 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
|
||||
foundSelectorRoles := make(map[Role]struct{})
|
||||
allowedSelectors := map[Role][]string{
|
||||
RolePod: {string(RolePod)},
|
||||
RoleService: {string(RoleService)},
|
||||
RoleEndpoint: {string(RolePod), string(RoleService), string(RoleEndpoint)},
|
||||
RoleNode: {string(RoleNode)},
|
||||
RoleIngress: {string(RoleIngress)},
|
||||
RolePod: {string(RolePod)},
|
||||
RoleService: {string(RoleService)},
|
||||
RoleEndpointSlice: {string(RolePod), string(RoleService), string(RoleEndpointSlice)},
|
||||
RoleEndpoint: {string(RolePod), string(RoleService), string(RoleEndpoint)},
|
||||
RoleNode: {string(RoleNode)},
|
||||
RoleIngress: {string(RoleIngress)},
|
||||
}
|
||||
|
||||
for _, selector := range c.Selectors {
|
||||
|
@ -154,7 +158,7 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
foundSelectorRoles[selector.Role] = struct{}{}
|
||||
|
||||
if _, ok := allowedSelectors[c.Role]; !ok {
|
||||
return errors.Errorf("invalid role: %q, expecting one of: pod, service, endpoints, node or ingress", c.Role)
|
||||
return errors.Errorf("invalid role: %q, expecting one of: pod, service, endpoints, endpointslice, node or ingress", c.Role)
|
||||
}
|
||||
var allowed bool
|
||||
for _, role := range allowedSelectors[c.Role] {
|
||||
|
@ -197,7 +201,7 @@ func init() {
|
|||
prometheus.MustRegister(eventCount)
|
||||
|
||||
// Initialize metric vectors.
|
||||
for _, role := range []string{"endpoints", "node", "pod", "service", "ingress"} {
|
||||
for _, role := range []string{"endpointslice", "endpoints", "node", "pod", "service", "ingress"} {
|
||||
for _, evt := range []string{"add", "delete", "update"} {
|
||||
eventCount.WithLabelValues(role, evt)
|
||||
}
|
||||
|
@ -286,6 +290,9 @@ func mapSelector(rawSelector []SelectorConfig) roleSelector {
|
|||
rs := roleSelector{}
|
||||
for _, resourceSelectorRaw := range rawSelector {
|
||||
switch resourceSelectorRaw.Role {
|
||||
case RoleEndpointSlice:
|
||||
rs.endpointslice.field = resourceSelectorRaw.Field
|
||||
rs.endpointslice.label = resourceSelectorRaw.Label
|
||||
case RoleEndpoint:
|
||||
rs.endpoints.field = resourceSelectorRaw.Field
|
||||
rs.endpoints.label = resourceSelectorRaw.Label
|
||||
|
@ -314,6 +321,58 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
namespaces := d.getNamespaces()
|
||||
|
||||
switch d.role {
|
||||
case RoleEndpointSlice:
|
||||
for _, namespace := range namespaces {
|
||||
e := d.client.DiscoveryV1beta1().EndpointSlices(namespace)
|
||||
elw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = d.selectors.endpointslice.field
|
||||
options.LabelSelector = d.selectors.endpointslice.label
|
||||
return e.List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = d.selectors.endpointslice.field
|
||||
options.LabelSelector = d.selectors.endpointslice.label
|
||||
return e.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
s := d.client.CoreV1().Services(namespace)
|
||||
slw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = d.selectors.service.field
|
||||
options.LabelSelector = d.selectors.service.label
|
||||
return s.List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = d.selectors.service.field
|
||||
options.LabelSelector = d.selectors.service.label
|
||||
return s.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
p := d.client.CoreV1().Pods(namespace)
|
||||
plw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = d.selectors.pod.field
|
||||
options.LabelSelector = d.selectors.pod.label
|
||||
return p.List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = d.selectors.pod.field
|
||||
options.LabelSelector = d.selectors.pod.label
|
||||
return p.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
eps := NewEndpointSlice(
|
||||
log.With(d.logger, "role", "endpointslice"),
|
||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
|
||||
cache.NewSharedInformer(elw, &disv1beta1.EndpointSlice{}, resyncPeriod),
|
||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
|
||||
)
|
||||
d.discoverers = append(d.discoverers, eps)
|
||||
go eps.endpointSliceInf.Run(ctx.Done())
|
||||
go eps.serviceInf.Run(ctx.Done())
|
||||
go eps.podInf.Run(ctx.Done())
|
||||
}
|
||||
case RoleEndpoint:
|
||||
for _, namespace := range namespaces {
|
||||
e := d.client.CoreV1().Endpoints(namespace)
|
||||
|
|
|
@ -162,6 +162,7 @@ type hasSynced interface {
|
|||
var _ hasSynced = &Discovery{}
|
||||
var _ hasSynced = &Node{}
|
||||
var _ hasSynced = &Endpoints{}
|
||||
var _ hasSynced = &EndpointSlice{}
|
||||
var _ hasSynced = &Ingress{}
|
||||
var _ hasSynced = &Pod{}
|
||||
var _ hasSynced = &Service{}
|
||||
|
@ -187,6 +188,10 @@ func (e *Endpoints) hasSynced() bool {
|
|||
return e.endpointsInf.HasSynced() && e.serviceInf.HasSynced() && e.podInf.HasSynced()
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) hasSynced() bool {
|
||||
return e.endpointSliceInf.HasSynced() && e.serviceInf.HasSynced() && e.podInf.HasSynced()
|
||||
}
|
||||
|
||||
func (i *Ingress) hasSynced() bool {
|
||||
return i.informer.HasSynced()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue