This commit is contained in:
Vanshika 2025-03-06 01:44:58 +05:30 committed by GitHub
commit b550a92cb2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 101 additions and 25 deletions

View file

@ -132,6 +132,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
var (
samples []prompb.Sample
histograms []prompb.Histogram
exemplars []prompb.Exemplar
)
for valType := iter.Next(); valType != chunkenc.ValNone; valType = iter.Next() {
@ -164,10 +165,16 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
return nil, ss.Warnings(), err
}
if sse, ok := ss.(*seriesSetWithExemplars); ok {
key := series.Labels().Hash()
exemplars = sse.exemplarMap[key]
}
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
Labels: prompb.FromLabels(series.Labels(), nil),
Samples: samples,
Histograms: histograms,
Exemplars: exemplars,
})
}
return resp, ss.Warnings(), ss.Err()
@ -182,7 +189,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
return errSeriesSet{err: err}
}
lbls := ts.ToLabels(&b, nil)
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms, exemplars: ts.Exemplars})
}
if sortSeries {
@ -368,6 +375,7 @@ type concreteSeries struct {
labels labels.Labels
floats []prompb.Sample
histograms []prompb.Histogram
exemplars []prompb.Exemplar
}
func (c *concreteSeries) Labels() labels.Labels {

View file

@ -18,17 +18,20 @@ import (
"errors"
"fmt"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
)
type sampleAndChunkQueryableClient struct {
client ReadClient
externalLabels labels.Labels
requiredMatchers []*labels.Matcher
readRecent bool
callback startTimeCallback
client ReadClient
externalLabels labels.Labels
requiredMatchers []*labels.Matcher
readRecent bool
callback startTimeCallback
exemplarQueryable storage.ExemplarQueryable
}
// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
@ -38,24 +41,27 @@ func NewSampleAndChunkQueryableClient(
requiredMatchers []*labels.Matcher,
readRecent bool,
callback startTimeCallback,
exemplarQueryable storage.ExemplarQueryable,
) storage.SampleAndChunkQueryable {
return &sampleAndChunkQueryableClient{
client: c,
externalLabels: externalLabels,
requiredMatchers: requiredMatchers,
readRecent: readRecent,
callback: callback,
externalLabels: externalLabels,
requiredMatchers: requiredMatchers,
readRecent: readRecent,
callback: callback,
exemplarQueryable: exemplarQueryable,
}
}
func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Querier, error) {
q := &querier{
mint: mint,
maxt: maxt,
client: c.client,
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
mint: mint,
maxt: maxt,
client: c.client,
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
exemplarQueryable: c.exemplarQueryable,
}
if c.readRecent {
return q, nil
@ -78,11 +84,12 @@ func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Queri
func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
cq := &chunkQuerier{
querier: querier{
mint: mint,
maxt: maxt,
client: c.client,
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
mint: mint,
maxt: maxt,
client: c.client,
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
exemplarQueryable: c.exemplarQueryable,
},
}
if c.readRecent {
@ -128,8 +135,9 @@ type querier struct {
client ReadClient
// Derived from configuration.
externalLabels labels.Labels
requiredMatchers []*labels.Matcher
externalLabels labels.Labels
requiredMatchers []*labels.Matcher
exemplarQueryable storage.ExemplarQueryable
}
// Select implements storage.Querier and uses the given matchers to read series sets from the client.
@ -164,11 +172,22 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err))
}
res, err := q.client.Read(ctx, query, sortSeries)
if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err))
}
if q.exemplarQueryable != nil {
exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx)
if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err))
}
exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...)
if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err))
}
res = mergeSeriesWithExemplars(res, exemplars)
}
return newSeriesSetFilter(res, added)
}
@ -278,3 +297,48 @@ func (sf seriesFilter) Labels() labels.Labels {
b.Del(sf.toFilter...)
return b.Labels()
}
func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet {
exemplarMap := make(map[uint64][]prompb.Exemplar)
for _, result := range exemplars {
key := result.SeriesLabels.Hash()
for _, ex := range result.Exemplars {
if ex.HasTs {
exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{
Labels: prompb.FromLabels(ex.Labels, nil),
Value: ex.Value,
Timestamp: ex.Ts,
})
}
}
}
return &seriesSetWithExemplars{
SeriesSet: ss,
exemplarMap: exemplarMap,
}
}
type seriesSetWithExemplars struct {
storage.SeriesSet
exemplarMap map[uint64][]prompb.Exemplar
}
func (sswe *seriesSetWithExemplars) At() storage.Series {
series := sswe.SeriesSet.At()
key := series.Labels().Hash()
exemplars := sswe.exemplarMap[key]
return &seriesWithExemplars{
Series: series,
exemplars: exemplars,
}
}
type seriesWithExemplars struct {
storage.Series
exemplars []prompb.Exemplar
}
func (swe *seriesWithExemplars) Exemplars() []prompb.Exemplar {
return swe.exemplars
}

View file

@ -256,8 +256,9 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
readRecent bool
callback startTimeCallback
expectedQuery *prompb.Query
expectedSeries []labels.Labels
expectedQuery *prompb.Query
expectedSeries []labels.Labels
exemplarQueryable storage.ExemplarQueryable
}{
{
name: "empty",
@ -472,6 +473,7 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
tc.requiredMatchers,
tc.readRecent,
tc.callback,
tc.exemplarQueryable,
)
q, err := c.Querier(tc.mint, tc.maxt)
require.NoError(t, err)

View file

@ -61,6 +61,7 @@ type Storage struct {
// For reads.
queryables []storage.SampleAndChunkQueryable
localStartTimeCallback startTimeCallback
exemplarQueryable storage.ExemplarQueryable
}
// NewStorage returns a remote.Storage.
@ -137,6 +138,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
labelsToEqualityMatchers(rrConf.RequiredMatchers),
rrConf.ReadRecent,
s.localStartTimeCallback,
s.exemplarQueryable,
))
}
s.queryables = queryables