prometheus/discovery/kubernetes/kubernetes_test.go
Filip Petkovski 7a78897d0b
Improve reliability of Kubernetes SD tests (#10761)
The tests for Kubernetes SD rely on comparing target groups by first
serializing them to JSON. However, the target group MarshalJSON function
only serializes the __address__ label, which eliminates all other
labels from the comparison.

This commit implements a separate marshaling function intended for use in
Kubernetes SD tests. The function serializes all target labels, making
comparisons much more reliable. The commit also fixes all tests that
started to fail due to the newly introduced change.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
2022-06-07 16:19:40 +01:00

296 lines
8.3 KiB
Go

// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/testutil"
)
// TestMain is the package's test entry point; it hands the whole test run
// over to testutil.TolerantVerifyLeak.
// NOTE(review): presumably that helper runs the tests and then verifies that
// no unexpected goroutines were leaked — confirm against util/testutil.
func TestMain(m *testing.M) {
testutil.TolerantVerifyLeak(m)
}
// makeDiscovery builds a Discovery instance for testing by delegating to
// makeDiscoveryWithVersion with a fixed Kubernetes version of v1.22.0.
// It returns the Discovery together with the clientset it is backed by.
func makeDiscovery(role Role, nsDiscovery NamespaceDiscovery, objects ...runtime.Object) (*Discovery, kubernetes.Interface) {
	// Version reported by the fake API server unless a test asks for another one.
	const defaultK8sVersion = "v1.22.0"

	return makeDiscoveryWithVersion(role, nsDiscovery, defaultK8sVersion, objects...)
}
// makeDiscoveryWithVersion creates a kubernetes.Discovery instance for testing
// whose fake API server reports k8sVer as its git version.
//
// The returned Discovery is backed by a fake clientset seeded with objects,
// uses a no-op logger and has its own namespace fixed to "own-ns". The
// clientset is returned alongside it.
//
// It panics if the fake clientset's discovery client is not a
// *fakediscovery.FakeDiscovery, because the server version cannot be faked
// in that case.
func makeDiscoveryWithVersion(role Role, nsDiscovery NamespaceDiscovery, k8sVer string, objects ...runtime.Object) (*Discovery, kubernetes.Interface) {
	clientset := fake.NewSimpleClientset(objects...)
	fakeDiscovery, ok := clientset.Discovery().(*fakediscovery.FakeDiscovery)
	if !ok {
		// Fail with context instead of a bare nil-pointer dereference below.
		panic("makeDiscoveryWithVersion: fake clientset discovery client is not a *fakediscovery.FakeDiscovery")
	}
	fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: k8sVer}

	return &Discovery{
		client:             clientset,
		logger:             log.NewNopLogger(),
		role:               role,
		namespaceDiscovery: &nsDiscovery,
		ownNamespace:       "own-ns",
	}, clientset
}
// makeDiscoveryWithMetadata builds a Discovery instance for testing via
// makeDiscovery and then sets its attachMetadata configuration to the given
// value. The backing clientset is returned unchanged.
func makeDiscoveryWithMetadata(role Role, nsDiscovery NamespaceDiscovery, attachMetadata AttachMetadataConfig, objects ...runtime.Object) (*Discovery, kubernetes.Interface) {
	disc, client := makeDiscovery(role, nsDiscovery, objects...)
	disc.attachMetadata = attachMetadata

	return disc, client
}
// k8sDiscoveryTest describes a single discovery test case; the case is
// executed by its Run method.
type k8sDiscoveryTest struct {
// discovery is the discoverer under test. Run type-asserts it to *Discovery.
discovery discovery.Discoverer
// beforeRun, if non-nil, is called before the discoverer is started.
beforeRun func()
// afterStart, if non-nil, is called after the discoverer has synced.
afterStart func()
// expectedMaxItems is the maximum number of target groups we may get from
// the channel; reading stops early once that many distinct sources were seen.
expectedMaxItems int
// expectedRes is the expected final result, keyed by target group source.
// If nil, the collected result is not checked.
expectedRes map[string]*targetgroup.Group
}
// Run executes the test case against t.
//
// It calls beforeRun (if set), starts the discoverer in a goroutine, waits
// until the discoverer has registered its role discoverers and its caches have
// synced, calls afterStart (if set) and finally, when expectedRes is non-nil,
// compares the target groups collected from the discoverer with expectedRes.
// The whole run is bounded by a one minute context timeout.
func (d k8sDiscoveryTest) Run(t *testing.T) {
	t.Helper()
	ch := make(chan []*targetgroup.Group)
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	if d.beforeRun != nil {
		d.beforeRun()
	}

	// Run discoverer and start a goroutine to read results.
	go d.discovery.Run(ctx, ch)

	// Ensure that discovery has a discoverer set. This prevents a race
	// condition where the above go routine may or may not have set a
	// discoverer yet. The count must be non-zero and unchanged across two
	// consecutive polls before we carry on.
	lastDiscoverersCount := 0
	dis := d.discovery.(*Discovery)
	for {
		dis.RLock()
		l := len(dis.discoverers)
		dis.RUnlock()
		if l > 0 && l == lastDiscoverersCount {
			break
		}

		// Poll again after a short pause, but give up once the test context
		// expires instead of spinning forever when no discoverer shows up.
		select {
		case <-ctx.Done():
			t.Errorf("timed out waiting for discoverers to be set: %v", ctx.Err())
			return
		case <-time.After(100 * time.Millisecond):
		}

		lastDiscoverersCount = l
	}

	// The channel is buffered so that the reader goroutine can always deliver
	// its single result and exit, even when Run returns without receiving from
	// it (early error return or expectedRes == nil). An unbuffered channel
	// would leave that goroutine blocked on the send forever.
	resChan := make(chan map[string]*targetgroup.Group, 1)
	go readResultWithTimeout(t, ch, d.expectedMaxItems, time.Second, resChan)

	dd, ok := d.discovery.(hasSynced)
	if !ok {
		t.Errorf("discoverer does not implement hasSynced interface")
		return
	}
	if !cache.WaitForCacheSync(ctx.Done(), dd.hasSynced) {
		t.Errorf("discoverer failed to sync: %v", dd)
		return
	}

	if d.afterStart != nil {
		d.afterStart()
	}

	if d.expectedRes != nil {
		res := <-resChan
		requireTargetGroups(t, d.expectedRes, res)
	}
}
// readResultWithTimeout drains target group updates from ch and keeps, for
// every source, the most recently received group. Reading stops as soon as the
// number of distinct sources equals max, or when no update arrives within
// timeout. The merged map is then sent on resChan exactly once.
func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) {
	merged := map[string]*targetgroup.Group{}

	for done := false; !done; {
		select {
		case <-time.After(timeout):
			// Because we use queue, an object that is created then
			// deleted or updated may be processed only once.
			// So possibly we may skip events, timed out here.
			t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(merged), max)
			done = true
		case groups := <-ch:
			for _, group := range groups {
				if group != nil {
					merged[group.Source] = group
				}
			}
			// Reached max target groups we may get, stop fast.
			done = len(merged) == max
		}
	}

	resChan <- merged
}
// requireTargetGroups fails the test immediately unless expected and res
// serialize to the same JSON document via marshalTargetGroups, i.e. unless
// they contain the same target groups with the same labels.
//
// A serialization error is reported as a test failure on t rather than as a
// panic, so it is attributed to the failing test instead of aborting the
// whole test binary.
func requireTargetGroups(t *testing.T, expected, res map[string]*targetgroup.Group) {
	t.Helper()

	b1, err := marshalTargetGroups(expected)
	require.NoError(t, err, "marshaling expected target groups")

	b2, err := marshalTargetGroups(res)
	require.NoError(t, err, "marshaling actual target groups")

	require.Equal(t, string(b1), string(b2))
}
// marshalTargetGroups serializes a set of target groups to JSON, ignoring the
// custom MarshalJSON function defined on the targetgroup.Group struct.
// marshalTargetGroups can be used for making exact comparisons between target groups
// as it will serialize all target labels.
//
// A nil group in tgs is serialized as JSON null instead of causing a
// nil-pointer dereference.
func marshalTargetGroups(tgs map[string]*targetgroup.Group) ([]byte, error) {
	// targetGroupAlias has the same fields as targetgroup.Group but none of
	// its methods, so encoding/json falls back to default struct encoding.
	type targetGroupAlias targetgroup.Group

	aliases := make(map[string]*targetGroupAlias, len(tgs))
	for k, v := range tgs {
		// Converting the pointer (rather than copying *v) keeps nil entries
		// intact and avoids copying every group.
		aliases[k] = (*targetGroupAlias)(v)
	}

	return json.Marshal(aliases)
}
// hasSynced is implemented by discoverers that can report whether their
// informers have completed their initial sync.
type hasSynced interface {
// hasSynced returns true if all informers synced.
// This is only used in testing to determine when discoverer synced to
// kubernetes apiserver.
hasSynced() bool
}
// Compile-time checks that Discovery and every role-specific discoverer
// implement the hasSynced interface.
var (
	_ hasSynced = (*Discovery)(nil)
	_ hasSynced = (*Node)(nil)
	_ hasSynced = (*Endpoints)(nil)
	_ hasSynced = (*EndpointSlice)(nil)
	_ hasSynced = (*Ingress)(nil)
	_ hasSynced = (*Pod)(nil)
	_ hasSynced = (*Service)(nil)
)
// hasSynced reports whether every registered discoverer that implements the
// hasSynced interface has synced. Discoverers that do not implement it are
// ignored. The discoverers list is read under the read lock.
func (d *Discovery) hasSynced() bool {
	d.RLock()
	defer d.RUnlock()

	for _, disc := range d.discoverers {
		syncer, ok := disc.(hasSynced)
		if ok && !syncer.hasSynced() {
			return false
		}
	}
	return true
}
// hasSynced reports whether the node discoverer's informer has synced.
func (n *Node) hasSynced() bool {
return n.informer.HasSynced()
}
// hasSynced reports whether the endpoints, service and pod informers have all
// synced. The informers are checked in that order and checking stops at the
// first one that has not synced yet.
func (e *Endpoints) hasSynced() bool {
	if !e.endpointsInf.HasSynced() {
		return false
	}
	if !e.serviceInf.HasSynced() {
		return false
	}
	return e.podInf.HasSynced()
}
// hasSynced reports whether the endpoint slice, service and pod informers have
// all synced. The informers are checked in that order and checking stops at
// the first one that has not synced yet.
func (e *EndpointSlice) hasSynced() bool {
	if !e.endpointSliceInf.HasSynced() {
		return false
	}
	if !e.serviceInf.HasSynced() {
		return false
	}
	return e.podInf.HasSynced()
}
// hasSynced reports whether the ingress discoverer's informer has synced.
func (i *Ingress) hasSynced() bool {
return i.informer.HasSynced()
}
// hasSynced reports whether the pod discoverer's pod informer has synced.
func (p *Pod) hasSynced() bool {
return p.podInf.HasSynced()
}
// hasSynced reports whether the service discoverer's informer has synced.
func (s *Service) hasSynced() bool {
return s.informer.HasSynced()
}
// TestRetryOnError checks that retryOnError keeps invoking the callback until
// it returns nil: for a callback that first succeeds on its N-th invocation,
// exactly N invocations must have happened once retryOnError returns.
func TestRetryOnError(t *testing.T) {
	for _, successAt := range []int{1, 2, 3} {
		attempts := 0
		attempt := func() error {
			attempts++
			if attempts < successAt {
				return errors.New("dummy")
			}
			return nil
		}

		retryOnError(context.TODO(), 0, attempt)
		require.Equal(t, successAt, attempts)
	}
}
// TestCheckNetworkingV1Supported runs checkNetworkingV1Supported against a
// fake clientset reporting various server versions and verifies both the
// returned support flag and whether an error is returned.
func TestCheckNetworkingV1Supported(t *testing.T) {
	tests := []struct {
		version       string
		wantSupported bool
		wantErr       bool
	}{
		{version: "v1.18.0", wantSupported: false, wantErr: false},
		{version: "v1.18.1", wantSupported: false, wantErr: false},
		// networking v1 is supported since Kubernetes v1.19
		{version: "v1.19.0", wantSupported: true, wantErr: false},
		{version: "v1.20.0-beta.2", wantSupported: true, wantErr: false},
		// error patterns
		{version: "", wantSupported: false, wantErr: true},
		{version: "<>", wantSupported: false, wantErr: true},
	}

	for _, tc := range tests {
		tc := tc
		t.Run(tc.version, func(t *testing.T) {
			clientset := fake.NewSimpleClientset()
			fakeDiscovery, ok := clientset.Discovery().(*fakediscovery.FakeDiscovery)
			// Fail the subtest with a clear message instead of dereferencing
			// a nil pointer when the discovery client has an unexpected type.
			require.True(t, ok, "fake clientset discovery client is not a *fakediscovery.FakeDiscovery")
			fakeDiscovery.FakedServerVersion = &version.Info{GitVersion: tc.version}

			supported, err := checkNetworkingV1Supported(clientset)

			if tc.wantErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
			require.Equal(t, tc.wantSupported, supported)
		})
	}
}