Merge pull request #6452 from prometheus/compact-post-mem

Reread index series rather than storing in memory.
Brian Brazil 2019-12-16 17:44:05 +00:00 committed by GitHub
commit 207808bbae
7 changed files with 233 additions and 132 deletions
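In outline: the index Writer no longer needs every series reference held in memory to build postings. While series are added it records only per-label-name usage counts; once all series are on disk it mmaps the index file back, rescans the series section, and writes the postings lists in batches of label names, each batch sized so that one pass never buffers more series offsets than the all-postings list itself. A rough, self-contained sketch of the rescan idea (all names here are hypothetical, and batching is simplified to a fixed number of label names per pass instead of the posting-count budget the real code uses):

package main

import (
	"fmt"
	"sort"
)

// series stands in for a series entry already persisted to the index;
// the real code re-decodes these from the mmapped file.
type series struct {
	ref    uint32
	labels map[string]string
}

// buildPostings rescans the stored series once per batch of label names,
// so peak memory is bounded by the largest batch rather than by a full
// in-memory postings index.
func buildPostings(stored []series, names []string, maxBatch int) map[string]map[string][]uint32 {
	out := map[string]map[string][]uint32{}
	for start := 0; start < len(names); start += maxBatch {
		end := start + maxBatch
		if end > len(names) {
			end = len(names)
		}
		batch := map[string]struct{}{}
		for _, n := range names[start:end] {
			batch[n] = struct{}{}
		}
		// One sequential pass over the stored series per batch, standing
		// in for rereading the on-disk series section.
		for _, s := range stored {
			for n, v := range s.labels {
				if _, ok := batch[n]; !ok {
					continue
				}
				if out[n] == nil {
					out[n] = map[string][]uint32{}
				}
				// Series are scanned in on-disk order, so each postings
				// list comes out already sorted by series reference.
				out[n][v] = append(out[n][v], s.ref)
			}
		}
	}
	return out
}

func main() {
	stored := []series{
		{ref: 1, labels: map[string]string{"job": "api", "instance": "a"}},
		{ref: 2, labels: map[string]string{"job": "api", "instance": "b"}},
		{ref: 3, labels: map[string]string{"job": "db", "instance": "a"}},
	}
	names := []string{"instance", "job"}
	sort.Strings(names)
	fmt.Println(buildPostings(stored, names, 1))
}

The trade-off is one extra sequential read of the series section per batch, in exchange for dropping the MemPostings map that previously held a reference for every label pair of every series being compacted.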


@@ -54,10 +54,6 @@ type IndexWriter interface {
// The passed-in values are chained tuples of strings, each tuple as long as names.
WriteLabelIndex(names []string, values []string) error
// WritePostings writes a postings list for a single label pair.
// The Postings here contain refs to the series that were added.
WritePostings(name, value string, it index.Postings) error
// Close writes any finalization and closes the resources associated with
// the underlying writer.
Close() error


@@ -569,7 +569,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}
}
indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
if err != nil {
return errors.Wrap(err, "open index writer")
}
@@ -727,11 +727,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
}
// We fully rebuild the postings list index from merged series.
var (
postings = index.NewMemPostings()
values = map[string]stringset{}
i = uint64(0)
values = map[string]stringset{}
ref = uint64(0)
)
if err := indexw.AddSymbols(allSymbols); err != nil {
@@ -822,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
return errors.Wrap(err, "add series")
}
@@ -846,9 +844,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
valset.set(l.Value)
}
postings.Add(i, lset)
i++
ref++
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
@@ -866,11 +863,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
}
for _, l := range postings.SortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings")
}
}
return nil
}


@@ -860,6 +860,36 @@ func BenchmarkCompaction(b *testing.B) {
}
}
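// BenchmarkCompactionFromHead writes blocks from heads whose series are
// spread across varying numbers of label names and values, exercising
// the postings rebuild under different cardinality shapes.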
func BenchmarkCompactionFromHead(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_compaction_from_head")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
totalSeries := 100000
for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
labelValues := totalSeries / labelNames
b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender()
for lv := 0; lv < labelValues; lv++ {
app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
}
testutil.Ok(b, app.Commit())
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h)
}
h.Close()
})
}
}
// TestDisableAutoCompactions checks that we can
// disable and enable the auto compaction.
// This is needed for unit tests that rely on
@@ -926,8 +956,8 @@ func TestCancelCompactions(t *testing.T) {
}()
// Create some blocks to fall within the compaction range.
createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000))
createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000))
createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored, so it can be a small one.
// Copy the db so we have an exact copy to compare compaction times.


@@ -158,6 +158,14 @@ func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Dec
return dec
}
// NewDecbufRaw returns a new decoding buffer of the given length.
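// Unlike NewDecbufAt it performs no checksum verification.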
func NewDecbufRaw(bs ByteSlice, length int) Decbuf {
if bs.Len() < length {
return Decbuf{E: ErrInvalidSize}
}
return Decbuf{B: bs.Range(0, length)}
}
func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) }
func (d *Decbuf) Be32int() int { return int(d.Be32()) }
func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) }
@@ -260,6 +268,18 @@ func (d *Decbuf) Byte() byte {
return x
}
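// ConsumePadding skips over any zero-byte padding at the start of the
// buffer; an already-empty buffer is flagged with ErrInvalidSize.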
func (d *Decbuf) ConsumePadding() {
if d.E != nil {
return
}
for len(d.B) > 1 && d.B[0] == '\x00' {
d.B = d.B[1:]
}
if len(d.B) < 1 {
d.E = ErrInvalidSize
}
}
func (d *Decbuf) Err() error { return d.E }
func (d *Decbuf) Len() int { return len(d.B) }
func (d *Decbuf) Get() []byte { return d.B }


@@ -15,6 +15,7 @@ package index
import (
"bufio"
"context"
"encoding/binary"
"hash"
"hash/crc32"
@@ -111,6 +112,7 @@ func newCRC32() hash.Hash32 {
// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
ctx context.Context
f *os.File
fbuf *bufio.Writer
pos uint64
@@ -119,17 +121,18 @@ type Writer struct {
stage indexWriterStage
// Reusable memory.
buf1 encoding.Encbuf
buf2 encoding.Encbuf
uint32s []uint32
buf1 encoding.Encbuf
buf2 encoding.Encbuf
symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []labelIndexHashEntry // label index offsets
postings []postingsHashEntry // postings lists offsets
symbols map[string]uint32 // symbol offsets
reverseSymbols map[uint32]string
labelIndexes []labelIndexHashEntry // label index offsets
postings []postingsHashEntry // postings lists offsets
labelNames map[string]uint64 // label names, and their usage
// Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels
lastRef uint64
crc32 hash.Hash
@@ -175,7 +178,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(fn string) (*Writer, error) {
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir)
@@ -197,20 +200,19 @@ func NewWriter(fn string) (*Writer, error) {
}
iw := &Writer{
ctx: ctx,
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
stage: idxStageNone,
// Reusable memory.
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
uint32s: make([]uint32, 0, 1<<15),
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
// Caches.
symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
}
if err := iw.writeMeta(); err != nil {
return nil, err
@@ -257,6 +259,12 @@ func (w *Writer) addPadding(size int) error {
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *Writer) ensureStage(s indexWriterStage) error {
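// Bail out early if the context was canceled; this check runs on every
// stage transition, so a long compaction can be aborted between steps.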
select {
case <-w.ctx.Done():
return w.ctx.Err()
default:
}
if w.stage == s {
return nil
}
@@ -274,10 +282,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
case idxStageLabelIndex:
w.toc.LabelIndices = w.pos
case idxStagePostings:
w.toc.Postings = w.pos
case idxStageDone:
w.toc.Postings = w.pos
if err := w.writePostings(); err != nil {
return err
}
w.toc.LabelIndicesTable = w.pos
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
@@ -312,8 +322,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
return errors.Errorf("out-of-order series added with label set %q", lset)
}
if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
if ref < w.lastRef && len(w.lastSeries) != 0 {
return errors.Errorf("series with reference greater than %d already added", ref)
}
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
// series references.
@@ -324,7 +334,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
if w.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
}
w.seriesOffsets[ref] = w.pos / 16
w.buf2.Reset()
w.buf2.PutUvarint(len(lset))
@@ -335,6 +344,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
if !ok {
return errors.Errorf("symbol entry for %q does not exist", l.Name)
}
w.labelNames[l.Name]++
w.buf2.PutUvarint32(index)
index, ok = w.symbols[l.Value]
@@ -374,6 +384,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
}
w.lastSeries = append(w.lastSeries[:0], lset...)
w.lastRef = ref
return nil
}
@@ -405,9 +416,11 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
}
w.symbols = make(map[string]uint32, len(symbols))
w.reverseSymbols = make(map[uint32]string, len(symbols))
for index, s := range symbols {
w.symbols[s] = uint32(index)
w.reverseSymbols[uint32(index)] = s
w.buf1.Reset()
w.buf1.PutUvarintStr(s)
w.buf1.WriteToHash(w.crc32)
@@ -590,11 +603,122 @@ func (w *Writer) writeTOC() error {
return w.write(w.buf1.Get())
}
func (w *Writer) WritePostings(name, value string, it Postings) error {
if err := w.ensureStage(idxStagePostings); err != nil {
return errors.Wrap(err, "ensure stage")
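// writePostings rebuilds every postings list by rereading the series
// already written to disk, in batches of label names sized so that a
// single pass never buffers more offsets than the all-postings list.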
func (w *Writer) writePostings() error {
names := make([]string, 0, len(w.labelNames))
for n := range w.labelNames {
names = append(names, n)
}
sort.Strings(names)
if err := w.fbuf.Flush(); err != nil {
return err
}
f, err := fileutil.OpenMmapFile(w.f.Name())
if err != nil {
return err
}
defer f.Close()
// Write out the special all-postings entry.
offsets := []uint32{}
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
startPos := w.toc.LabelIndices - uint64(d.Len())
if startPos%16 != 0 {
return errors.Errorf("series not 16-byte aligned at %d", startPos)
}
offsets = append(offsets, uint32(startPos/16))
// Skip to next series. The 4 is for the CRC32.
d.Skip(d.Uvarint() + 4)
if err := d.Err(); err != nil {
return err
}
}
if err := w.writePosting("", "", offsets); err != nil {
return err
}
maxPostings := uint64(len(offsets)) // No label name can have more postings than this.
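// Batches of names are capped at maxPostings summed occurrences, so one
// pass never buffers more series offsets than the all-postings list did.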
for len(names) > 0 {
batchNames := []string{}
var c uint64
// Try to bunch up label names into one pass, but avoid using more
// memory than a single worst-case label name could need on its own.
for len(names) > 0 {
if w.labelNames[names[0]]+c > maxPostings {
break
}
batchNames = append(batchNames, names[0])
c += w.labelNames[names[0]]
names = names[1:]
}
nameSymbols := map[uint32]struct{}{}
for _, name := range batchNames {
nameSymbols[w.symbols[name]] = struct{}{}
}
// Label name -> label value -> positions.
postings := map[uint32]map[uint32][]uint32{}
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
startPos := w.toc.LabelIndices - uint64(d.Len())
l := d.Uvarint() // Length of this series in bytes.
startLen := d.Len()
// See if label names we want are in the series.
numLabels := d.Uvarint()
for i := 0; i < numLabels; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())
if _, ok := nameSymbols[lno]; ok {
if _, ok := postings[lno]; !ok {
postings[lno] = map[uint32][]uint32{}
}
if _, ok := postings[lno][lvo]; !ok {
postings[lno][lvo] = []uint32{}
}
postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16))
}
}
// Skip to next series. The 4 is for the CRC32.
d.Skip(l - (startLen - d.Len()) + 4)
if err := d.Err(); err != nil {
return err
}
}
for _, name := range batchNames {
// Write out postings for this label name.
values := make([]uint32, 0, len(postings[w.symbols[name]]))
for v := range postings[w.symbols[name]] {
values = append(values, v)
}
// Symbol numbers are in order, so the strings will also be in order.
sort.Sort(uint32slice(values))
for _, v := range values {
if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil {
return err
}
}
}
select {
case <-w.ctx.Done():
return w.ctx.Err()
default:
}
}
return nil
}
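// writePosting writes one postings list of 4-byte series offsets for a
// single label pair: a length prefix, the offset count, the offsets, and
// a trailing CRC32 of the payload.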
func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.addPadding(4); err != nil {
return err
@@ -606,60 +730,20 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
offset: w.pos,
})
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
refs := w.uint32s[:0]
for it.Next() {
offset, ok := w.seriesOffsets[it.At()]
if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At())
}
if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
}
if err := it.Err(); err != nil {
return err
}
sort.Sort(uint32slice(refs))
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf1.Reset()
w.buf1.PutBE32int(len(refs))
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
w.buf1.PutBE32int(len(offs))
for _, r := range refs {
w.buf1.Reset()
w.buf1.PutBE32(r)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
for _, off := range offs {
if off > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", off)
}
}
w.uint32s = refs
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
w.buf1.PutBE32(off)
}
w.buf1.Reset()
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
w.buf2.Reset()
w.buf2.PutBE32int(w.buf1.Len())
w.buf1.PutHash(w.crc32)
return w.write(w.buf2.Get(), w.buf1.Get())
}
type uint32slice []uint32


@@ -14,6 +14,7 @@
package index
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
@@ -49,6 +50,7 @@ func newMockIndex() mockIndex {
postings: make(map[labels.Label][]uint64),
symbols: make(map[string]struct{}),
}
ix.postings[allPostingsKey] = []uint64{}
return ix
}
@@ -63,7 +65,12 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta)
for _, lbl := range l {
m.symbols[lbl.Name] = struct{}{}
m.symbols[lbl.Value] = struct{}{}
if _, ok := m.postings[lbl]; !ok {
m.postings[lbl] = []uint64{}
}
m.postings[lbl] = append(m.postings[lbl], ref)
}
m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref)
s := series{l: l}
// Actual chunk data is not stored in the index.
@@ -86,19 +93,6 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
return nil
}
func (m mockIndex) WritePostings(name, value string, it Postings) error {
l := labels.Label{Name: name, Value: value}
if _, ok := m.postings[l]; ok {
return errors.Errorf("postings for %s already added", l)
}
ep, err := ExpandPostings(it)
if err != nil {
return err
}
m.postings[l] = ep
return nil
}
func (m mockIndex) Close() error {
return nil
}
@@ -116,7 +110,7 @@ func (m mockIndex) Postings(name string, values ...string) (Postings, error) {
p := []Postings{}
for _, value := range values {
l := labels.Label{Name: name, Value: value}
p = append(p, NewListPostings(m.postings[l]))
p = append(p, m.SortedPostings(NewListPostings(m.postings[l])))
}
return Merge(p...), nil
}
@@ -162,7 +156,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
fn := filepath.Join(dir, indexFilename)
// An empty index must still result in a readable file.
iw, err := NewWriter(fn)
iw, err := NewWriter(context.Background(), fn)
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())
@@ -190,7 +184,7 @@ func TestIndexRW_Postings(t *testing.T) {
fn := filepath.Join(dir, indexFilename)
iw, err := NewWriter(fn)
iw, err := NewWriter(context.Background(), fn)
testutil.Ok(t, err)
series := []labels.Labels{
@@ -217,7 +211,9 @@
testutil.Ok(t, iw.AddSeries(3, series[2]))
testutil.Ok(t, iw.AddSeries(4, series[3]))
err = iw.WritePostings("a", "1", newListPostings(1, 2, 3, 4))
err = iw.WriteLabelIndex([]string{"a"}, []string{"1"})
testutil.Ok(t, err)
err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())
@@ -252,7 +248,7 @@ func TestPostingsMany(t *testing.T) {
fn := filepath.Join(dir, indexFilename)
iw, err := NewWriter(fn)
iw, err := NewWriter(context.Background(), fn)
testutil.Ok(t, err)
// Create a label in the index which has 999 values.
@@ -271,9 +267,8 @@
for i, s := range series {
testutil.Ok(t, iw.AddSeries(uint64(i), s))
}
for i, s := range series {
testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i))))
}
err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"})
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())
ir, err := NewFileReader(fn)
@@ -374,7 +369,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}
iw, err := NewWriter(filepath.Join(dir, indexFilename))
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
testutil.Ok(t, iw.AddSymbols(symbols))
@@ -414,20 +409,6 @@
testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals))
}
all := make([]uint64, len(lbls))
for i := range all {
all[i] = uint64(i)
}
err = iw.WritePostings("", "", newListPostings(all...))
testutil.Ok(t, err)
testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...)))
for _, l := range postings.SortedKeys() {
err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
testutil.Ok(t, err)
mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value))
}
err = iw.Close()
testutil.Ok(t, err)
@@ -457,7 +438,7 @@
testutil.Equals(t, explset, lset)
testutil.Equals(t, expchks, chks)
}
testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value)
testutil.Assert(t, expp.Next() == false, "Expected no more postings for %q=%q", p.Name, p.Value)
testutil.Ok(t, gotp.Err())
}


@@ -17,7 +17,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
@@ -62,9 +61,8 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk
return nil
}
func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil }
func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil }
func (mockIndexWriter) Close() error { return nil }
func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil }
func (mockIndexWriter) Close() error { return nil }
type mockBReader struct {
ir IndexReader