mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-23 11:41:54 -08:00
Reduce memory used by postings offset table.
Rather than keeping the offset of each postings list, instead keep the nth offset of the offset of the posting list. As postings list offsets have always been sorted, we can then get to the closest entry before the one we want an iterate forwards. I haven't done much tuning on the 32 number, it was chosen to try not to read through more than a 4k page of data. Switch to a bulk interface for fetching postings. Use it to avoid having to re-read parts of the posting offset table when querying lots of it. For a index with what BenchmarkHeadPostingForMatchers uses RAM for r.postings drops from 3.79MB to 80.19kB or about 48x. Bytes allocated go down by 30%, and suprisingly CPU usage drops by 4-6% for typical queries too. benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 35231 36673 +4.09% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 563380 540627 -4.04% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 536782 534186 -0.48% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 533990 541550 +1.42% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113374598 117969608 +4.05% BenchmarkPostingsForMatchers/Block/i=~".+"-4 146329884 139651442 -4.56% BenchmarkPostingsForMatchers/Block/i=~""-4 50346510 44961127 -10.70% BenchmarkPostingsForMatchers/Block/i!=""-4 41261550 35356165 -14.31% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112544418 116904010 +3.87% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112487086 116864918 +3.89% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 41094758 35457904 -13.72% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 41906372 36151473 -13.73% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 147262414 140424800 -4.64% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 28615629 27872072 -2.60% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 147117177 140462403 -4.52% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 175096826 167902298 -4.11% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 4 6 +50.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 7 11 +57.14% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 15 17 +13.33% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100010 100012 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200069 200040 -0.01% BenchmarkPostingsForMatchers/Block/i=~""-4 200072 200045 -0.01% BenchmarkPostingsForMatchers/Block/i!=""-4 200070 200041 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100013 100017 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100017 100023 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200073 200046 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200075 200050 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200074 200049 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111165 111150 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200078 200055 -0.01% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311282 311238 -0.01% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 264 296 +12.12% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 360 424 +17.78% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 520 552 +6.15% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600461 1600482 +0.00% BenchmarkPostingsForMatchers/Block/i=~".+"-4 24900801 17259077 -30.69% BenchmarkPostingsForMatchers/Block/i=~""-4 24900836 17259151 -30.69% BenchmarkPostingsForMatchers/Block/i!=""-4 24900760 17259048 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600557 1600621 +0.00% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600717 1600813 +0.01% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 24900856 17259176 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 24900952 17259304 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 24900993 17259333 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3788311 3142630 -17.04% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 24901137 17259509 -30.69% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 28693086 20405680 -28.88% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
aff9f7a9e8
commit
48d25e6fe7
|
@ -69,14 +69,14 @@ type IndexReader interface {
|
|||
// and indices.
|
||||
Symbols() (map[string]struct{}, error)
|
||||
|
||||
// LabelValues returns the possible label values.
|
||||
// LabelValues returns sorted possible label values.
|
||||
LabelValues(names ...string) (index.StringTuples, error)
|
||||
|
||||
// Postings returns the postings list iterator for the label pair.
|
||||
// Postings returns the postings list iterator for the label pairs.
|
||||
// The Postings here contain the offsets to the series inside the index.
|
||||
// Found IDs are not strictly required to point to a valid Series, e.g. during
|
||||
// background garbage collections.
|
||||
Postings(name, value string) (index.Postings, error)
|
||||
// Found IDs are not strictly required to point to a valid Series, e.g.
|
||||
// during background garbage collections. Input values must be sorted.
|
||||
Postings(name string, values ...string) (index.Postings, error)
|
||||
|
||||
// SortedPostings returns a postings list that is reordered to be sorted
|
||||
// by the label set of the underlying series.
|
||||
|
@ -450,8 +450,8 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro
|
|||
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
|
||||
p, err := r.ir.Postings(name, value)
|
||||
func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
||||
p, err := r.ir.Postings(name, values...)
|
||||
if err != nil {
|
||||
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
}
|
||||
|
|
|
@ -708,7 +708,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||
allSymbols[s] = struct{}{}
|
||||
}
|
||||
|
||||
all, err := indexr.Postings(index.AllPostingsKey())
|
||||
k, v := index.AllPostingsKey()
|
||||
all, err := indexr.Postings(k, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1421,7 +1421,8 @@ func TestChunkAtBlockBoundary(t *testing.T) {
|
|||
|
||||
meta := block.Meta()
|
||||
|
||||
p, err := r.Postings(index.AllPostingsKey())
|
||||
k, v := index.AllPostingsKey()
|
||||
p, err := r.Postings(k, v)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
var (
|
||||
|
|
|
@ -203,9 +203,9 @@ They are used to track label index sections. They are read into memory when an i
|
|||
|
||||
### Postings Offset Table
|
||||
|
||||
A postings offset table stores a sequence of postings offset entries.
|
||||
A postings offset table stores a sequence of postings offset entries, sorted by label name and value.
|
||||
Every postings offset entry holds the label name/value pair and the offset to its series list in the postings section.
|
||||
They are used to track postings sections. They are read into memory when an index file is loaded.
|
||||
They are used to track postings sections. They are partially read into memory when an index file is loaded.
|
||||
|
||||
```
|
||||
┌─────────────────────┬──────────────────────┐
|
||||
|
|
|
@ -119,8 +119,11 @@ func NewDecbufAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf {
|
|||
b = bs.Range(off+4, off+4+l+4)
|
||||
dec := Decbuf{B: b[:len(b)-4]}
|
||||
|
||||
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp {
|
||||
return Decbuf{E: ErrInvalidChecksum}
|
||||
if castagnoliTable != nil {
|
||||
|
||||
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp {
|
||||
return Decbuf{E: ErrInvalidChecksum}
|
||||
}
|
||||
}
|
||||
return dec
|
||||
}
|
||||
|
@ -164,16 +167,30 @@ func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 {
|
|||
return crc32.Checksum(d.B, castagnoliTable)
|
||||
}
|
||||
|
||||
func (d *Decbuf) Skip(l int) {
|
||||
if len(d.B) < l {
|
||||
d.E = ErrInvalidSize
|
||||
return
|
||||
}
|
||||
d.B = d.B[l:]
|
||||
}
|
||||
|
||||
func (d *Decbuf) UvarintStr() string {
|
||||
return string(d.UvarintBytes())
|
||||
}
|
||||
|
||||
// The return value becomes invalid if the byte slice goes away.
|
||||
// Compared to UvarintStr, this avoid allocations.
|
||||
func (d *Decbuf) UvarintBytes() []byte {
|
||||
l := d.Uvarint64()
|
||||
if d.E != nil {
|
||||
return ""
|
||||
return []byte{}
|
||||
}
|
||||
if len(d.B) < int(l) {
|
||||
d.E = ErrInvalidSize
|
||||
return ""
|
||||
return []byte{}
|
||||
}
|
||||
s := string(d.B[:l])
|
||||
s := d.B[:l]
|
||||
d.B = d.B[l:]
|
||||
return s
|
||||
}
|
||||
|
|
10
tsdb/head.go
10
tsdb/head.go
|
@ -1383,9 +1383,13 @@ func (h *headIndexReader) LabelNames() ([]string, error) {
|
|||
return labelNames, nil
|
||||
}
|
||||
|
||||
// Postings returns the postings list iterator for the label pair.
|
||||
func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
|
||||
return h.head.postings.Get(name, value), nil
|
||||
// Postings returns the postings list iterator for the label pairs.
|
||||
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
||||
res := make([]index.Postings, 0, len(values))
|
||||
for _, value := range values {
|
||||
res = append(res, h.head.postings.Get(name, value))
|
||||
}
|
||||
return index.Merge(res...), nil
|
||||
}
|
||||
|
||||
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
||||
|
|
|
@ -700,15 +700,17 @@ type StringTuples interface {
|
|||
}
|
||||
|
||||
type Reader struct {
|
||||
b ByteSlice
|
||||
b ByteSlice
|
||||
toc *TOC
|
||||
|
||||
// Close that releases the underlying resources of the byte slice.
|
||||
c io.Closer
|
||||
|
||||
// Cached hashmaps of section offsets.
|
||||
labels map[string]uint64
|
||||
// LabelName to LabelValue to offset map.
|
||||
postings map[string]map[string]uint64
|
||||
// Map of LabelName to a list of some LabelValues's position in the offset table.
|
||||
// The first and last values for each name are always present.
|
||||
postings map[string][]postingOffset
|
||||
// Cache of read symbols. Strings that are returned when reading from the
|
||||
// block are always backed by true strings held in here rather than
|
||||
// strings that are backed by byte slices from the mmap'd index file. This
|
||||
|
@ -724,6 +726,11 @@ type Reader struct {
|
|||
version int
|
||||
}
|
||||
|
||||
type postingOffset struct {
|
||||
value string
|
||||
off int
|
||||
}
|
||||
|
||||
// ByteSlice abstracts a byte slice.
|
||||
type ByteSlice interface {
|
||||
Len() int
|
||||
|
@ -772,7 +779,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|||
b: b,
|
||||
c: c,
|
||||
labels: map[string]uint64{},
|
||||
postings: map[string]map[string]uint64{},
|
||||
postings: map[string][]postingOffset{},
|
||||
}
|
||||
|
||||
// Verify header.
|
||||
|
@ -788,12 +795,13 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|||
return nil, errors.Errorf("unknown index file version %d", r.version)
|
||||
}
|
||||
|
||||
toc, err := NewTOCFromByteSlice(b)
|
||||
var err error
|
||||
r.toc, err = NewTOCFromByteSlice(b)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "read TOC")
|
||||
}
|
||||
|
||||
r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols))
|
||||
r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(r.toc.Symbols))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "read symbols")
|
||||
}
|
||||
|
@ -811,7 +819,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|||
allocatedSymbols[s] = s
|
||||
}
|
||||
|
||||
if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error {
|
||||
if err := ReadOffsetTable(r.b, r.toc.LabelIndicesTable, func(key []string, off uint64, _ int) error {
|
||||
if len(key) != 1 {
|
||||
return errors.Errorf("unexpected key length for label indices table %d", len(key))
|
||||
}
|
||||
|
@ -822,19 +830,46 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|||
return nil, errors.Wrap(err, "read label index table")
|
||||
}
|
||||
|
||||
r.postings[""] = map[string]uint64{}
|
||||
if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error {
|
||||
var lastKey []string
|
||||
lastOff := 0
|
||||
valueCount := 0
|
||||
// For the postings offset table we keep every label name but only every nth
|
||||
// label value (plus the first and last one), to save memory.
|
||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
|
||||
if len(key) != 2 {
|
||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
||||
}
|
||||
if _, ok := r.postings[key[0]]; !ok {
|
||||
r.postings[allocatedSymbols[key[0]]] = map[string]uint64{}
|
||||
// Next label name.
|
||||
r.postings[allocatedSymbols[key[0]]] = []postingOffset{}
|
||||
if lastKey != nil {
|
||||
// Always include last value for each label name.
|
||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff})
|
||||
}
|
||||
lastKey = nil
|
||||
valueCount = 0
|
||||
}
|
||||
r.postings[key[0]][allocatedSymbols[key[1]]] = off
|
||||
if valueCount%32 == 0 {
|
||||
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: allocatedSymbols[key[1]], off: off})
|
||||
lastKey = nil
|
||||
} else {
|
||||
lastKey = key
|
||||
lastOff = off
|
||||
}
|
||||
valueCount++
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "read postings table")
|
||||
}
|
||||
if lastKey != nil {
|
||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff})
|
||||
}
|
||||
// Trim any extra space in the slices.
|
||||
for k, v := range r.postings {
|
||||
l := make([]postingOffset, len(v))
|
||||
copy(l, v)
|
||||
r.postings[k] = l
|
||||
}
|
||||
|
||||
r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
|
||||
|
||||
|
@ -855,18 +890,21 @@ type Range struct {
|
|||
// for all postings lists.
|
||||
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
||||
m := map[labels.Label]Range{}
|
||||
|
||||
for k, e := range r.postings {
|
||||
for v, start := range e {
|
||||
d := encoding.NewDecbufAt(r.b, int(start), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return nil, d.Err()
|
||||
}
|
||||
m[labels.Label{Name: k, Value: v}] = Range{
|
||||
Start: int64(start) + 4,
|
||||
End: int64(start) + 4 + int64(d.Len()),
|
||||
}
|
||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
|
||||
if len(key) != 2 {
|
||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
||||
}
|
||||
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return d.Err()
|
||||
}
|
||||
m[labels.Label{Name: key[0], Value: key[1]}] = Range{
|
||||
Start: int64(off) + 4,
|
||||
End: int64(off) + 4 + int64(d.Len()),
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, errors.Wrap(err, "read postings table")
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
@ -908,17 +946,18 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
|
|||
|
||||
// ReadOffsetTable reads an offset table and at the given position calls f for each
|
||||
// found entry. If f returns an error it stops decoding and returns the received error.
|
||||
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
|
||||
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error {
|
||||
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
|
||||
startLen := d.Len()
|
||||
cnt := d.Be32()
|
||||
|
||||
// The Postings offset table takes only 2 keys per entry (name and value of label),
|
||||
// and the LabelIndices offset table takes only 1 key per entry (a label name).
|
||||
// Hence setting the size to max of both, i.e. 2.
|
||||
keys := make([]string, 0, 2)
|
||||
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||
offsetPos := startLen - d.Len()
|
||||
keyCount := d.Uvarint()
|
||||
keys = keys[:0]
|
||||
// The Postings offset table takes only 2 keys per entry (name and value of label),
|
||||
// and the LabelIndices offset table takes only 1 key per entry (a label name).
|
||||
// Hence setting the size to max of both, i.e. 2.
|
||||
keys := make([]string, 0, 2)
|
||||
|
||||
for i := 0; i < keyCount; i++ {
|
||||
keys = append(keys, d.UvarintStr())
|
||||
|
@ -927,7 +966,7 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e
|
|||
if d.Err() != nil {
|
||||
break
|
||||
}
|
||||
if err := f(keys, o); err != nil {
|
||||
if err := f(keys, o, offsetPos); err != nil {
|
||||
return err
|
||||
}
|
||||
cnt--
|
||||
|
@ -1027,25 +1066,82 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
|
|||
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
|
||||
}
|
||||
|
||||
// Postings returns a postings list for the given label pair.
|
||||
func (r *Reader) Postings(name, value string) (Postings, error) {
|
||||
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
|
||||
e, ok := r.postings[name]
|
||||
if !ok {
|
||||
return EmptyPostings(), nil
|
||||
}
|
||||
off, ok := e[value]
|
||||
if !ok {
|
||||
|
||||
if len(values) == 0 {
|
||||
return EmptyPostings(), nil
|
||||
}
|
||||
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return nil, errors.Wrap(d.Err(), "get postings entry")
|
||||
|
||||
res := make([]Postings, 0, len(values))
|
||||
skip := 0
|
||||
valueIndex := 0
|
||||
for valueIndex < len(values) && values[valueIndex] < e[0].value {
|
||||
// Discard values before the start.
|
||||
valueIndex++
|
||||
}
|
||||
_, p, err := r.dec.Postings(d.Get())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "decode postings")
|
||||
for valueIndex < len(values) {
|
||||
value := values[valueIndex]
|
||||
|
||||
i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
|
||||
if i == len(e) {
|
||||
// We're past the end.
|
||||
break
|
||||
}
|
||||
if i > 0 && e[i].value != value {
|
||||
// Need to look from previous entry.
|
||||
i--
|
||||
}
|
||||
// Don't Crc32 the entire postings offset table, this is very slow
|
||||
// so hope any issues were caught at startup.
|
||||
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
|
||||
d.Skip(e[i].off)
|
||||
|
||||
// Iterate on the offset table.
|
||||
var postingsOff uint64 // The offset into the postings table.
|
||||
for d.Err() == nil {
|
||||
if skip == 0 {
|
||||
// These are always the same number of bytes,
|
||||
// and it's faster to skip than parse.
|
||||
skip = d.Len()
|
||||
d.Uvarint() // Keycount.
|
||||
d.UvarintBytes() // Label name.
|
||||
skip -= d.Len()
|
||||
} else {
|
||||
d.Skip(skip)
|
||||
}
|
||||
v := d.UvarintBytes() // Label value.
|
||||
postingsOff = d.Uvarint64() // Offset.
|
||||
for string(v) >= value {
|
||||
if string(v) == value {
|
||||
// Read from the postings table.
|
||||
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
|
||||
_, p, err := r.dec.Postings(d2.Get())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "decode postings")
|
||||
}
|
||||
res = append(res, p)
|
||||
}
|
||||
valueIndex++
|
||||
if valueIndex == len(values) {
|
||||
break
|
||||
}
|
||||
value = values[valueIndex]
|
||||
}
|
||||
if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
|
||||
// Need to go to a later postings offset entry, if there is one.
|
||||
break
|
||||
}
|
||||
}
|
||||
if d.Err() != nil {
|
||||
return nil, errors.Wrap(d.Err(), "get postings offset entry")
|
||||
}
|
||||
}
|
||||
return p, nil
|
||||
|
||||
return Merge(res...), nil
|
||||
}
|
||||
|
||||
// SortedPostings returns the given postings list reordered so that the backing series
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package index
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -111,9 +112,13 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
|
|||
return NewStringTuples(m.labelIndex[names[0]], 1)
|
||||
}
|
||||
|
||||
func (m mockIndex) Postings(name, value string) (Postings, error) {
|
||||
l := labels.Label{Name: name, Value: value}
|
||||
return NewListPostings(m.postings[l]), nil
|
||||
func (m mockIndex) Postings(name string, values ...string) (Postings, error) {
|
||||
p := []Postings{}
|
||||
for _, value := range values {
|
||||
l := labels.Label{Name: name, Value: value}
|
||||
p = append(p, NewListPostings(m.postings[l]))
|
||||
}
|
||||
return Merge(p...), nil
|
||||
}
|
||||
|
||||
func (m mockIndex) SortedPostings(p Postings) Postings {
|
||||
|
@ -238,6 +243,96 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
testutil.Ok(t, ir.Close())
|
||||
}
|
||||
|
||||
func TestPostingsMany(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "test_postings_many")
|
||||
testutil.Ok(t, err)
|
||||
defer func() {
|
||||
testutil.Ok(t, os.RemoveAll(dir))
|
||||
}()
|
||||
|
||||
fn := filepath.Join(dir, indexFilename)
|
||||
|
||||
iw, err := NewWriter(fn)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Create a label in the index which has 999 values.
|
||||
symbols := map[string]struct{}{}
|
||||
series := []labels.Labels{}
|
||||
for i := 1; i < 1000; i++ {
|
||||
v := fmt.Sprintf("%03d", i)
|
||||
series = append(series, labels.FromStrings("i", v, "foo", "bar"))
|
||||
symbols[v] = struct{}{}
|
||||
}
|
||||
symbols["i"] = struct{}{}
|
||||
symbols["foo"] = struct{}{}
|
||||
symbols["bar"] = struct{}{}
|
||||
testutil.Ok(t, iw.AddSymbols(symbols))
|
||||
|
||||
for i, s := range series {
|
||||
testutil.Ok(t, iw.AddSeries(uint64(i), s))
|
||||
}
|
||||
for i, s := range series {
|
||||
testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i))))
|
||||
}
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
ir, err := NewFileReader(fn)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
cases := []struct {
|
||||
in []string
|
||||
}{
|
||||
// Simple cases, everything is present.
|
||||
{in: []string{"002"}},
|
||||
{in: []string{"031", "032", "033"}},
|
||||
{in: []string{"032", "033"}},
|
||||
{in: []string{"127", "128"}},
|
||||
{in: []string{"127", "128", "129"}},
|
||||
{in: []string{"127", "129"}},
|
||||
{in: []string{"128", "129"}},
|
||||
{in: []string{"998", "999"}},
|
||||
{in: []string{"999"}},
|
||||
// Before actual values.
|
||||
{in: []string{"000"}},
|
||||
{in: []string{"000", "001"}},
|
||||
{in: []string{"000", "002"}},
|
||||
// After actual values.
|
||||
{in: []string{"999a"}},
|
||||
{in: []string{"999", "999a"}},
|
||||
{in: []string{"998", "999", "999a"}},
|
||||
// In the middle of actual values.
|
||||
{in: []string{"126a", "127", "128"}},
|
||||
{in: []string{"127", "127a", "128"}},
|
||||
{in: []string{"127", "127a", "128", "128a", "129"}},
|
||||
{in: []string{"127", "128a", "129"}},
|
||||
{in: []string{"128", "128a", "129"}},
|
||||
{in: []string{"128", "129", "129a"}},
|
||||
{in: []string{"126a", "126b", "127", "127a", "127b", "128", "128a", "128b", "129", "129a", "129b"}},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
it, err := ir.Postings("i", c.in...)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
got := []string{}
|
||||
var lbls labels.Labels
|
||||
var metas []chunks.Meta
|
||||
for it.Next() {
|
||||
testutil.Ok(t, ir.Series(it.At(), &lbls, &metas))
|
||||
got = append(got, lbls.Get("i"))
|
||||
}
|
||||
testutil.Ok(t, it.Err())
|
||||
exp := []string{}
|
||||
for _, e := range c.in {
|
||||
if _, ok := symbols[e]; ok && e != "l" {
|
||||
exp = append(exp, e)
|
||||
}
|
||||
}
|
||||
testutil.Equals(t, exp, got, fmt.Sprintf("input: %v", c.in))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPersistence_index_e2e(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "test_persistence_e2e")
|
||||
testutil.Ok(t, err)
|
||||
|
@ -327,12 +422,10 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...)))
|
||||
|
||||
for n, e := range postings.m {
|
||||
for v := range e {
|
||||
err = iw.WritePostings(n, v, postings.Get(n, v))
|
||||
testutil.Ok(t, err)
|
||||
mi.WritePostings(n, v, postings.Get(n, v))
|
||||
}
|
||||
for _, l := range postings.SortedKeys() {
|
||||
err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
|
||||
testutil.Ok(t, err)
|
||||
mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
|
||||
}
|
||||
|
||||
err = iw.Close()
|
||||
|
@ -364,7 +457,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
testutil.Equals(t, explset, lset)
|
||||
testutil.Equals(t, expchks, chks)
|
||||
}
|
||||
testutil.Assert(t, expp.Next() == false, "")
|
||||
testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value)
|
||||
testutil.Ok(t, gotp.Err())
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
package index
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
|
@ -36,7 +35,6 @@ func TestPostingsStats(t *testing.T) {
|
|||
data := stats.get()
|
||||
testutil.Equals(t, 10, len(data))
|
||||
for i := 0; i < heapLength; i++ {
|
||||
fmt.Printf("%d", data[i].Count)
|
||||
testutil.Equals(t, uint64(max-i), data[i].Count)
|
||||
}
|
||||
|
||||
|
|
|
@ -385,7 +385,8 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
|
||||
// If there's nothing to subtract from, add in everything and remove the notIts later.
|
||||
if len(its) == 0 && len(notIts) != 0 {
|
||||
allPostings, err := ix.Postings(index.AllPostingsKey())
|
||||
k, v := index.AllPostingsKey()
|
||||
allPostings, err := ix.Postings(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -413,7 +414,8 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
|
|||
if m.Type == labels.MatchRegexp {
|
||||
setMatches := findSetMatches(m.Value)
|
||||
if len(setMatches) > 0 {
|
||||
return postingsForSetMatcher(ix, m.Name, setMatches)
|
||||
sort.Strings(setMatches)
|
||||
return ix.Postings(m.Name, setMatches...)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,17 +439,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
|
|||
return index.EmptyPostings(), nil
|
||||
}
|
||||
|
||||
var rit []index.Postings
|
||||
|
||||
for _, v := range res {
|
||||
it, err := ix.Postings(m.Name, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rit = append(rit, it)
|
||||
}
|
||||
|
||||
return index.Merge(rit...), nil
|
||||
return ix.Postings(m.Name, res...)
|
||||
}
|
||||
|
||||
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
||||
|
@ -469,29 +461,7 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
|||
}
|
||||
}
|
||||
|
||||
var rit []index.Postings
|
||||
for _, v := range res {
|
||||
it, err := ix.Postings(m.Name, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rit = append(rit, it)
|
||||
}
|
||||
|
||||
return index.Merge(rit...), nil
|
||||
}
|
||||
|
||||
func postingsForSetMatcher(ix IndexReader, name string, matches []string) (index.Postings, error) {
|
||||
var its []index.Postings
|
||||
for _, match := range matches {
|
||||
if it, err := ix.Postings(name, match); err == nil {
|
||||
its = append(its, it)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return index.Merge(its...), nil
|
||||
return ix.Postings(m.Name, res...)
|
||||
}
|
||||
|
||||
func mergeStrings(a, b []string) []string {
|
||||
|
|
|
@ -1376,9 +1376,13 @@ func (m mockIndex) LabelValues(names ...string) (index.StringTuples, error) {
|
|||
return index.NewStringTuples(m.labelIndex[names[0]], 1)
|
||||
}
|
||||
|
||||
func (m mockIndex) Postings(name, value string) (index.Postings, error) {
|
||||
l := labels.Label{Name: name, Value: value}
|
||||
return index.NewListPostings(m.postings[l]), nil
|
||||
func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) {
|
||||
res := make([]index.Postings, 0, len(values))
|
||||
for _, value := range values {
|
||||
l := labels.Label{Name: name, Value: value}
|
||||
res = append(res, index.NewListPostings(m.postings[l]))
|
||||
}
|
||||
return index.Merge(res...), nil
|
||||
}
|
||||
|
||||
func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
|
||||
|
|
Loading…
Reference in a new issue