Merge pull request #9161 from roidelapluie/kuma-rel229

Fix `kuma_sd` targetgroup reporting (#9157)
This commit is contained in:
Frederic Branczyk 2021-08-06 09:00:25 +02:00 committed by GitHub
commit 1f3df4b81a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 170 additions and 140 deletions

View file

@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/osutil"
"github.com/prometheus/prometheus/util/strutil"
)
@ -129,30 +128,27 @@ func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discover
return NewKumaHTTPDiscovery(c, logger)
}
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group {
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet {
commonLabels := convertKumaUserLabels(assignment.Labels)
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh)
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service)
var targetLabelSets []model.LabelSet
var targets []model.LabelSet
for _, target := range assignment.Targets {
targetLabels := convertKumaUserLabels(target.Labels)
for _, madsTarget := range assignment.Targets {
targetLabels := convertKumaUserLabels(madsTarget.Labels).Merge(commonLabels)
targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name)
targetLabels[model.InstanceLabel] = model.LabelValue(target.Name)
targetLabels[model.AddressLabel] = model.LabelValue(target.Address)
targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme)
targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath)
targetLabels[kumaDataplaneLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.AddressLabel] = model.LabelValue(madsTarget.Address)
targetLabels[model.InstanceLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.SchemeLabel] = model.LabelValue(madsTarget.Scheme)
targetLabels[model.MetricsPathLabel] = model.LabelValue(madsTarget.MetricsPath)
targetLabelSets = append(targetLabelSets, targetLabels)
targets = append(targets, targetLabels)
}
return &targetgroup.Group{
Labels: commonLabels,
Targets: targetLabelSets,
}
return targets
}
func convertKumaUserLabels(labels map[string]string) model.LabelSet {
@ -165,12 +161,12 @@ func convertKumaUserLabels(labels map[string]string) model.LabelSet {
}
// kumaMadsV1ResourceParser is an xds.resourceParser.
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) {
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) {
if typeURL != KumaMadsV1ResourceTypeURL {
return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL)
}
var groups []*targetgroup.Group
var targets []model.LabelSet
for _, resource := range resources {
assignment := &MonitoringAssignment{}
@ -179,10 +175,10 @@ func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*target
return nil, err
}
groups = append(groups, convertKumaV1MonitoringAssignment(assignment))
targets = append(targets, convertKumaV1MonitoringAssignment(assignment)...)
}
return groups, nil
return targets, nil
}
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) {

View file

@ -138,65 +138,47 @@ func TestKumaMadsV1ResourceParserValidResources(t *testing.T) {
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...)
require.NoError(t, err)
groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
targets, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
require.NoError(t, err)
require.Len(t, groups, 3)
require.Len(t, targets, 3)
expectedGroup1 := &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
expectedTargets := []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
},
}
require.Equal(t, expectedGroup1, groups[0])
expectedGroup2 := &targetgroup.Group{
Labels: model.LabelSet{
{
"__address__": "10.1.4.33:9090",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
},
{
"__address__": "10.1.1.1",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
},
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
},
}
require.Equal(t, expectedGroup3, groups[2])
require.Equal(t, expectedTargets, targets)
}
func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) {
@ -262,66 +244,48 @@ tls_config:
kd.poll(context.Background(), ch)
groups := <-ch
require.Len(t, groups, 3)
require.Len(t, groups, 1)
expectedGroup1 := &targetgroup.Group{
Source: "kuma",
Targets: []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
targets := groups[0].Targets
require.Len(t, targets, 3)
expectedTargets := []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
},
}
require.Equal(t, expectedGroup1, groups[0])
expectedGroup2 := &targetgroup.Group{
Source: "kuma",
Labels: model.LabelSet{
{
"__address__": "10.1.4.33:9090",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
},
{
"__address__": "10.1.1.1",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
},
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Source: "kuma",
Targets: []model.LabelSet{
{
"__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
},
}
require.Equal(t, expectedGroup3, groups[2])
require.Equal(t, expectedTargets, targets)
// Should skip the next update.
ctx, cancel := context.WithCancel(context.Background())

View file

@ -15,7 +15,6 @@ package xds
import (
"context"
"github.com/prometheus/common/model"
"time"
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@ -23,6 +22,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
@ -95,7 +95,9 @@ var (
}
)
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error)
// resourceParser is a function that takes raw discovered objects and translates them into
// targetgroup.Group Targets. On error, no updates are sent to the scrape manager and the failure count is incremented.
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error)
// fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
type fetchDiscovery struct {
@ -154,23 +156,18 @@ func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Grou
return
}
parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl)
parsedTargets, err := d.parseResources(response.Resources, response.TypeUrl)
if err != nil {
level.Error(d.logger).Log("msg", "error parsing resources", "err", err)
d.fetchFailuresCount.Inc()
return
}
for _, group := range parsedGroups {
group.Source = d.source
}
level.Debug(d.logger).Log("msg", "Updated to version", "version", response.VersionInfo, "targets", len(parsedTargets))
level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups))
// Check the context before sending an update on the channel.
select {
case <-ctx.Done():
return
case ch <- parsedGroups:
case ch <- []*targetgroup.Group{{Source: d.source, Targets: parsedTargets}}:
}
}

View file

@ -93,9 +93,9 @@ func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.
}))
}
func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser {
return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) {
return groups, err
func constantResourceParser(targets []model.LabelSet, err error) resourceParser {
return func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
return targets, err
}
}
@ -174,13 +174,16 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: constantResourceParser([]*targetgroup.Group{
{},
parseResources: constantResourceParser([]model.LabelSet{
{
Source: "a-custom-source",
Labels: model.LabelSet{
"__meta_custom_xds_label": "a-value",
},
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
},
}, nil),
}
@ -189,13 +192,83 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
groups := <-ch
require.NotNil(t, groups)
require.Len(t, groups, 2)
require.Len(t, groups, 1)
for _, group := range groups {
require.Equal(t, source, group.Source)
group := groups[0]
require.Equal(t, source, group.Source)
require.Len(t, group.Targets, 2)
target2 := group.Targets[1]
require.Contains(t, target2, model.LabelName("__meta_custom_xds_label"))
require.Equal(t, model.LabelValue("a-value"), target2["__meta_custom_xds_label"])
}
func TestPollingDisappearingTargets(t *testing.T) {
server := "http://198.161.2.0"
source := "test"
rc := &testResourceClient{
server: server,
protocolVersion: ProtocolV3,
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) {
return &v3.DiscoveryResponse{}, nil
},
}
group2 := groups[1]
require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label"))
require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"])
// On the first poll, send back two targets. On the next, send just one.
counter := 0
parser := func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
counter++
if counter == 1 {
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
},
}, nil
}
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
}, nil
}
pd := &fetchDiscovery{
source: source,
client: rc,
logger: nopLogger,
fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: parser,
}
ch := make(chan []*targetgroup.Group, 1)
pd.poll(context.Background(), ch)
groups := <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 2)
pd.poll(context.Background(), ch)
groups = <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 1)
}