mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 21:24:05 -08:00
TSDB: Move merge of head postings into index
This enables it to take advantage of a more compact data structure since all postings are known to be `*ListPostings`. Remove the `Get` member which was not used for anything else, and fix up tests. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
0a8779f46d
commit
cfa32f3d28
|
@ -103,20 +103,7 @@ func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma
|
|||
|
||||
// Postings returns the postings list iterator for the label pairs.
|
||||
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
|
||||
switch len(values) {
|
||||
case 0:
|
||||
return index.EmptyPostings(), nil
|
||||
case 1:
|
||||
return h.head.postings.Get(name, values[0]), nil
|
||||
default:
|
||||
res := make([]index.Postings, 0, len(values))
|
||||
for _, value := range values {
|
||||
if p := h.head.postings.Get(name, value); !index.IsEmptyPostingsType(p) {
|
||||
res = append(res, p)
|
||||
}
|
||||
}
|
||||
return index.Merge(ctx, res...), nil
|
||||
}
|
||||
return h.head.postings.Postings(ctx, name, values...), nil
|
||||
}
|
||||
|
||||
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
|
||||
|
|
|
@ -962,12 +962,12 @@ func TestHead_Truncate(t *testing.T) {
|
|||
require.Nil(t, h.series.getByID(s3.ref))
|
||||
require.Nil(t, h.series.getByID(s4.ref))
|
||||
|
||||
postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
|
||||
postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
|
||||
postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
|
||||
postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
|
||||
postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
|
||||
postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))
|
||||
postingsA1, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "1"))
|
||||
postingsA2, _ := index.ExpandPostings(h.postings.Postings(ctx, "a", "2"))
|
||||
postingsB1, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "1"))
|
||||
postingsB2, _ := index.ExpandPostings(h.postings.Postings(ctx, "b", "2"))
|
||||
postingsC1, _ := index.ExpandPostings(h.postings.Postings(ctx, "c", "1"))
|
||||
postingsAll, _ := index.ExpandPostings(h.postings.Postings(ctx, "", ""))
|
||||
|
||||
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s1.ref)}, postingsA1)
|
||||
require.Equal(t, []storage.SeriesRef{storage.SeriesRef(s2.ref)}, postingsA2)
|
||||
|
|
|
@ -235,25 +235,9 @@ func (p *MemPostings) Stats(label string, limit int, labelSizeFunc func(string,
|
|||
}
|
||||
}
|
||||
|
||||
// Get returns a postings list for the given label pair.
|
||||
func (p *MemPostings) Get(name, value string) Postings {
|
||||
var lp []storage.SeriesRef
|
||||
p.mtx.RLock()
|
||||
l := p.m[name]
|
||||
if l != nil {
|
||||
lp = l[value]
|
||||
}
|
||||
p.mtx.RUnlock()
|
||||
|
||||
if lp == nil {
|
||||
return EmptyPostings()
|
||||
}
|
||||
return newListPostings(lp...)
|
||||
}
|
||||
|
||||
// All returns a postings list over all documents ever added.
|
||||
func (p *MemPostings) All() Postings {
|
||||
return p.Get(AllPostingsKey())
|
||||
return p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||
}
|
||||
|
||||
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
||||
|
@ -490,7 +474,7 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
}
|
||||
|
||||
// Now `vals` only contains the values that matched, get their postings.
|
||||
its := make([]Postings, 0, len(vals))
|
||||
its := make([]*ListPostings, 0, len(vals))
|
||||
lps := make([]ListPostings, len(vals))
|
||||
p.mtx.RLock()
|
||||
e := p.m[name]
|
||||
|
@ -510,11 +494,27 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
|
|||
return Merge(ctx, its...)
|
||||
}
|
||||
|
||||
// Postings returns a postings iterator for the given label values.
|
||||
func (p *MemPostings) Postings(ctx context.Context, name string, values ...string) Postings {
|
||||
res := make([]*ListPostings, 0, len(values))
|
||||
lps := make([]ListPostings, len(values))
|
||||
p.mtx.RLock()
|
||||
postingsMapForName := p.m[name]
|
||||
for i, value := range values {
|
||||
if lp := postingsMapForName[value]; lp != nil {
|
||||
lps[i] = ListPostings{list: lp}
|
||||
res = append(res, &lps[i])
|
||||
}
|
||||
}
|
||||
p.mtx.RUnlock()
|
||||
return Merge(ctx, res...)
|
||||
}
|
||||
|
||||
func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
|
||||
p.mtx.RLock()
|
||||
|
||||
e := p.m[name]
|
||||
its := make([]Postings, 0, len(e))
|
||||
its := make([]*ListPostings, 0, len(e))
|
||||
lps := make([]ListPostings, len(e))
|
||||
i := 0
|
||||
for _, refs := range e {
|
||||
|
|
|
@ -979,7 +979,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
|||
p.Add(2, labels.FromStrings("lbl1", "b"))
|
||||
p.Add(3, labels.FromStrings("lbl2", "a"))
|
||||
|
||||
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
||||
before := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||
deletedRefs := map[storage.SeriesRef]struct{}{
|
||||
2: {},
|
||||
}
|
||||
|
@ -987,7 +987,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
|||
{Name: "lbl1", Value: "b"}: {},
|
||||
}
|
||||
p.Delete(deletedRefs, affectedLabels)
|
||||
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
||||
after := p.Postings(context.Background(), allPostingsKey.Name, allPostingsKey.Value)
|
||||
|
||||
// Make sure postings gotten before the delete have the old data when
|
||||
// iterated over.
|
||||
|
@ -1001,7 +1001,7 @@ func TestMemPostings_Delete(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
|
||||
|
||||
deleted := p.Get("lbl1", "b")
|
||||
deleted := p.Postings(context.Background(), "lbl1", "b")
|
||||
expanded, err = ExpandPostings(deleted)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
|
||||
|
@ -1073,7 +1073,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
|
|||
return
|
||||
default:
|
||||
// Get a random value of this label.
|
||||
p.Get(lbl, itoa(rand.Intn(10000))).Next()
|
||||
p.Postings(context.Background(), lbl, itoa(rand.Intn(10000))).Next()
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
|
|
Loading…
Reference in a new issue