mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #14518 from bboreham/faster-listpostings-merge
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
TSDB: Optimization: Merge postings using concrete type
This commit is contained in:
commit
096e2aa7bd
|
@ -103,20 +103,7 @@ func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma
|
||||||
|
|
||||||
// Postings returns the postings list iterator for the label pairs.
|
// Postings returns the postings list iterator for the label pairs.
|
||||||
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
|
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
|
||||||
switch len(values) {
|
return h.head.postings.Postings(ctx, name, values...), nil
|
||||||
case 0:
|
|
||||||
return index.EmptyPostings(), nil
|
|
||||||
case 1:
|
|
||||||
return h.head.postings.Get(name, values[0]), nil
|
|
||||||
default:
|
|
||||||
res := make([]index.Postings, 0, len(values))
|
|
||||||
for _, value := range values {
|
|
||||||
if p := h.head.postings.Get(name, value); !index.IsEmptyPostingsType(p) {
|
|
||||||
res = append(res, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return index.Merge(ctx, res...), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
|
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
|
||||||
|
|
|
@ -962,12 +962,12 @@ func TestHead_Truncate(t *testing.T) {
|
||||||
require.Nil(t, h.series.getByID(s3.ref))
|
require.Nil(t, h.series.getByID(s3.ref))
|
||||||
require.Nil(t, h.series.getByID(s4.ref))
|
require.Nil(t, h.series.getByID(s4.ref))
|
||||||
|
|
||||||
postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
|
postingsA1, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "1"))
|
||||||
postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
|
postingsA2, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "2"))
|
||||||
postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
|
postingsB1, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "1"))
|
||||||
postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
|
postingsB2, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "2"))
|
||||||
postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
|
postingsC1, _ := index.ExpandPostings(h.postings.Postings(ctx, "c", "1"))
|
||||||
postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))
|
postingsAll, _ := index.ExpandPostings(h.postings.Postings(ctx, "", ""))
|
||||||
|
|
||||||
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s1.ref)}, postingsA1)
|
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s1.ref)}, postingsA1)
|
||||||
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s2.ref)}, postingsA2)
|
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s2.ref)}, postingsA2)
|
||||||
|
|
|
@ -235,25 +235,9 @@ func (p *MemPostings) Stats(label string, limit int, labelSizeFunc func(string,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a postings list for the given label pair.
|
|
||||||
func (p *MemPostings) Get(name, value string) Postings {
|
|
||||||
var lp []storage.SeriesRef
|
|
||||||
p.mtx.RLock()
|
|
||||||
l := p.m[name]
|
|
||||||
if l != nil {
|
|
||||||
lp = l[value]
|
|
||||||
}
|
|
||||||
p.mtx.RUnlock()
|
|
||||||
|
|
||||||
if lp == nil {
|
|
||||||
return EmptyPostings()
|
|
||||||
}
|
|
||||||
return newListPostings(lp...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// All returns a postings list over all documents ever added.
|
// All returns a postings list over all documents ever added.
|
||||||
func (p *MemPostings) All() Postings {
|
func (p *MemPostings) All() Postings {
|
||||||
return p.Get(AllPostingsKey())
|
return p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
||||||
|
@ -490,7 +474,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now `vals` only contains the values that matched, get their postings.
|
// Now `vals` only contains the values that matched, get their postings.
|
||||||
its := make([]Postings, 0, len(vals))
|
its := make([]*ListPostings, 0, len(vals))
|
||||||
lps := make([]ListPostings, len(vals))
|
lps := make([]ListPostings, len(vals))
|
||||||
p.mtx.RLock()
|
p.mtx.RLock()
|
||||||
e := p.m[name]
|
e := p.m[name]
|
||||||
|
@ -510,11 +494,27 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
||||||
return Merge(ctx, its...)
|
return Merge(ctx, its...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Postings returns a postings iterator for the given label values.
|
||||||
|
func (p *MemPostings) Postings(ctx context.Context, name string, values ...string) Postings {
|
||||||
|
res := make([]*ListPostings, 0, len(values))
|
||||||
|
lps := make([]ListPostings, len(values))
|
||||||
|
p.mtx.RLock()
|
||||||
|
postingsMapForName := p.m[name]
|
||||||
|
for i, value := range values {
|
||||||
|
if lp := postingsMapForName[value]; lp != nil {
|
||||||
|
lps[i] = ListPostings{list: lp}
|
||||||
|
res = append(res, &lps[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.mtx.RUnlock()
|
||||||
|
return Merge(ctx, res...)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
|
func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
|
||||||
p.mtx.RLock()
|
p.mtx.RLock()
|
||||||
|
|
||||||
e := p.m[name]
|
e := p.m[name]
|
||||||
its := make([]Postings, 0, len(e))
|
its := make([]*ListPostings, 0, len(e))
|
||||||
lps := make([]ListPostings, len(e))
|
lps := make([]ListPostings, len(e))
|
||||||
i := 0
|
i := 0
|
||||||
for _, refs := range e {
|
for _, refs := range e {
|
||||||
|
@ -660,7 +660,7 @@ func (it *intersectPostings) Err() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge returns a new iterator over the union of the input iterators.
|
// Merge returns a new iterator over the union of the input iterators.
|
||||||
func Merge(_ context.Context, its ...Postings) Postings {
|
func Merge[T Postings](_ context.Context, its ...T) Postings {
|
||||||
if len(its) == 0 {
|
if len(its) == 0 {
|
||||||
return EmptyPostings()
|
return EmptyPostings()
|
||||||
}
|
}
|
||||||
|
@ -675,19 +675,19 @@ func Merge(_ context.Context, its ...Postings) Postings {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
type mergedPostings struct {
|
type mergedPostings[T Postings] struct {
|
||||||
p []Postings
|
p []T
|
||||||
h *loser.Tree[storage.SeriesRef, Postings]
|
h *loser.Tree[storage.SeriesRef, T]
|
||||||
cur storage.SeriesRef
|
cur storage.SeriesRef
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
|
func newMergedPostings[T Postings](p []T) (m *mergedPostings[T], nonEmpty bool) {
|
||||||
const maxVal = storage.SeriesRef(math.MaxUint64) // This value must be higher than all real values used in the tree.
|
const maxVal = storage.SeriesRef(math.MaxUint64) // This value must be higher than all real values used in the tree.
|
||||||
lt := loser.New(p, maxVal)
|
lt := loser.New(p, maxVal)
|
||||||
return &mergedPostings{p: p, h: lt}, true
|
return &mergedPostings[T]{p: p, h: lt}, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) Next() bool {
|
func (it *mergedPostings[T]) Next() bool {
|
||||||
for {
|
for {
|
||||||
if !it.h.Next() {
|
if !it.h.Next() {
|
||||||
return false
|
return false
|
||||||
|
@ -701,7 +701,7 @@ func (it *mergedPostings) Next() bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *mergedPostings) Seek(id storage.SeriesRef) bool {
|
func (it *mergedPostings[T]) Seek(id storage.SeriesRef) bool {
|
||||||
for !it.h.IsEmpty() && it.h.At() < id {
|
for !it.h.IsEmpty() && it.h.At() < id {
|
||||||
finished := !it.h.Winner().Seek(id)
|
finished := !it.h.Winner().Seek(id)
|
||||||
it.h.Fix(finished)
|
it.h.Fix(finished)
|
||||||
|
@ -713,11 +713,11 @@ func (it *mergedPostings) Seek(id storage.SeriesRef) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it mergedPostings) At() storage.SeriesRef {
|
func (it mergedPostings[T]) At() storage.SeriesRef {
|
||||||
return it.cur
|
return it.cur
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it mergedPostings) Err() error {
|
func (it mergedPostings[T]) Err() error {
|
||||||
for _, p := range it.p {
|
for _, p := range it.p {
|
||||||
if err := p.Err(); err != nil {
|
if err := p.Err(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -392,8 +392,8 @@ func BenchmarkMerge(t *testing.B) {
|
||||||
refs = append(refs, temp)
|
refs = append(refs, temp)
|
||||||
}
|
}
|
||||||
|
|
||||||
its := make([]Postings, len(refs))
|
its := make([]*ListPostings, len(refs))
|
||||||
for _, nSeries := range []int{1, 10, 100, 1000, 10000, 100000} {
|
for _, nSeries := range []int{1, 10, 10000, 100000} {
|
||||||
t.Run(strconv.Itoa(nSeries), func(bench *testing.B) {
|
t.Run(strconv.Itoa(nSeries), func(bench *testing.B) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for i := 0; i < bench.N; i++ {
|
for i := 0; i < bench.N; i++ {
|
||||||
|
@ -979,7 +979,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
||||||
p.Add(2, labels.FromStrings("lbl1", "b"))
|
p.Add(2, labels.FromStrings("lbl1", "b"))
|
||||||
p.Add(3, labels.FromStrings("lbl2", "a"))
|
p.Add(3, labels.FromStrings("lbl2", "a"))
|
||||||
|
|
||||||
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
before := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||||
deletedRefs := map[storage.SeriesRef]struct{}{
|
deletedRefs := map[storage.SeriesRef]struct{}{
|
||||||
2: {},
|
2: {},
|
||||||
}
|
}
|
||||||
|
@ -987,7 +987,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
||||||
{Name: "lbl1", Value: "b"}: {},
|
{Name: "lbl1", Value: "b"}: {},
|
||||||
}
|
}
|
||||||
p.Delete(deletedRefs, affectedLabels)
|
p.Delete(deletedRefs, affectedLabels)
|
||||||
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
after := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||||
|
|
||||||
// Make sure postings gotten before the delete have the old data when
|
// Make sure postings gotten before the delete have the old data when
|
||||||
// iterated over.
|
// iterated over.
|
||||||
|
@ -1001,7 +1001,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
|
require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
|
||||||
|
|
||||||
deleted := p.Get("lbl1", "b")
|
deleted := p.Postings(context.Background(), "lbl1", "b")
|
||||||
expanded, err = ExpandPostings(deleted)
|
expanded, err = ExpandPostings(deleted)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
|
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
|
||||||
|
@ -1073,7 +1073,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// Get a random value of this label.
|
// Get a random value of this label.
|
||||||
p.Get(lbl, itoa(rand.Intn(10000))).Next()
|
p.Postings(context.Background(), lbl, itoa(rand.Intn(10000))).Next()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
@ -1410,12 +1410,15 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
|
||||||
slowRegexp := "^" + slowRegexpString() + "$"
|
slowRegexp := "^" + slowRegexpString() + "$"
|
||||||
b.Logf("Slow regexp length = %d", len(slowRegexp))
|
b.Logf("Slow regexp length = %d", len(slowRegexp))
|
||||||
slow := regexp.MustCompile(slowRegexp)
|
slow := regexp.MustCompile(slowRegexp)
|
||||||
|
const seriesPerLabel = 10
|
||||||
|
|
||||||
for _, labelValueCount := range []int{1_000, 10_000, 100_000} {
|
for _, labelValueCount := range []int{1_000, 10_000, 100_000} {
|
||||||
b.Run(fmt.Sprintf("labels=%d", labelValueCount), func(b *testing.B) {
|
b.Run(fmt.Sprintf("labels=%d", labelValueCount), func(b *testing.B) {
|
||||||
mp := NewMemPostings()
|
mp := NewMemPostings()
|
||||||
for i := 0; i < labelValueCount; i++ {
|
for i := 0; i < labelValueCount; i++ {
|
||||||
mp.Add(storage.SeriesRef(i), labels.FromStrings("label", strconv.Itoa(i)))
|
for j := 0; j < seriesPerLabel; j++ {
|
||||||
|
mp.Add(storage.SeriesRef(i*seriesPerLabel+j), labels.FromStrings("__name__", strconv.Itoa(j), "label", strconv.Itoa(i)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fp, err := ExpandPostings(mp.PostingsForLabelMatching(context.Background(), "label", fast.MatchString))
|
fp, err := ExpandPostings(mp.PostingsForLabelMatching(context.Background(), "label", fast.MatchString))
|
||||||
|
@ -1435,6 +1438,18 @@ func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
|
||||||
mp.PostingsForLabelMatching(context.Background(), "label", slow.MatchString).Next()
|
mp.PostingsForLabelMatching(context.Background(), "label", slow.MatchString).Next()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
b.Run("matcher=all", func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
// Match everything.
|
||||||
|
p := mp.PostingsForLabelMatching(context.Background(), "label", func(_ string) bool { return true })
|
||||||
|
var sum storage.SeriesRef
|
||||||
|
// Iterate through all results to exercise merge function.
|
||||||
|
for p.Next() {
|
||||||
|
sum += p.At()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue