mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
initial step
Signed-off-by: Vanshikav123 <vanshikav928@gmail.com>
This commit is contained in:
parent
a768a3b95e
commit
eeb97899ea
|
@ -18,7 +18,9 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
@ -130,6 +132,7 @@ type querier struct {
|
|||
// Derived from configuration.
|
||||
externalLabels labels.Labels
|
||||
requiredMatchers []*labels.Matcher
|
||||
exemplarQueryable storage.ExemplarQueryable
|
||||
}
|
||||
|
||||
// Select implements storage.Querier and uses the given matchers to read series sets from the client.
|
||||
|
@ -165,10 +168,23 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
|
|||
return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err))
|
||||
}
|
||||
|
||||
exemplarQuerier, err := q.exemplarQueryable.ExemplarQuerier(ctx)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(fmt.Errorf("ExemplarQuerier: %w", err))
|
||||
}
|
||||
|
||||
exemplars, err := exemplarQuerier.Select(q.mint, q.maxt, [][]*labels.Matcher{matchers}...)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(fmt.Errorf("exemplar select: %w", err))
|
||||
}
|
||||
|
||||
res, err := q.client.Read(ctx, query, sortSeries)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err))
|
||||
}
|
||||
|
||||
res = mergeSeriesWithExemplars(res, exemplars)
|
||||
|
||||
return newSeriesSetFilter(res, added)
|
||||
}
|
||||
|
||||
|
@ -278,3 +294,52 @@ func (sf seriesFilter) Labels() labels.Labels {
|
|||
b.Del(sf.toFilter...)
|
||||
return b.Labels()
|
||||
}
|
||||
func mergeSeriesWithExemplars(ss storage.SeriesSet, exemplars []exemplar.QueryResult) storage.SeriesSet {
|
||||
exemplarMap := make(map[uint64][]prompb.Exemplar)
|
||||
for _, result := range exemplars {
|
||||
|
||||
key := result.SeriesLabels.Hash()
|
||||
for _, ex := range result.Exemplars {
|
||||
|
||||
if ex.HasTs {
|
||||
exemplarMap[key] = append(exemplarMap[key], prompb.Exemplar{
|
||||
Labels: prompb.FromLabels(ex.Labels, nil),
|
||||
Value: ex.Value,
|
||||
Timestamp: ex.Ts,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the original SeriesSet to inject exemplars into the series.
|
||||
return &seriesSetWithExemplars{
|
||||
SeriesSet: ss,
|
||||
exemplarMap: exemplarMap,
|
||||
}
|
||||
}
|
||||
|
||||
type seriesSetWithExemplars struct {
|
||||
storage.SeriesSet
|
||||
exemplarMap map[uint64][]prompb.Exemplar
|
||||
}
|
||||
|
||||
// At returns a series with exemplars if they exist for the series' labels.
|
||||
func (sswe *seriesSetWithExemplars) At() storage.Series {
|
||||
series := sswe.SeriesSet.At()
|
||||
key := series.Labels().Hash()
|
||||
exemplars := sswe.exemplarMap[key]
|
||||
|
||||
return &seriesWithExemplars{
|
||||
Series: series,
|
||||
exemplars: exemplars,
|
||||
}
|
||||
}
|
||||
|
||||
type seriesWithExemplars struct {
|
||||
storage.Series
|
||||
exemplars []prompb.Exemplar
|
||||
}
|
||||
|
||||
func (swe *seriesWithExemplars) Exemplars() []prompb.Exemplar {
|
||||
return swe.exemplars
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue