From eeb97899ea2e258bf21babc1d80c1c9f06202a6d Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Wed, 8 Jan 2025 23:46:59 +0530 Subject: [PATCH 1/4] initial step Signed-off-by: Vanshikav123 --- storage/remote/read.go | 69 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/storage/remote/read.go b/storage/remote/read.go index 2ec48784dc..caa34c38ae 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" ) @@ -128,8 +130,9 @@ type querier struct { client ReadClient // Derived from configuration. - externalLabels labels.Labels - requiredMatchers []*labels.Matcher + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + exemplarQueryable storage.ExemplarQueryable } // Select implements storage.Querier and uses the given matchers to read series sets from the client. @@ -165,10 +168,23 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } + exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err)) + } + + exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err)) + } + res, err := q.client.Read(ctx, query, sortSeries) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } + + res = mergeSeriesWithExemplars(res, exemplars) + return newSeriesSetFilter(res, added) } @@ -278,3 +294,52 @@ func (sf seriesFilter) Labels() labels.Labels { b.Del(sf.toFilter...) return b.Labels() } +func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet { + exemplarMap := make(map[uint64][]prompb.Exemplar) + for _, result := range exemplars { + + key := result.SeriesLabels.Hash() + for _, ex := range result.Exemplars { + + if ex.HasTs { + exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{ + Labels: prompb.FromLabels(ex.Labels, nil), + Value: ex.Value, + Timestamp: ex.Ts, + }) + } + } + } + + // Wrap the original SeriesSet to inject exemplars into the series. + return &seriesSetWithExemplars{ + SeriesSet: ss, + exemplarMap: exemplarMap, + } +} + +type seriesSetWithExemplars struct { + storage.SeriesSet + exemplarMap map[uint64][]prompb.Exemplar +} + +// At returns a series with exemplars if they exist for the series' labels. +func (sswe *seriesSetWithExemplars) At() storage.Series { + series := sswe.SeriesSet.At() + key := series.Labels().Hash() + exemplars := sswe.exemplarMap[key] + + return &seriesWithExemplars{ + Series: series, + exemplars: exemplars, + } +} + +type seriesWithExemplars struct { + storage.Series + exemplars []prompb.Exemplar +} + +func (swe *seriesWithExemplars) Exemplars() []prompb.Exemplar { + return swe.exemplars +} From 86634fcd75f38c63d5b177f302453f5c2111f608 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Wed, 8 Jan 2025 23:53:58 +0530 Subject: [PATCH 2/4] removed comments Signed-off-by: Vanshikav123 --- storage/remote/read.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/storage/remote/read.go b/storage/remote/read.go index caa34c38ae..9013fc3b91 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -311,7 +311,6 @@ func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryRe } } - // Wrap the original SeriesSet to inject exemplars into the series. return &seriesSetWithExemplars{ SeriesSet: ss, exemplarMap: exemplarMap, @@ -323,7 +322,6 @@ type seriesSetWithExemplars struct { exemplarMap map[uint64][]prompb.Exemplar } -// At returns a series with exemplars if they exist for the series' labels. func (sswe *seriesSetWithExemplars) At() storage.Series { series := sswe.SeriesSet.At() key := series.Labels().Hash() From c97a9052a72957103db76e1fdd67798af8ba7fe2 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Tue, 14 Jan 2025 19:08:49 +0530 Subject: [PATCH 3/4] fixing errors Signed-off-by: Vanshikav123 --- storage/remote/read.go | 71 +++++++++++++++++++------------------ storage/remote/read_test.go | 6 ++-- storage/remote/storage.go | 2 ++ 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/storage/remote/read.go b/storage/remote/read.go index 9013fc3b91..53771658b1 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -26,11 +26,12 @@ import ( ) type sampleAndChunkQueryableClient struct { - client ReadClient - externalLabels labels.Labels - requiredMatchers []*labels.Matcher - readRecent bool - callback startTimeCallback + client ReadClient + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + readRecent bool + callback startTimeCallback + exemplarQueryable storage.ExemplarQueryable } // NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. @@ -40,24 +41,27 @@ func NewSampleAndChunkQueryableClient( requiredMatchers []*labels.Matcher, readRecent bool, callback startTimeCallback, + exemplarQueryable storage.ExemplarQueryable, ) storage.SampleAndChunkQueryable { return &sampleAndChunkQueryableClient{ client: c, - externalLabels: externalLabels, - requiredMatchers: requiredMatchers, - readRecent: readRecent, - callback: callback, + externalLabels: externalLabels, + requiredMatchers: requiredMatchers, + readRecent: readRecent, + callback: callback, + exemplarQueryable: exemplarQueryable, } } func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Querier, error) { q := &querier{ - mint: mint, - maxt: maxt, - client: c.client, - externalLabels: c.externalLabels, - requiredMatchers: c.requiredMatchers, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + exemplarQueryable: c.exemplarQueryable, } if c.readRecent { return q, nil @@ -80,11 +84,12 @@ func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Queri func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { cq := &chunkQuerier{ querier: querier{ - mint: mint, - maxt: maxt, - client: c.client, - externalLabels: c.externalLabels, - requiredMatchers: c.requiredMatchers, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + exemplarQueryable: c.exemplarQueryable, }, } if c.readRecent { @@ -167,24 +172,22 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se if err != nil { return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } - - exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx) - if err != nil { - return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err)) - } - - exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...) - if err != nil { - return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err)) - } - res, err := q.client.Read(ctx, query, sortSeries) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } + if q.exemplarQueryable != nil { + exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err)) + } - res = mergeSeriesWithExemplars(res, exemplars) - + exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err)) + } + res = mergeSeriesWithExemplars(res, exemplars) + } return newSeriesSetFilter(res, added) } @@ -294,13 +297,12 @@ func (sf seriesFilter) Labels() labels.Labels { b.Del(sf.toFilter...) return b.Labels() } + func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet { exemplarMap := make(map[uint64][]prompb.Exemplar) for _, result := range exemplars { - key := result.SeriesLabels.Hash() for _, ex := range result.Exemplars { - if ex.HasTs { exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{ Labels: prompb.FromLabels(ex.Labels, nil), @@ -310,7 +312,6 @@ func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryRe } } } - return &seriesSetWithExemplars{ SeriesSet: ss, exemplarMap: exemplarMap, diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index b78a8c6215..d7567806a8 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -256,8 +256,9 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { readRecent bool callback startTimeCallback - expectedQuery *prompb.Query - expectedSeries []labels.Labels + expectedQuery *prompb.Query + expectedSeries []labels.Labels + exemplarQueryable storage.ExemplarQueryable }{ { name: "empty", @@ -472,6 +473,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { tc.requiredMatchers, tc.readRecent, tc.callback, + tc.exemplarQueryable, ) q, err := c.Querier(tc.mint, tc.maxt) require.NoError(t, err) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 14c3c87d93..36e8bdfee2 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -61,6 +61,7 @@ type Storage struct { // For reads. queryables []storage.SampleAndChunkQueryable localStartTimeCallback startTimeCallback + exemplarQueryable storage.ExemplarQueryable } // NewStorage returns a remote.Storage. @@ -137,6 +138,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { labelsToEqualityMatchers(rrConf.RequiredMatchers), rrConf.ReadRecent, s.localStartTimeCallback, + s.exemplarQueryable, )) } s.queryables = queryables From 83eb2169868996d9925d10cdd11f22032d81e0b7 Mon Sep 17 00:00:00 2001 From: Vanshikav123 Date: Wed, 15 Jan 2025 18:44:11 +0530 Subject: [PATCH 4/4] Serialization/Deserialization Signed-off-by: Vanshikav123 --- storage/remote/codec.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 80bb811500..eb25a7fe25 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -132,6 +132,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, var ( samples []prompb.Sample histograms []prompb.Histogram + exemplars []prompb.Exemplar ) for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() { @@ -164,10 +165,16 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, return nil, ss.Warnings(), err } + if sse, ok := ss.(*seriesSetWithExemplars); ok { + key := series.Labels().Hash() + exemplars = sse.exemplarMap[key] + } + resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ Labels: prompb.FromLabels(series.Labels(), nil), Samples: samples, Histograms: histograms, + Exemplars: exemplars, }) } return resp, ss.Warnings(), ss.Err() @@ -182,7 +189,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet return errSeriesSet{err: err} } lbls := ts.ToLabels(&b, nil) - series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms}) + series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms, exemplars: ts.Exemplars}) } if sortSeries { @@ -368,6 +375,7 @@ type concreteSeries struct { labels labels.Labels floats []prompb.Sample histograms []prompb.Histogram + exemplars []prompb.Exemplar } func (c *concreteSeries) Labels() labels.Labels {