diff --git a/storage/remote/read.go b/storage/remote/read.go index 2ec48784dc..caa34c38ae 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" ) @@ -128,8 +130,9 @@ type querier struct { client ReadClient // Derived from configuration. - externalLabels labels.Labels - requiredMatchers []*labels.Matcher + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + exemplarQueryable storage.ExemplarQueryable } // Select implements storage.Querier and uses the given matchers to read series sets from the client. @@ -165,10 +168,23 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } + exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err)) + } + + exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...) + if err != nil { + return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err)) + } + res, err := q.client.Read(ctx, query, sortSeries) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } + + res = mergeSeriesWithExemplars(res, exemplars) + return newSeriesSetFilter(res, added) } @@ -278,3 +294,52 @@ func (sf seriesFilter) Labels() labels.Labels { b.Del(sf.toFilter...) return b.Labels() } +func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet { + exemplarMap := make(map[uint64][]prompb.Exemplar) + for _, result := range exemplars { + + key := result.SeriesLabels.Hash() + for _, ex := range result.Exemplars { + + if ex.HasTs { + exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{ + Labels: prompb.FromLabels(ex.Labels, nil), + Value: ex.Value, + Timestamp: ex.Ts, + }) + } + } + } + + // Wrap the original SeriesSet to inject exemplars into the series. + return &seriesSetWithExemplars{ + SeriesSet: ss, + exemplarMap: exemplarMap, + } +} + +type seriesSetWithExemplars struct { + storage.SeriesSet + exemplarMap map[uint64][]prompb.Exemplar +} + +// At returns a series with exemplars if they exist for the series' labels. +func (sswe *seriesSetWithExemplars) At() storage.Series { + series := sswe.SeriesSet.At() + key := series.Labels().Hash() + exemplars := sswe.exemplarMap[key] + + return &seriesWithExemplars{ + Series: series, + exemplars: exemplars, + } +} + +type seriesWithExemplars struct { + storage.Series + exemplars []prompb.Exemplar +} + +func (swe *seriesWithExemplars) Exemplars() []prompb.Exemplar { + return swe.exemplars +}