diff --git a/discovery/xds/kuma.go b/discovery/xds/kuma.go
index d4071be9c..77f9f0561 100644
--- a/discovery/xds/kuma.go
+++ b/discovery/xds/kuma.go
@@ -27,7 +27,6 @@ import (
 	"google.golang.org/protobuf/types/known/anypb"
 
 	"github.com/prometheus/prometheus/discovery"
-	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/util/osutil"
 	"github.com/prometheus/prometheus/util/strutil"
 )
@@ -129,30 +128,27 @@ func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discover
 	return NewKumaHTTPDiscovery(c, logger)
 }
 
-func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group {
+func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet {
 	commonLabels := convertKumaUserLabels(assignment.Labels)
 
 	commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh)
 	commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service)
 
-	var targetLabelSets []model.LabelSet
+	var targets []model.LabelSet
 
-	for _, target := range assignment.Targets {
-		targetLabels := convertKumaUserLabels(target.Labels)
+	for _, madsTarget := range assignment.Targets {
+		targetLabels := convertKumaUserLabels(madsTarget.Labels).Merge(commonLabels)
 
-		targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name)
-		targetLabels[model.InstanceLabel] = model.LabelValue(target.Name)
-		targetLabels[model.AddressLabel] = model.LabelValue(target.Address)
-		targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme)
-		targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath)
+		targetLabels[kumaDataplaneLabel] = model.LabelValue(madsTarget.Name)
+		targetLabels[model.AddressLabel] = model.LabelValue(madsTarget.Address)
+		targetLabels[model.InstanceLabel] = model.LabelValue(madsTarget.Name)
+		targetLabels[model.SchemeLabel] = model.LabelValue(madsTarget.Scheme)
+		targetLabels[model.MetricsPathLabel] = model.LabelValue(madsTarget.MetricsPath)
 
-		targetLabelSets = append(targetLabelSets, targetLabels)
+		targets = append(targets, targetLabels)
 	}
 
-	return &targetgroup.Group{
-		Labels: commonLabels,
-		Targets: targetLabelSets,
-	}
+	return targets
 }
 
 func convertKumaUserLabels(labels map[string]string) model.LabelSet {
@@ -165,12 +161,12 @@ func convertKumaUserLabels(labels map[string]string) model.LabelSet {
 }
 
 // kumaMadsV1ResourceParser is an xds.resourceParser.
-func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) {
+func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) {
 	if typeURL != KumaMadsV1ResourceTypeURL {
 		return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL)
 	}
 
-	var groups []*targetgroup.Group
+	var targets []model.LabelSet
 
 	for _, resource := range resources {
 		assignment := &MonitoringAssignment{}
@@ -179,10 +175,10 @@ func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*target
 			return nil, err
 		}
 
-		groups = append(groups, convertKumaV1MonitoringAssignment(assignment))
+		targets = append(targets, convertKumaV1MonitoringAssignment(assignment)...)
 	}
 
