mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Merge pull request #150 from prometheus/postingssort
Ensure postings are always sorted
This commit is contained in:
commit
5fa1c993b9
|
@ -457,7 +457,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
|
|
||||||
indexr := b.Index()
|
indexr := b.Index()
|
||||||
|
|
||||||
all, err := indexr.Postings("", "")
|
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
6
head.go
6
head.go
|
@ -207,7 +207,7 @@ func (h *Head) ReadWAL() error {
|
||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
|
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, chunkCreated := ms.append(s.T, s.V)
|
_, chunkCreated := ms.append(s.T, s.V)
|
||||||
|
@ -267,7 +267,7 @@ func (h *Head) Truncate(mint int64) error {
|
||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
|
|
||||||
p, err := h.indexRange(mint, math.MaxInt64).Postings("", "")
|
p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1038,8 +1038,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
|
||||||
return prev, false
|
return prev, false
|
||||||
}
|
}
|
||||||
s.hashes[i].set(hash, series)
|
s.hashes[i].set(hash, series)
|
||||||
|
|
||||||
s.hashes[i][hash] = append(s.hashes[i][hash], series)
|
|
||||||
s.locks[i].Unlock()
|
s.locks[i].Unlock()
|
||||||
|
|
||||||
i = series.ref & stripeMask
|
i = series.ref & stripeMask
|
||||||
|
|
29
postings.go
29
postings.go
|
@ -45,7 +45,7 @@ func (p *memPostings) get(name, value string) Postings {
|
||||||
return newListPostings(l)
|
return newListPostings(l)
|
||||||
}
|
}
|
||||||
|
|
||||||
var allLabel = labels.Label{}
|
var allPostingsKey = labels.Label{}
|
||||||
|
|
||||||
// add adds a document to the index. The caller has to ensure that no
|
// add adds a document to the index. The caller has to ensure that no
|
||||||
// term argument appears twice.
|
// term argument appears twice.
|
||||||
|
@ -53,13 +53,36 @@ func (p *memPostings) add(id uint64, lset labels.Labels) {
|
||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
p.m[l] = append(p.m[l], id)
|
p.addFor(id, l)
|
||||||
}
|
}
|
||||||
p.m[allLabel] = append(p.m[allLabel], id)
|
p.addFor(id, allPostingsKey)
|
||||||
|
|
||||||
p.mtx.Unlock()
|
p.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *memPostings) addFor(id uint64, l labels.Label) {
|
||||||
|
list := append(p.m[l], id)
|
||||||
|
p.m[l] = list
|
||||||
|
|
||||||
|
// There is no guarantee that no higher ID was inserted before as they may
|
||||||
|
// be generated independently before adding them to postings.
|
||||||
|
// We repair order violations on insert. The invariant is that the first n-1
|
||||||
|
// items in the list are already sorted.
|
||||||
|
for i := len(list) - 1; i >= 1; i-- {
|
||||||
|
if list[i] >= list[i-1] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
list[i], list[i-1] = list[i-1], list[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func expandPostings(p Postings) (res []uint64, err error) {
|
||||||
|
for p.Next() {
|
||||||
|
res = append(res, p.At())
|
||||||
|
}
|
||||||
|
return res, p.Err()
|
||||||
|
}
|
||||||
|
|
||||||
// Postings provides iterative access over a postings list.
|
// Postings provides iterative access over a postings list.
|
||||||
type Postings interface {
|
type Postings interface {
|
||||||
// Next advances the iterator and returns true if another value was found.
|
// Next advances the iterator and returns true if another value was found.
|
||||||
|
|
|
@ -21,6 +21,15 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMemPostings_addFor(t *testing.T) {
|
||||||
|
p := newMemPostings()
|
||||||
|
p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8}
|
||||||
|
|
||||||
|
p.addFor(5, allPostingsKey)
|
||||||
|
|
||||||
|
require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey])
|
||||||
|
}
|
||||||
|
|
||||||
type mockPostings struct {
|
type mockPostings struct {
|
||||||
next func() bool
|
next func() bool
|
||||||
seek func(uint64) bool
|
seek func(uint64) bool
|
||||||
|
@ -33,13 +42,6 @@ func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) }
|
||||||
func (m *mockPostings) Value() uint64 { return m.value() }
|
func (m *mockPostings) Value() uint64 { return m.value() }
|
||||||
func (m *mockPostings) Err() error { return m.err() }
|
func (m *mockPostings) Err() error { return m.err() }
|
||||||
|
|
||||||
func expandPostings(p Postings) (res []uint64, err error) {
|
|
||||||
for p.Next() {
|
|
||||||
res = append(res, p.At())
|
|
||||||
}
|
|
||||||
return res, p.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntersect(t *testing.T) {
|
func TestIntersect(t *testing.T) {
|
||||||
var cases = []struct {
|
var cases = []struct {
|
||||||
a, b []uint64
|
a, b []uint64
|
||||||
|
|
Loading…
Reference in a new issue