diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 80bb811500..eb25a7fe25 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -132,6 +132,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, var ( samples []prompb.Sample histograms []prompb.Histogram + exemplars []prompb.Exemplar ) for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() { @@ -164,10 +165,16 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, return nil, ss.Warnings(), err } + if sse, ok := ss.(*seriesSetWithExemplars); ok { + key := series.Labels().Hash() + exemplars = sse.exemplarMap[key] + } + resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ Labels: prompb.FromLabels(series.Labels(), nil), Samples: samples, Histograms: histograms, + Exemplars: exemplars, }) } return resp, ss.Warnings(), ss.Err() @@ -182,7 +189,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet return errSeriesSet{err: err} } lbls := ts.ToLabels(&b, nil) - series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms}) + series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms, exemplars: ts.Exemplars}) } if sortSeries { @@ -368,6 +375,7 @@ type concreteSeries struct { labels labels.Labels floats []prompb.Sample histograms []prompb.Histogram + exemplars []prompb.Exemplar } func (c *concreteSeries) Labels() labels.Labels { diff --git a/storage/remote/read.go b/storage/remote/read.go index 2ec48784dc..53771658b1 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -18,17 +18,20 @@ import ( "errors" "fmt" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" ) type sampleAndChunkQueryableClient struct { - client ReadClient - externalLabels labels.Labels - requiredMatchers []*labels.Matcher - readRecent bool - callback startTimeCallback + client ReadClient + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + readRecent bool + callback startTimeCallback + exemplarQueryable storage.ExemplarQueryable } // NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. @@ -38,24 +41,27 @@ func NewSampleAndChunkQueryableClient( requiredMatchers []*labels.Matcher, readRecent bool, callback startTimeCallback, + exemplarQueryable storage.ExemplarQueryable, ) storage.SampleAndChunkQueryable { return &sampleAndChunkQueryableClient{ client: c, - externalLabels: externalLabels, - requiredMatchers: requiredMatchers, - readRecent: readRecent, - callback: callback, + externalLabels: externalLabels, + requiredMatchers: requiredMatchers, + readRecent: readRecent, + callback: callback, + exemplarQueryable: exemplarQueryable, } } func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Querier, error) { q := &querier{ - mint: mint, - maxt: maxt, - client: c.client, - externalLabels: c.externalLabels, - requiredMatchers: c.requiredMatchers, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + exemplarQueryable: c.exemplarQueryable, } if c.readRecent { return q, nil @@ -78,11 +84,12 @@ func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Queri func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { cq := &chunkQuerier{ querier: querier{ - mint: mint, - maxt: maxt, - client: c.client, - externalLabels: c.externalLabels, - requiredMatchers: c.requiredMatchers, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + exemplarQueryable: c.exemplarQueryable, }, } if c.readRecent { @@ -128,8 +135,9 @@ type querier struct { client ReadClient // Derived from configuration. - externalLabels labels.Labels - requiredMatchers []*labels.Matcher + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + exemplarQueryable storage.ExemplarQueryable } // Select implements storage.Querier and uses the given matchers to read series sets from the client. @@ -164,11 +172,22 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se if err != nil { return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } - res, err := q.client.Read(ctx, query, sortSeries) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } + if q.exemplarQueryable != nil { + exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err)) + } + + exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err)) + } + res = mergeSeriesWithExemplars(res, exemplars) + } return newSeriesSetFilter(res, added) } @@ -278,3 +297,48 @@ func (sf seriesFilter) Labels() labels.Labels { b.Del(sf.toFilter...) return b.Labels() } + +func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet { + exemplarMap := make(map[uint64][]prompb.Exemplar) + for _, result := range exemplars { + key := result.SeriesLabels.Hash() + for _, ex := range result.Exemplars { + if ex.HasTs { + exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{ + Labels: prompb.FromLabels(ex.Labels, nil), + Value: ex.Value, + Timestamp: ex.Ts, + }) + } + } + } + return &seriesSetWithExemplars{ + SeriesSet: ss, + exemplarMap: exemplarMap, + } +} + +type seriesSetWithExemplars struct { + storage.SeriesSet + exemplarMap map[uint64][]prompb.Exemplar +} + +func (sswe *seriesSetWithExemplars) At() storage.Series { + series := sswe.SeriesSet.At() + key := series.Labels().Hash() + exemplars := sswe.exemplarMap[key] + + return &seriesWithExemplars{ + Series: series, + exemplars: exemplars, + } +} + +type seriesWithExemplars struct { + storage.Series + exemplars []prompb.Exemplar +} + +func (swe *seriesWithExemplars) Exemplars() []prompb.Exemplar { + return swe.exemplars +} diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 8073f23b3b..bf5785772e 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -256,8 +256,9 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { readRecent bool callback startTimeCallback - expectedQuery *prompb.Query - expectedSeries []labels.Labels + expectedQuery *prompb.Query + expectedSeries []labels.Labels + exemplarQueryable storage.ExemplarQueryable }{ { name: "empty", @@ -472,6 +473,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { tc.requiredMatchers, tc.readRecent, tc.callback, + tc.exemplarQueryable, ) q, err := c.Querier(tc.mint, tc.maxt) require.NoError(t, err) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index ba6d100bdf..1e5585381d 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -61,6 +61,7 @@ type Storage struct { // For reads. queryables []storage.SampleAndChunkQueryable localStartTimeCallback startTimeCallback + exemplarQueryable storage.ExemplarQueryable } // NewStorage returns a remote.Storage. @@ -137,6 +138,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { labelsToEqualityMatchers(rrConf.RequiredMatchers), rrConf.ReadRecent, s.localStartTimeCallback, + s.exemplarQueryable, )) } s.queryables = queryables