mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-13 17:14:05 -08:00
Merge f9bc50b247
into 76432aaf4e
This commit is contained in:
commit
a45d12aaf3
|
@ -733,7 +733,7 @@ func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt i
|
|||
for _, mset := range matcherSets {
|
||||
sets = append(sets, q.Select(ctx, true, nil, mset...))
|
||||
}
|
||||
ss = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
ss = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
|
||||
} else {
|
||||
ss = q.Select(ctx, false, nil, matcherSets[0]...)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
|
@ -136,13 +135,17 @@ func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
|
|||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
|
||||
var limit int
|
||||
if hints != nil {
|
||||
limit = hints.Limit
|
||||
}
|
||||
if !q.concurrentSelect {
|
||||
for _, querier := range q.queriers {
|
||||
// We need to sort for merge to work.
|
||||
seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
|
||||
}
|
||||
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
||||
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
||||
s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
|
||||
return s, s.Next()
|
||||
}}
|
||||
}
|
||||
|
@ -175,7 +178,7 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
|
|||
seriesSets = append(seriesSets, r)
|
||||
}
|
||||
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
||||
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
||||
s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
|
||||
return s, s.Next()
|
||||
}}
|
||||
}
|
||||
|
@ -193,35 +196,44 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ
|
|||
// If matchers are specified the returned result set is reduced
|
||||
// to label values of metrics matching the matchers.
|
||||
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
res, ws, err := q.lvals(ctx, q.queriers, name, hints, matchers...)
|
||||
res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) {
|
||||
return q.LabelValues(ctx, name, hints, matchers...)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
|
||||
}
|
||||
return res, ws, nil
|
||||
}
|
||||
|
||||
// lvals performs merge sort for LabelValues from multiple queriers.
|
||||
func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
// mergeResults performs merge sort on the results of invoking the resultsFn against multiple queriers.
|
||||
func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *LabelHints, resultsFn func(q LabelQuerier) ([]string, annotations.Annotations, error)) ([]string, annotations.Annotations, error) {
|
||||
if lq.Len() == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
if lq.Len() == 1 {
|
||||
return lq.Get(0).LabelValues(ctx, n, hints, matchers...)
|
||||
return resultsFn(lq.Get(0))
|
||||
}
|
||||
a, b := lq.SplitByHalf()
|
||||
|
||||
var ws annotations.Annotations
|
||||
s1, w, err := q.lvals(ctx, a, n, hints, matchers...)
|
||||
s1, w, err := q.mergeResults(a, hints, resultsFn)
|
||||
ws.Merge(w)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
}
|
||||
s2, ws, err := q.lvals(ctx, b, n, hints, matchers...)
|
||||
s2, w, err := q.mergeResults(b, hints, resultsFn)
|
||||
ws.Merge(w)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
}
|
||||
return mergeStrings(s1, s2), ws, nil
|
||||
|
||||
s1 = truncateToLimit(s1, hints)
|
||||
s2 = truncateToLimit(s2, hints)
|
||||
|
||||
merged := mergeStrings(s1, s2)
|
||||
merged = truncateToLimit(merged, hints)
|
||||
|
||||
return merged, ws, nil
|
||||
}
|
||||
|
||||
func mergeStrings(a, b []string) []string {
|
||||
|
@ -253,33 +265,13 @@ func mergeStrings(a, b []string) []string {
|
|||
|
||||
// LabelNames returns all the unique label names present in all queriers in sorted order.
|
||||
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
var (
|
||||
labelNamesMap = make(map[string]struct{})
|
||||
warnings annotations.Annotations
|
||||
)
|
||||
for _, querier := range q.queriers {
|
||||
names, wrn, err := querier.LabelNames(ctx, hints, matchers...)
|
||||
if wrn != nil {
|
||||
// TODO(bwplotka): We could potentially wrap warnings.
|
||||
warnings.Merge(wrn)
|
||||
}
|
||||
res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) {
|
||||
return q.LabelNames(ctx, hints, matchers...)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
|
||||
}
|
||||
for _, name := range names {
|
||||
labelNamesMap[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
if len(labelNamesMap) == 0 {
|
||||
return nil, warnings, nil
|
||||
}
|
||||
|
||||
labelNames := make([]string, 0, len(labelNamesMap))
|
||||
for name := range labelNamesMap {
|
||||
labelNames = append(labelNames, name)
|
||||
}
|
||||
slices.Sort(labelNames)
|
||||
return labelNames, warnings, nil
|
||||
return res, ws, nil
|
||||
}
|
||||
|
||||
// Close releases the resources of the generic querier.
|
||||
|
@ -293,17 +285,25 @@ func (q *mergeGenericQuerier) Close() error {
|
|||
return errs.Err()
|
||||
}
|
||||
|
||||
func truncateToLimit(s []string, hints *LabelHints) []string {
|
||||
if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
|
||||
s = s[:hints.Limit]
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
|
||||
// It has to handle time-overlapped series as well.
|
||||
type VerticalSeriesMergeFunc func(...Series) Series
|
||||
|
||||
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
|
||||
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
|
||||
// If limit is set, the SeriesSet will be limited up-to the limit. 0 means disabled.
|
||||
func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
|
||||
genericSets := make([]genericSeriesSet, 0, len(sets))
|
||||
for _, s := range sets {
|
||||
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
|
||||
}
|
||||
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
|
||||
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
|
||||
}
|
||||
|
||||
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
|
||||
|
@ -313,12 +313,12 @@ func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) Seri
|
|||
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
|
||||
|
||||
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
|
||||
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
|
||||
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
|
||||
genericSets := make([]genericSeriesSet, 0, len(sets))
|
||||
for _, s := range sets {
|
||||
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
|
||||
}
|
||||
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
|
||||
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
|
||||
}
|
||||
|
||||
// genericMergeSeriesSet implements genericSeriesSet.
|
||||
|
@ -329,6 +329,8 @@ type genericMergeSeriesSet struct {
|
|||
heap genericSeriesSetHeap
|
||||
sets []genericSeriesSet
|
||||
currentSets []genericSeriesSet
|
||||
seriesLimit int
|
||||
mergedSeries int // tracks the total number of series merged and returned.
|
||||
}
|
||||
|
||||
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
|
||||
|
@ -336,7 +338,8 @@ type genericMergeSeriesSet struct {
|
|||
// Each series set must return its series in labels order, otherwise
|
||||
// merged series set will be incorrect.
|
||||
// Overlapped situations are merged using provided mergeFunc.
|
||||
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
|
||||
// If seriesLimit is set, only limited series are returned.
|
||||
func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
|
||||
if len(sets) == 1 {
|
||||
return sets[0]
|
||||
}
|
||||
|
@ -359,10 +362,16 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMe
|
|||
mergeFunc: mergeFunc,
|
||||
sets: sets,
|
||||
heap: h,
|
||||
seriesLimit: seriesLimit,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *genericMergeSeriesSet) Next() bool {
|
||||
if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit {
|
||||
// Exit early if seriesLimit is set.
|
||||
return false
|
||||
}
|
||||
|
||||
// Run in a loop because the "next" series sets may not be valid anymore.
|
||||
// If, for the current label set, all the next series sets come from
|
||||
// failed remote storage sources, we want to keep trying with the next label set.
|
||||
|
@ -393,6 +402,7 @@ func (c *genericMergeSeriesSet) Next() bool {
|
|||
break
|
||||
}
|
||||
}
|
||||
c.mergedSeries++
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -1345,7 +1345,7 @@ func makeMergeSeriesSet(serieses [][]Series) SeriesSet {
|
|||
for i, s := range serieses {
|
||||
seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)}
|
||||
}
|
||||
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
|
||||
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, 0, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
|
||||
}
|
||||
|
||||
func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
|
||||
|
@ -1390,6 +1390,34 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func BenchmarkMergeLabelValuesWithLimit(b *testing.B) {
|
||||
var queriers []genericQuerier
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
var lbls []string
|
||||
for j := 0; j < 100000; j++ {
|
||||
lbls = append(lbls, fmt.Sprintf("querier_%d_label_%d", i, j))
|
||||
}
|
||||
q := &mockQuerier{resp: lbls}
|
||||
queriers = append(queriers, newGenericQuerierFrom(q))
|
||||
}
|
||||
|
||||
mergeQuerier := &mergeGenericQuerier{
|
||||
queriers: queriers, // Assume querying 5 blocks.
|
||||
mergeFn: func(l ...Labels) Labels {
|
||||
return l[0]
|
||||
},
|
||||
}
|
||||
|
||||
b.Run("benchmark", func(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
hints := &LabelHints{
|
||||
Limit: 1000,
|
||||
}
|
||||
mergeQuerier.LabelValues(ctx, "name", hints)
|
||||
})
|
||||
}
|
||||
|
||||
func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int {
|
||||
count := 0
|
||||
switch x := qr.(type) {
|
||||
|
@ -1428,6 +1456,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
name string
|
||||
primaries []Querier
|
||||
secondaries []Querier
|
||||
limit int
|
||||
|
||||
expectedSelectsSeries []labels.Labels
|
||||
expectedLabels []string
|
||||
|
@ -1553,12 +1582,39 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
expectedLabels: []string{"a", "b"},
|
||||
expectedWarnings: annotations.New().Add(warnStorage),
|
||||
},
|
||||
{
|
||||
name: "successful queriers with limit",
|
||||
primaries: []Querier{
|
||||
&mockQuerier{resp: []string{"a", "d"}, warnings: annotations.New().Add(warnStorage), err: nil},
|
||||
},
|
||||
secondaries: []Querier{
|
||||
&mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil},
|
||||
},
|
||||
limit: 2,
|
||||
expectedSelectsSeries: []labels.Labels{
|
||||
labels.FromStrings("test", "a"),
|
||||
labels.FromStrings("test", "b"),
|
||||
},
|
||||
expectedLabels: []string{"a", "b"},
|
||||
expectedWarnings: annotations.New().Add(warnStorage),
|
||||
},
|
||||
} {
|
||||
var labelHints *LabelHints
|
||||
var selectHints *SelectHints
|
||||
if tcase.limit > 0 {
|
||||
labelHints = &LabelHints{
|
||||
Limit: tcase.limit,
|
||||
}
|
||||
selectHints = &SelectHints{
|
||||
Limit: tcase.limit,
|
||||
}
|
||||
}
|
||||
|
||||
t.Run(tcase.name, func(t *testing.T) {
|
||||
q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
|
||||
|
||||
t.Run("Select", func(t *testing.T) {
|
||||
res := q.Select(context.Background(), false, nil)
|
||||
res := q.Select(context.Background(), false, selectHints)
|
||||
var lbls []labels.Labels
|
||||
for res.Next() {
|
||||
lbls = append(lbls, res.At().Labels())
|
||||
|
@ -1577,7 +1633,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n)
|
||||
})
|
||||
t.Run("LabelNames", func(t *testing.T) {
|
||||
res, w, err := q.LabelNames(ctx, nil)
|
||||
res, w, err := q.LabelNames(ctx, labelHints)
|
||||
require.Subset(t, tcase.expectedWarnings, w)
|
||||
require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match")
|
||||
requireEqualSlice(t, tcase.expectedLabels, res)
|
||||
|
@ -1590,7 +1646,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
})
|
||||
})
|
||||
t.Run("LabelValues", func(t *testing.T) {
|
||||
res, w, err := q.LabelValues(ctx, "test", nil)
|
||||
res, w, err := q.LabelValues(ctx, "test", labelHints)
|
||||
require.Subset(t, tcase.expectedWarnings, w)
|
||||
require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match")
|
||||
requireEqualSlice(t, tcase.expectedLabels, res)
|
||||
|
@ -1604,7 +1660,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
})
|
||||
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
|
||||
res, w, err := q.LabelValues(ctx, "test2", nil, matcher)
|
||||
res, w, err := q.LabelValues(ctx, "test2", labelHints, matcher)
|
||||
require.Subset(t, tcase.expectedWarnings, w)
|
||||
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
|
||||
requireEqualSlice(t, tcase.expectedLabels, res)
|
||||
|
|
|
@ -831,7 +831,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
|
|||
if len(sets) > 1 {
|
||||
// Merge series using specified chunk series merger.
|
||||
// The default one is the compacting series merger.
|
||||
set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
|
||||
set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc)
|
||||
}
|
||||
|
||||
// Iterate over all sorted chunk series.
|
||||
|
|
|
@ -2030,7 +2030,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
|
|||
// TODO(bwplotka): Merge with storage merged series set benchmark.
|
||||
func BenchmarkMergedSeriesSet(b *testing.B) {
|
||||
sel := func(sets []storage.SeriesSet) storage.SeriesSet {
|
||||
return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
return storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
|
||||
}
|
||||
|
||||
for _, k := range []int{
|
||||
|
|
|
@ -917,7 +917,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
|
|||
s := q.Select(ctx, true, hints, mset...)
|
||||
sets = append(sets, s)
|
||||
}
|
||||
set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
set = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
|
||||
} else {
|
||||
// At this point at least one match exists.
|
||||
set = q.Select(ctx, false, hints, matcherSets[0]...)
|
||||
|
|
|
@ -100,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
sets = append(sets, s)
|
||||
}
|
||||
|
||||
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
set := storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
|
||||
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
|
||||
var chkIter chunkenc.Iterator
|
||||
Loop:
|
||||
|
|
Loading…
Reference in a new issue