-	return groups, nil
+	return targets, nil
 }
 
 func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) {
diff --git a/discovery/xds/kuma_test.go b/discovery/xds/kuma_test.go
index 8db2ce443..0de1b986d 100644
--- a/discovery/xds/kuma_test.go
+++ b/discovery/xds/kuma_test.go
@@ -138,65 +138,47 @@ func TestKumaMadsV1ResourceParserValidResources(t *testing.T) {
 	res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...)
 	require.NoError(t, err)
 
-	groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
+	targets, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
 	require.NoError(t, err)
-	require.Len(t, groups, 3)
+	require.Len(t, targets, 3)
 
-	expectedGroup1 := &targetgroup.Group{
-		Targets: []model.LabelSet{
-			{
-				"__address__": "10.1.4.32:9090",
-				"__meta_kuma_label_commit_hash": "620506a88",
-				"__meta_kuma_dataplane": "prometheus-01",
-				"__metrics_path__": "/custom-metrics",
-				"__scheme__": "http",
-				"instance": "prometheus-01",
-			},
-			{
-				"__address__": "10.1.4.33:9090",
-				"__meta_kuma_label_commit_hash": "3513bba00",
-				"__meta_kuma_dataplane": "prometheus-02",
-				"__metrics_path__": "",
-				"__scheme__": "http",
-				"instance": "prometheus-02",
-			},
-		},
-		Labels: model.LabelSet{
+	expectedTargets := []model.LabelSet{
+		{
+			"__address__": "10.1.4.32:9090",
+			"__metrics_path__": "/custom-metrics",
+			"__scheme__": "http",
+			"instance": "prometheus-01",
 			"__meta_kuma_mesh": "metrics",
 			"__meta_kuma_service": "prometheus",
 			"__meta_kuma_label_team": "infra",
 			"__meta_kuma_label_kuma_io_zone": "us-east-1",
+			"__meta_kuma_label_commit_hash": "620506a88",
+			"__meta_kuma_dataplane": "prometheus-01",
 		},
-	}
-	require.Equal(t, expectedGroup1, groups[0])
-
-	expectedGroup2 := &targetgroup.Group{
-		Labels: model.LabelSet{
+		{
+			"__address__": "10.1.4.33:9090",
+			"__metrics_path__": "",
+			"__scheme__": "http",
+			"instance": "prometheus-02",
 			"__meta_kuma_mesh": "metrics",
-			"__meta_kuma_service": "grafana",
+			"__meta_kuma_service": "prometheus",
 			"__meta_kuma_label_team": "infra",
 			"__meta_kuma_label_kuma_io_zone": "us-east-1",
+			"__meta_kuma_label_commit_hash": "3513bba00",
+			"__meta_kuma_dataplane": "prometheus-02",
+		},
+		{
+			"__address__": "10.1.1.1",
+			"__metrics_path__": "",
+			"__scheme__": "http",
+			"instance": "elasticsearch-01",
+			"__meta_kuma_mesh": "data",
+			"__meta_kuma_service": "elasticsearch",
+			"__meta_kuma_label_role": "ml",
+			"__meta_kuma_dataplane": "elasticsearch-01",
 		},
 	}
-	require.Equal(t, expectedGroup2, groups[1])
-
-	expectedGroup3 := &targetgroup.Group{
-		Targets: []model.LabelSet{
-			{
-				"__address__": "10.1.1.1",
-				"__meta_kuma_label_role": "ml",
-				"__meta_kuma_dataplane": "elasticsearch-01",
-				"__metrics_path__": "",
-				"__scheme__": "http",
-				"instance": "elasticsearch-01",
-			},
-		},
-		Labels: model.LabelSet{
-			"__meta_kuma_mesh": "data",
-			"__meta_kuma_service": "elasticsearch",
-		},
-	}
-	require.Equal(t, expectedGroup3, groups[2])
+	require.Equal(t, expectedTargets, targets)
 }
 
 func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) {
@@ -262,66 +244,48 @@ tls_config:
 
 	kd.poll(context.Background(), ch)
 	groups := <-ch
-	require.Len(t, groups, 3)
+	require.Len(t, groups, 1)
 
-	expectedGroup1 := &targetgroup.Group{
-		Source: "kuma",
-		Targets: []model.LabelSet{
-			{
-				"__address__": "10.1.4.32:9090",
-				"__meta_kuma_label_commit_hash": "620506a88",
-				"__meta_kuma_dataplane": "prometheus-01",
-				"__metrics_path__": "/custom-metrics",
-				"__scheme__": "http",
-				"instance": "prometheus-01",
-			},
-			{
-				"__address__": "10.1.4.33:9090",
-				"__meta_kuma_label_commit_hash": "3513bba00",
-				"__meta_kuma_dataplane": "prometheus-02",
-				"__metrics_path__": "",
-				"__scheme__": "http",
-				"instance": "prometheus-02",
-			},
-		},
-		Labels: model.LabelSet{
+	targets := groups[0].Targets
+	require.Len(t, targets, 3)
+
+	expectedTargets := []model.LabelSet{
+		{
+			"__address__": "10.1.4.32:9090",
+			"__metrics_path__": "/custom-metrics",
+			"__scheme__": "http",
+			"instance": "prometheus-01",
 			"__meta_kuma_mesh": "metrics",
 			"__meta_kuma_service": "prometheus",
 			"__meta_kuma_label_team": "infra",
 			"__meta_kuma_label_kuma_io_zone": "us-east-1",
+			"__meta_kuma_label_commit_hash": "620506a88",
+			"__meta_kuma_dataplane": "prometheus-01",
 		},
-	}
-	require.Equal(t, expectedGroup1, groups[0])
-
-	expectedGroup2 := &targetgroup.Group{
-		Source: "kuma",
-		Labels: model.LabelSet{
+		{
+			"__address__": "10.1.4.33:9090",
+			"__metrics_path__": "",
+			"__scheme__": "http",
+			"instance": "prometheus-02",
 			"__meta_kuma_mesh": "metrics",
-			"__meta_kuma_service": "grafana",
+			"__meta_kuma_service": "prometheus",
 			"__meta_kuma_label_team": "infra",
 			"__meta_kuma_label_kuma_io_zone": "us-east-1",
+			"__meta_kuma_label_commit_hash": "3513bba00",
+			"__meta_kuma_dataplane": "prometheus-02",
+		},
+		{
+			"__address__": "10.1.1.1",
+			"__metrics_path__": "",
+			"__scheme__": "http",
+			"instance": "elasticsearch-01",
+			"__meta_kuma_mesh": "data",
+			"__meta_kuma_service": "elasticsearch",
+			"__meta_kuma_label_role": "ml",
+			"__meta_kuma_dataplane": "elasticsearch-01",
 		},
 	}
-	require.Equal(t, expectedGroup2, groups[1])
-
-	expectedGroup3 := &targetgroup.Group{
-		Source: "kuma",
-		Targets: []model.LabelSet{
-			{
-				"__address__": "10.1.1.1",
-				"__meta_kuma_label_role": "ml",
-				"__meta_kuma_dataplane": "elasticsearch-01",
-				"__metrics_path__": "",
-				"__scheme__": "http",
-				"instance": "elasticsearch-01",
-			},
-		},
-		Labels: model.LabelSet{
-			"__meta_kuma_mesh": "data",
-			"__meta_kuma_service": "elasticsearch",
-		},
-	}
-	require.Equal(t, expectedGroup3, groups[2])
+	require.Equal(t, expectedTargets, targets)
 
 	// Should skip the next update.
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/discovery/xds/xds.go b/discovery/xds/xds.go
index f99e03da9..48bdbab02 100644
--- a/discovery/xds/xds.go
+++ b/discovery/xds/xds.go
@@ -15,7 +15,6 @@ package xds
 
 import (
 	"context"
-	"github.com/prometheus/common/model"
 	"time"
 
 	v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -23,6 +22,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/config"
+	"github.com/prometheus/common/model"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/reflect/protoreflect"
@@ -95,7 +95,9 @@ var (
 	}
 )
 
-type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error)
+// resourceParser is a function that takes raw discovered objects and translates them into
+// targetgroup.Group Targets. On error, no updates are sent to the scrape manager and the failure count is incremented.
+type resourceParser func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error)
 
 // fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
 type fetchDiscovery struct {
@@ -154,23 +156,18 @@ func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Grou
 		return
 	}
 
-	parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl)
+	parsedTargets, err := d.parseResources(response.Resources, response.TypeUrl)
 	if err != nil {
 		level.Error(d.logger).Log("msg", "error parsing resources", "err", err)
 		d.fetchFailuresCount.Inc()
 		return
 	}
 
-	for _, group := range parsedGroups {
-		group.Source = d.source
-	}
+	level.Debug(d.logger).Log("msg", "Updated to version", "version", response.VersionInfo, "targets", len(parsedTargets))
 
-	level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups))
-
-	// Check the context before sending an update on the channel.
 	select {
 	case <-ctx.Done():
 		return
-	case ch <- parsedGroups:
+	case ch <- []*targetgroup.Group{{Source: d.source, Targets: parsedTargets}}:
 	}
 }
