mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 13:44:05 -08:00
discovery: kubernetes: Avoid creating unnecessary Kubernetes indexers in RoleEndpointSlice
This was due to a missing "return", see https://github.com/prometheus/prometheus/pull/13554#discussion_r1490965817 Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
parent
d595f5a9b1
commit
92544c00bf
|
@ -18,6 +18,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
v1 "k8s.io/api/discovery/v1"
|
v1 "k8s.io/api/discovery/v1"
|
||||||
"k8s.io/api/discovery/v1beta1"
|
"k8s.io/api/discovery/v1beta1"
|
||||||
|
@ -1405,3 +1406,41 @@ func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) {
|
||||||
expectedRes: map[string]*targetgroup.Group{},
|
expectedRes: map[string]*targetgroup.Group{},
|
||||||
}.Run(t)
|
}.Run(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEndpointSliceInfIndexersCount makes sure that RoleEndpointSlice discovery
|
||||||
|
// sets up indexing for the main Kube informer only when needed.
|
||||||
|
// See: https://github.com/prometheus/prometheus/pull/13554#discussion_r1490965817
|
||||||
|
func TestEndpointSliceInfIndexersCount(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
withNodeMetadata bool
|
||||||
|
}{
|
||||||
|
{"with node metadata", true},
|
||||||
|
{"without node metadata", false},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
var (
|
||||||
|
n *Discovery
|
||||||
|
mainInfIndexersCount int
|
||||||
|
)
|
||||||
|
if tc.withNodeMetadata {
|
||||||
|
mainInfIndexersCount = 1
|
||||||
|
n, _ = makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Node: true})
|
||||||
|
} else {
|
||||||
|
n, _ = makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{})
|
||||||
|
}
|
||||||
|
|
||||||
|
k8sDiscoveryTest{
|
||||||
|
discovery: n,
|
||||||
|
afterStart: func() {
|
||||||
|
n.RLock()
|
||||||
|
defer n.RUnlock()
|
||||||
|
require.Len(t, n.discoverers, 1)
|
||||||
|
require.Len(t, n.discoverers[0].(*EndpointSlice).endpointSliceInf.GetIndexer().GetIndexers(), mainInfIndexersCount)
|
||||||
|
},
|
||||||
|
}.Run(t)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -815,7 +815,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
|
||||||
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
|
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
|
||||||
indexers := make(map[string]cache.IndexFunc)
|
indexers := make(map[string]cache.IndexFunc)
|
||||||
if !d.attachMetadata.Node {
|
if !d.attachMetadata.Node {
|
||||||
cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncDisabled, indexers)
|
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||||
}
|
}
|
||||||
|
|
||||||
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
|
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
|
||||||
|
|
|
@ -128,7 +128,7 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
resChan := make(chan map[string]*targetgroup.Group)
|
resChan := make(chan map[string]*targetgroup.Group)
|
||||||
go readResultWithTimeout(t, ch, d.expectedMaxItems, time.Second, resChan)
|
go readResultWithTimeout(t, ctx, ch, d.expectedMaxItems, time.Second, resChan)
|
||||||
|
|
||||||
dd, ok := d.discovery.(hasSynced)
|
dd, ok := d.discovery.(hasSynced)
|
||||||
require.True(t, ok, "discoverer does not implement hasSynced interface")
|
require.True(t, ok, "discoverer does not implement hasSynced interface")
|
||||||
|
@ -141,13 +141,18 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
|
||||||
if d.expectedRes != nil {
|
if d.expectedRes != nil {
|
||||||
res := <-resChan
|
res := <-resChan
|
||||||
requireTargetGroups(t, d.expectedRes, res)
|
requireTargetGroups(t, d.expectedRes, res)
|
||||||
|
} else {
|
||||||
|
// Stop readResultWithTimeout and wait for it.
|
||||||
|
cancel()
|
||||||
|
<-resChan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// readResultWithTimeout reads all targetgroups from channel with timeout.
|
// readResultWithTimeout reads all targetgroups from channel with timeout.
|
||||||
// It merges targetgroups by source and sends the result to result channel.
|
// It merges targetgroups by source and sends the result to result channel.
|
||||||
func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) {
|
func readResultWithTimeout(t *testing.T, ctx context.Context, ch <-chan []*targetgroup.Group, max int, stopAfter time.Duration, resChan chan<- map[string]*targetgroup.Group) {
|
||||||
res := make(map[string]*targetgroup.Group)
|
res := make(map[string]*targetgroup.Group)
|
||||||
|
timeout := time.After(stopAfter)
|
||||||
Loop:
|
Loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -162,12 +167,15 @@ Loop:
|
||||||
// Reached max target groups we may get, break fast.
|
// Reached max target groups we may get, break fast.
|
||||||
break Loop
|
break Loop
|
||||||
}
|
}
|
||||||
case <-time.After(timeout):
|
case <-timeout:
|
||||||
// Because we use queue, an object that is created then
|
// Because we use queue, an object that is created then
|
||||||
// deleted or updated may be processed only once.
|
// deleted or updated may be processed only once.
|
||||||
// So possibly we may skip events, timed out here.
|
// So possibly we may skip events, timed out here.
|
||||||
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max)
|
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max)
|
||||||
break Loop
|
break Loop
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Logf("stopped, got %d (max: %d) items", len(res), max)
|
||||||
|
break Loop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue