Reread index series rather than storing in memory.
Rather than building up a second copy of all the posting tables, construct it from the data we've already written to disk. This takes more time, but saves memory. Current benchmark numbers have this as slightly faster, but that's likely due to the synthetic data not having many label names. Memory usage is roughly halved for the relevant bits.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
parent 48d25e6fe7
commit 373a1fdfbf
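The approach, in outline: keep only the set of label names while series are written, then rebuild each postings list by re-reading the series data that is already on disk, one label name at a time. The sketch below is illustrative only and is not the code in this commit; the storedSeries type and postingsFor helper are invented for the example, and the real writer scans the mmapped index file rather than a Go slice.

package main

import (
	"fmt"
	"sort"
)

// One series that has already been written out: its reference and label set.
type storedSeries struct {
	ref    uint64
	labels map[string]string
}

// postingsFor re-reads the stored series and returns, for one label name,
// the series references grouped by label value. Scanning once per name costs
// extra time but avoids holding postings for every name in memory at once.
func postingsFor(stored []storedSeries, name string) map[string][]uint64 {
	out := map[string][]uint64{}
	for _, s := range stored {
		if v, ok := s.labels[name]; ok {
			out[v] = append(out[v], s.ref)
		}
	}
	return out
}

func main() {
	stored := []storedSeries{
		{ref: 1, labels: map[string]string{"__name__": "up", "job": "api"}},
		{ref: 2, labels: map[string]string{"__name__": "up", "job": "db"}},
		{ref: 3, labels: map[string]string{"__name__": "errors_total", "job": "api"}},
	}

	// Collect the label names seen while writing, then emit postings name by
	// name, as the index writer would at the end of compaction.
	nameSet := map[string]struct{}{}
	for _, s := range stored {
		for n := range s.labels {
			nameSet[n] = struct{}{}
		}
	}
	names := make([]string, 0, len(nameSet))
	for n := range nameSet {
		names = append(names, n)
	}
	sort.Strings(names)

	for _, n := range names {
		byValue := postingsFor(stored, n)
		values := make([]string, 0, len(byValue))
		for v := range byValue {
			values = append(values, v)
		}
		sort.Strings(values)
		for _, v := range values {
			fmt.Printf("%s=%q -> %v\n", n, v, byValue[v])
		}
	}
}

The trade-off is the one described in the commit message: each label name costs another pass over the series section, in exchange for never materialising a second copy of all postings in memory.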
@@ -54,10 +54,6 @@ type IndexWriter interface {
     // The passed in values chained tuples of strings of the length of names.
     WriteLabelIndex(names []string, values []string) error
 
-    // WritePostings writes a postings list for a single label pair.
-    // The Postings here contain refs to the series that were added.
-    WritePostings(name, value string, it index.Postings) error
-
     // Close writes any finalization and closes the resources associated with
     // the underlying writer.
     Close() error
@@ -727,11 +727,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
         }
     }
 
-    // We fully rebuild the postings list index from merged series.
     var (
-        postings = index.NewMemPostings()
-        values = map[string]stringset{}
-        i = uint64(0)
+        values = map[string]stringset{}
+        ref = uint64(0)
     )
 
     if err := indexw.AddSymbols(allSymbols); err != nil {
@@ -822,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
             return errors.Wrap(err, "write chunks")
         }
 
-        if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
+        if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
             return errors.Wrap(err, "add series")
         }
 
@@ -846,9 +844,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
             }
             valset.set(l.Value)
         }
-        postings.Add(i, lset)
 
-        i++
+        ref++
     }
     if set.Err() != nil {
         return errors.Wrap(set.Err(), "iterate compaction set")
@@ -866,11 +863,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
         }
     }
 
-    for _, l := range postings.SortedKeys() {
-        if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
-            return errors.Wrap(err, "write postings")
-        }
-    }
     return nil
 }
 
@@ -158,6 +158,14 @@ func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Dec
     return dec
 }
 
+// NewDecbufRaw returns a new decoding buffer of the given length.
+func NewDecbufRaw(bs ByteSlice, length int) Decbuf {
+    if bs.Len() < length {
+        return Decbuf{E: ErrInvalidSize}
+    }
+    return Decbuf{B: bs.Range(0, length)}
+}
+
 func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) }
 func (d *Decbuf) Be32int() int { return int(d.Be32()) }
 func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) }
@@ -260,6 +268,20 @@ func (d *Decbuf) Byte() byte {
     return x
 }
 
+func (d *Decbuf) EatPadding() {
+    if d.E != nil {
+        return
+    }
+    for len(d.B) > 1 && d.B[0] == '\x00' {
+        d.B = d.B[1:]
+    }
+    if len(d.B) < 1 {
+        d.E = ErrInvalidSize
+        return
+    }
+    return
+}
+
 func (d *Decbuf) Err() error { return d.E }
 func (d *Decbuf) Len() int { return len(d.B) }
 func (d *Decbuf) Get() []byte { return d.B }
@@ -119,17 +119,18 @@ type Writer struct {
     stage indexWriterStage
 
     // Reusable memory.
     buf1 encoding.Encbuf
     buf2 encoding.Encbuf
-    uint32s []uint32
 
     symbols map[string]uint32 // symbol offsets
-    seriesOffsets map[uint64]uint64 // offsets of series
+    reverseSymbols map[uint32]string
     labelIndexes []labelIndexHashEntry // label index offsets
     postings []postingsHashEntry // postings lists offsets
+    labelNames map[string]struct{} // label names
 
     // Hold last series to validate that clients insert new series in order.
     lastSeries labels.Labels
+    lastRef uint64
 
     crc32 hash.Hash
 
@@ -203,14 +204,14 @@ func NewWriter(fn string) (*Writer, error) {
         stage: idxStageNone,
 
         // Reusable memory.
         buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
         buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
-        uint32s: make([]uint32, 0, 1<<15),
 
         // Caches.
         symbols: make(map[string]uint32, 1<<13),
-        seriesOffsets: make(map[uint64]uint64, 1<<16),
-        crc32: newCRC32(),
+        reverseSymbols: make(map[uint32]string, 1<<13),
+        labelNames: make(map[string]struct{}, 1<<8),
+        crc32: newCRC32(),
     }
     if err := iw.writeMeta(); err != nil {
         return nil, err
@@ -274,10 +275,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
     case idxStageLabelIndex:
         w.toc.LabelIndices = w.pos
 
-    case idxStagePostings:
-        w.toc.Postings = w.pos
-
     case idxStageDone:
+        w.toc.Postings = w.pos
+        if err := w.writePostings(); err != nil {
+            return err
+        }
+
         w.toc.LabelIndicesTable = w.pos
         if err := w.writeLabelIndexesOffsetTable(); err != nil {
             return err
@@ -312,8 +315,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
         return errors.Errorf("out-of-order series added with label set %q", lset)
     }
 
-    if _, ok := w.seriesOffsets[ref]; ok {
-        return errors.Errorf("series with reference %d already added", ref)
+    if ref < w.lastRef && len(w.lastSeries) != 0 {
+        return errors.Errorf("series with reference greater than %d already added", ref)
     }
     // We add padding to 16 bytes to increase the addressable space we get through 4 byte
     // series references.
@@ -324,7 +327,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
     if w.pos%16 != 0 {
         return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
     }
-    w.seriesOffsets[ref] = w.pos / 16
 
     w.buf2.Reset()
     w.buf2.PutUvarint(len(lset))
@@ -335,6 +337,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
         if !ok {
             return errors.Errorf("symbol entry for %q does not exist", l.Name)
         }
+        w.labelNames[l.Name] = struct{}{}
         w.buf2.PutUvarint32(index)
 
         index, ok = w.symbols[l.Value]
@@ -374,6 +377,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
     }
 
     w.lastSeries = append(w.lastSeries[:0], lset...)
+    w.lastRef = ref
 
     return nil
 }
@@ -408,6 +412,7 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
 
     for index, s := range symbols {
         w.symbols[s] = uint32(index)
+        w.reverseSymbols[uint32(index)] = s
         w.buf1.Reset()
         w.buf1.PutUvarintStr(s)
         w.buf1.WriteToHash(w.crc32)
@@ -590,11 +595,94 @@ func (w *Writer) writeTOC() error {
     return w.write(w.buf1.Get())
 }
 
-func (w *Writer) WritePostings(name, value string, it Postings) error {
-    if err := w.ensureStage(idxStagePostings); err != nil {
-        return errors.Wrap(err, "ensure stage")
-    }
+func (w *Writer) writePostings() error {
 
+    names := make([]string, 0, len(w.labelNames))
+    for n := range w.labelNames {
+        names = append(names, n)
+    }
+    sort.Strings(names)
+
+    if err := w.fbuf.Flush(); err != nil {
+        return err
+    }
+    f, err := fileutil.OpenMmapFile(w.f.Name())
+    if err != nil {
+        return err
+    }
+    defer f.Close()
+
+    // Write out the special all index.
+    offsets := []uint32{}
+    d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
+    d.B = d.B[w.toc.Series:] // dec.Skip not merged yet
+    for d.Len() > 0 {
+        d.EatPadding()
+        startPos := w.toc.LabelIndices - uint64(d.Len())
+        if startPos%16 != 0 {
+            return errors.Errorf("series not 16-byte aligned at %d", startPos)
+        }
+        offsets = append(offsets, uint32(startPos/16))
+        // Skip to next series. The 4 is for the CRC32.
+        skip := d.Uvarint() + 4
+        d.B = d.B[skip:]
+        if err := d.Err(); err != nil {
+            return nil
+        }
+    }
+    w.writePosting("", "", offsets)
+
+    for _, name := range names {
+        nameo := w.symbols[name]
+        postings := map[uint32][]uint32{}
+
+        d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
+        d.B = d.B[w.toc.Series:] // dec.Skip not merged yet
+        for d.Len() > 0 {
+            d.EatPadding()
+            startPos := w.toc.LabelIndices - uint64(d.Len())
+            l := d.Uvarint() // Length of this series in bytes.
+            startLen := d.Len()
+
+            // See if this label name is in the series.
+            numLabels := d.Uvarint()
+            for i := 0; i < numLabels; i++ {
+                lno := uint32(d.Uvarint())
+                lvo := uint32(d.Uvarint())
+
+                if lno == nameo {
+                    if _, ok := postings[lvo]; !ok {
+                        postings[lvo] = []uint32{}
+                    }
+                    postings[lvo] = append(postings[lvo], uint32(startPos/16))
+                    break
+                }
+            }
+            // Skip to next series. The 4 is for the CRC32.
+            skip := l - (startLen - d.Len()) + 4
+            d.B = d.B[skip:]
+            if err := d.Err(); err != nil {
+                return nil
+            }
+        }
+
+        // Write out postings for this label name.
+        values := make([]uint32, 0, len(postings))
+        for v := range postings {
+            values = append(values, v)
+
+        }
+        // Symbol numbers are in order, so the strings will also be in order.
+        sort.Sort(uint32slice(values))
+        for _, v := range values {
+            w.writePosting(name, w.reverseSymbols[v], postings[v])
+        }
+
+    }
+    return nil
+}
+
+func (w *Writer) writePosting(name, value string, offs []uint32) error {
     // Align beginning to 4 bytes for more efficient postings list scans.
     if err := w.addPadding(4); err != nil {
         return err
@@ -606,26 +694,6 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
         offset: w.pos,
     })
 
-    // Order of the references in the postings list does not imply order
-    // of the series references within the persisted block they are mapped to.
-    // We have to sort the new references again.
-    refs := w.uint32s[:0]
-
-    for it.Next() {
-        offset, ok := w.seriesOffsets[it.At()]
-        if !ok {
-            return errors.Errorf("%p series for reference %d not found", w, it.At())
-        }
-        if offset > (1<<32)-1 {
-            return errors.Errorf("series offset %d exceeds 4 bytes", offset)
-        }
-        refs = append(refs, uint32(offset))
-    }
-    if err := it.Err(); err != nil {
-        return err
-    }
-    sort.Sort(uint32slice(refs))
-
     startPos := w.pos
     // Leave 4 bytes of space for the length, which will be calculated later.
     if err := w.write([]byte("alen")); err != nil {
@@ -634,21 +702,23 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
     w.crc32.Reset()
 
     w.buf1.Reset()
-    w.buf1.PutBE32int(len(refs))
+    w.buf1.PutBE32int(len(offs))
     w.buf1.WriteToHash(w.crc32)
     if err := w.write(w.buf1.Get()); err != nil {
         return err
     }
 
-    for _, r := range refs {
+    for _, off := range offs {
+        if off > (1<<32)-1 {
+            return errors.Errorf("series offset %d exceeds 4 bytes", off)
+        }
         w.buf1.Reset()
-        w.buf1.PutBE32(r)
+        w.buf1.PutBE32(off)
         w.buf1.WriteToHash(w.crc32)
         if err := w.write(w.buf1.Get()); err != nil {
            return err
         }
     }
-    w.uint32s = refs
 
     // Write out the length.
     w.buf1.Reset()
@@ -49,6 +49,7 @@ func newMockIndex() mockIndex {
         postings: make(map[labels.Label][]uint64),
         symbols: make(map[string]struct{}),
     }
+    ix.postings[allPostingsKey] = []uint64{}
     return ix
 }
 
@@ -63,7 +64,12 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta)
     for _, lbl := range l {
         m.symbols[lbl.Name] = struct{}{}
         m.symbols[lbl.Value] = struct{}{}
+        if _, ok := m.postings[lbl]; !ok {
+            m.postings[lbl] = []uint64{}
+        }
+        m.postings[lbl] = append(m.postings[lbl], ref)
     }
+    m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref)
 
     s := series{l: l}
     // Actual chunk data is not stored in the index.
@@ -86,19 +92,6 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
     return nil
 }
 
-func (m mockIndex) WritePostings(name, value string, it Postings) error {
-    l := labels.Label{Name: name, Value: value}
-    if _, ok := m.postings[l]; ok {
-        return errors.Errorf("postings for %s already added", l)
-    }
-    ep, err := ExpandPostings(it)
-    if err != nil {
-        return err
-    }
-    m.postings[l] = ep
-    return nil
-}
-
 func (m mockIndex) Close() error {
     return nil
 }
@@ -116,7 +109,7 @@ func (m mockIndex) Postings(name string, values ...string) (Postings, error) {
     p := []Postings{}
     for _, value := range values {
         l := labels.Label{Name: name, Value: value}
-        p = append(p, NewListPostings(m.postings[l]))
+        p = append(p, m.SortedPostings(NewListPostings(m.postings[l])))
     }
     return Merge(p...), nil
 }
@@ -217,7 +210,8 @@ func TestIndexRW_Postings(t *testing.T) {
     testutil.Ok(t, iw.AddSeries(3, series[2]))
     testutil.Ok(t, iw.AddSeries(4, series[3]))
 
-    err = iw.WritePostings("a", "1", newListPostings(1, 2, 3, 4))
+    err = iw.WriteLabelIndex([]string{"a"}, []string{"1"})
+    err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})
     testutil.Ok(t, err)
 
     testutil.Ok(t, iw.Close())
@@ -271,9 +265,7 @@ func TestPostingsMany(t *testing.T) {
     for i, s := range series {
         testutil.Ok(t, iw.AddSeries(uint64(i), s))
     }
-    for i, s := range series {
-        testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i))))
-    }
+    err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"})
     testutil.Ok(t, iw.Close())
 
     ir, err := NewFileReader(fn)
@@ -414,20 +406,6 @@ func TestPersistence_index_e2e(t *testing.T) {
         testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals))
     }
 
-    all := make([]uint64, len(lbls))
-    for i := range all {
-        all[i] = uint64(i)
-    }
-    err = iw.WritePostings("", "", newListPostings(all...))
-    testutil.Ok(t, err)
-    testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...)))
-
-    for _, l := range postings.SortedKeys() {
-        err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
-        testutil.Ok(t, err)
-        mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
-    }
-
     err = iw.Close()
     testutil.Ok(t, err)
 
@@ -457,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) {
             testutil.Equals(t, explset, lset)
             testutil.Equals(t, expchks, chks)
         }
-        testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value)
+        testutil.Assert(t, expp.Next() == false, "Expected no more postings for %q=%q", p.Name, p.Value)
         testutil.Ok(t, gotp.Err())
     }
 
@@ -17,7 +17,6 @@ import (
     "github.com/prometheus/prometheus/pkg/labels"
     "github.com/prometheus/prometheus/tsdb/chunkenc"
     "github.com/prometheus/prometheus/tsdb/chunks"
-    "github.com/prometheus/prometheus/tsdb/index"
     "github.com/prometheus/prometheus/tsdb/tombstones"
 )
 
@@ -62,9 +61,9 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk
     return nil
 }
 
 func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil }
-func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil }
+func (mockIndexWriter) WritePostings() error { return nil }
 func (mockIndexWriter) Close() error { return nil }
 
 type mockBReader struct {
     ir IndexReader