diff --git a/discovery/xds/xds_test.go b/discovery/xds/xds_test.go
index 412cbda68..c0cdd3e7b 100644
--- a/discovery/xds/xds_test.go
+++ b/discovery/xds/xds_test.go
@@ -93,9 +93,9 @@ func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.
 	}))
 }
 
-func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser {
-	return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) {
-		return groups, err
+func constantResourceParser(targets []model.LabelSet, err error) resourceParser {
+	return func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
+		return targets, err
 	}
 }
 
@@ -174,13 +174,16 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
 		fetchDuration: testFetchDuration,
 		fetchFailuresCount: testFetchFailuresCount,
 		fetchSkipUpdateCount: testFetchSkipUpdateCount,
-		parseResources: constantResourceParser([]*targetgroup.Group{
-			{},
+		parseResources: constantResourceParser([]model.LabelSet{
 			{
-				Source: "a-custom-source",
-				Labels: model.LabelSet{
-					"__meta_custom_xds_label": "a-value",
-				},
+				"__meta_custom_xds_label": "a-value",
+				"__address__": "10.1.4.32:9090",
+				"instance": "prometheus-01",
+			},
+			{
+				"__meta_custom_xds_label": "a-value",
+				"__address__": "10.1.5.32:9090",
+				"instance": "prometheus-02",
 			},
 		}, nil),
 	}
@@ -189,13 +192,83 @@
 	groups := <-ch
 	require.NotNil(t, groups)
 
-	require.Len(t, groups, 2)
+	require.Len(t, groups, 1)
 
-	for _, group := range groups {
-		require.Equal(t, source, group.Source)
+	group := groups[0]
+	require.Equal(t, source, group.Source)
+
+	require.Len(t, group.Targets, 2)
+
+	target2 := group.Targets[1]
+	require.Contains(t, target2, model.LabelName("__meta_custom_xds_label"))
+	require.Equal(t, model.LabelValue("a-value"), target2["__meta_custom_xds_label"])
+}
+
+func TestPollingDisappearingTargets(t *testing.T) {
+	server := "http://198.161.2.0"
+	source := "test"
+	rc := &testResourceClient{
+		server: server,
+		protocolVersion: ProtocolV3,
+		fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) {
+			return &v3.DiscoveryResponse{}, nil
+		},
 	}
 
-	group2 := groups[1]
-	require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label"))
-	require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"])
+	// On the first poll, send back two targets. On the next, send just one.
+	counter := 0
+	parser := func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
+		counter++
+		if counter == 1 {
+			return []model.LabelSet{
+				{
+					"__meta_custom_xds_label": "a-value",
+					"__address__": "10.1.4.32:9090",
+					"instance": "prometheus-01",
+				},
+				{
+					"__meta_custom_xds_label": "a-value",
+					"__address__": "10.1.5.32:9090",
+					"instance": "prometheus-02",
+				},
+			}, nil
+		}
+
+		return []model.LabelSet{
+			{
+				"__meta_custom_xds_label": "a-value",
+				"__address__": "10.1.4.32:9090",
+				"instance": "prometheus-01",
+			},
+		}, nil
+	}
+
+	pd := &fetchDiscovery{
+		source: source,
+		client: rc,
+		logger: nopLogger,
+		fetchDuration: testFetchDuration,
+		fetchFailuresCount: testFetchFailuresCount,
+		fetchSkipUpdateCount: testFetchSkipUpdateCount,
+		parseResources: parser,
+	}
+
+	ch := make(chan []*targetgroup.Group, 1)
+	pd.poll(context.Background(), ch)
+	groups := <-ch
+	require.NotNil(t, groups)
+
+	require.Len(t, groups, 1)
+
+	require.Equal(t, source, groups[0].Source)
+	require.Len(t, groups[0].Targets, 2)
+
+	pd.poll(context.Background(), ch)
+	groups = <-ch
+	require.NotNil(t, groups)
+
+	require.Len(t, groups, 1)
+
+	require.Equal(t, source, groups[0].Source)
+	require.Len(t, groups[0].Targets, 1)
 }