mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Merge pull request #13554 from machine424/k8s-failures
discovery(k8s): add metric prometheus_sd_kubernetes_failures_total
This commit is contained in:
commit
8799581b24
|
@ -485,8 +485,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
eps := NewEndpointSlice(
|
||||
log.With(d.logger, "role", "endpointslice"),
|
||||
informer,
|
||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
|
||||
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
|
||||
nodeInf,
|
||||
d.metrics.eventCount,
|
||||
)
|
||||
|
@ -545,8 +545,8 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
eps := NewEndpoints(
|
||||
log.With(d.logger, "role", "endpoint"),
|
||||
d.newEndpointsByNodeInformer(elw),
|
||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
|
||||
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
d.mustNewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
|
||||
nodeInf,
|
||||
d.metrics.eventCount,
|
||||
)
|
||||
|
@ -602,7 +602,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
}
|
||||
svc := NewService(
|
||||
log.With(d.logger, "role", "service"),
|
||||
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
d.mustNewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
|
||||
d.metrics.eventCount,
|
||||
)
|
||||
d.discoverers = append(d.discoverers, svc)
|
||||
|
@ -641,7 +641,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
return i.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
|
||||
informer = d.mustNewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
|
||||
} else {
|
||||
i := d.client.NetworkingV1beta1().Ingresses(namespace)
|
||||
ilw := &cache.ListWatch{
|
||||
|
@ -656,7 +656,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|||
return i.Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
|
||||
informer = d.mustNewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
|
||||
}
|
||||
ingress := NewIngress(
|
||||
log.With(d.logger, "role", "ingress"),
|
||||
|
@ -747,7 +747,7 @@ func (d *Discovery) newNodeInformer(ctx context.Context) cache.SharedInformer {
|
|||
return d.client.CoreV1().Nodes().Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
return cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
|
||||
return d.mustNewSharedInformer(nlw, &apiv1.Node{}, resyncDisabled)
|
||||
}
|
||||
|
||||
func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
|
@ -762,7 +762,7 @@ func (d *Discovery) newPodsByNodeInformer(plw *cache.ListWatch) cache.SharedInde
|
|||
}
|
||||
}
|
||||
|
||||
return cache.NewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
|
||||
return d.mustNewSharedIndexInformer(plw, &apiv1.Pod{}, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.SharedIndexInformer {
|
||||
|
@ -783,7 +783,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
|
|||
return pods, nil
|
||||
}
|
||||
if !d.attachMetadata.Node {
|
||||
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
|
||||
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
|
||||
|
@ -809,13 +809,13 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
|
|||
return nodes, nil
|
||||
}
|
||||
|
||||
return cache.NewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
|
||||
return d.mustNewSharedIndexInformer(plw, &apiv1.Endpoints{}, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
|
||||
indexers := make(map[string]cache.IndexFunc)
|
||||
if !d.attachMetadata.Node {
|
||||
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {
|
||||
|
@ -854,7 +854,32 @@ func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object
|
|||
return nodes, nil
|
||||
}
|
||||
|
||||
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||
return d.mustNewSharedIndexInformer(plw, object, resyncDisabled, indexers)
|
||||
}
|
||||
|
||||
// informerWatchErrorHandler is the watch error handler installed on the
// informers created by mustNewSharedInformer and mustNewSharedIndexInformer.
// It increments the d.metrics.failuresCount counter and then delegates to
// cache.DefaultWatchErrorHandler with the same reflector and error.
func (d *Discovery) informerWatchErrorHandler(r *cache.Reflector, err error) {
|
||||
d.metrics.failuresCount.Inc()
|
||||
cache.DefaultWatchErrorHandler(r, err)
|
||||
}
|
||||
|
||||
// mustNewSharedInformer builds a shared informer via cache.NewSharedInformer
// and registers d.informerWatchErrorHandler as its watch error handler, so
// that errors reported to the handler are counted in d.metrics.failuresCount.
// It panics if the handler cannot be registered.
func (d *Discovery) mustNewSharedInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) cache.SharedInformer {
|
||||
informer := cache.NewSharedInformer(lw, exampleObject, defaultEventHandlerResyncPeriod)
|
||||
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
|
||||
// Such a scenario would suggest an incorrect use of the API, thus the panic.
|
||||
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return informer
|
||||
}
|
||||
|
||||
// mustNewSharedIndexInformer is the indexed counterpart of
// mustNewSharedInformer: it builds the informer via
// cache.NewSharedIndexInformer with the given indexers and registers
// d.informerWatchErrorHandler as its watch error handler.
// It panics if the handler cannot be registered.
func (d *Discovery) mustNewSharedIndexInformer(lw cache.ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
|
||||
informer := cache.NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, indexers)
|
||||
// Invoking SetWatchErrorHandler should fail only if the informer has been started beforehand.
|
||||
// Such a scenario would suggest an incorrect use of the API, thus the panic.
|
||||
if err := informer.SetWatchErrorHandler(d.informerWatchErrorHandler); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return informer
|
||||
}
|
||||
|
||||
func checkDiscoveryV1Supported(client kubernetes.Interface) (bool, error) {
|
||||
|
|
|
@ -21,12 +21,16 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/version"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
fakediscovery "k8s.io/client-go/discovery/fake"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
kubetesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -314,3 +318,39 @@ func TestCheckNetworkingV1Supported(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestFailuresCountMetric checks, for each discovery role, that the
// failuresCount metric starts at zero and reaches at least minFailedWatches
// after the discovery is run against a fake client whose watch requests
// all fail with an Unauthorized error.
// NOTE(review): the minimum of 3 for the endpoint and endpointslice roles
// presumably reflects the endpoints/slices, service and pod informers those
// roles start — confirm against Discovery.Run.
func TestFailuresCountMetric(t *testing.T) {
|
||||
tests := []struct {
|
||||
role Role
|
||||
minFailedWatches int
|
||||
}{
|
||||
{RoleNode, 1},
|
||||
{RolePod, 1},
|
||||
{RoleService, 1},
|
||||
{RoleEndpoint, 3},
|
||||
{RoleEndpointSlice, 3},
|
||||
{RoleIngress, 1},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// Copy the loop variable so each parallel subtest closure captures its own value.
tc := tc
|
||||
t.Run(string(tc.role), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
n, c := makeDiscovery(tc.role, NamespaceDiscovery{})
|
||||
// The counter is initialized and no failures at the beginning.
|
||||
require.Equal(t, float64(0), prom_testutil.ToFloat64(n.metrics.failuresCount))
|
||||
|
||||
// Simulate an error on watch requests.
|
||||
c.Discovery().(*fakediscovery.FakeDiscovery).PrependWatchReactor("*", func(action kubetesting.Action) (bool, watch.Interface, error) {
|
||||
return true, nil, apierrors.NewUnauthorized("unauthorized")
|
||||
})
|
||||
|
||||
// Start the discovery.
|
||||
k8sDiscoveryTest{discovery: n}.Run(t)
|
||||
|
||||
// At least the errors of the initial watches should be caught (watches are retried on errors).
|
||||
require.GreaterOrEqual(t, prom_testutil.ToFloat64(n.metrics.failuresCount), float64(tc.minFailedWatches))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,8 @@ import (
|
|||
var _ discovery.DiscovererMetrics = (*kubernetesMetrics)(nil)
|
||||
|
||||
type kubernetesMetrics struct {
|
||||
eventCount *prometheus.CounterVec
|
||||
eventCount *prometheus.CounterVec
|
||||
failuresCount prometheus.Counter
|
||||
|
||||
metricRegisterer discovery.MetricRegisterer
|
||||
}
|
||||
|
@ -37,10 +38,18 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
|
|||
},
|
||||
[]string{"role", "event"},
|
||||
),
|
||||
failuresCount: prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: discovery.KubernetesMetricsNamespace,
|
||||
Name: "failures_total",
|
||||
Help: "The number of failed WATCH/LIST requests.",
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
|
||||
m.eventCount,
|
||||
m.failuresCount,
|
||||
})
|
||||
|
||||
// Initialize metric vectors.
|
||||
|
@ -61,6 +70,8 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
|
|||
}
|
||||
}
|
||||
|
||||
m.failuresCount.Add(0)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue