diff --git a/tsdb/block.go b/tsdb/block.go index ce75a56b27..8f8d5e0555 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -54,10 +54,6 @@ type IndexWriter interface { // The passed in values chained tuples of strings of the length of names. WriteLabelIndex(names []string, values []string) error - // WritePostings writes a postings list for a single label pair. - // The Postings here contain refs to the series that were added. - WritePostings(name, value string, it index.Postings) error - // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error diff --git a/tsdb/compact.go b/tsdb/compact.go index da304ed12a..3e57512df5 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -569,7 +569,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } } - indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) + indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) if err != nil { return errors.Wrap(err, "open index writer") } @@ -727,11 +727,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - // We fully rebuild the postings list index from merged series. var ( - postings = index.NewMemPostings() - values = map[string]stringset{} - i = uint64(0) + values = map[string]stringset{} + ref = uint64(0) ) if err := indexw.AddSymbols(allSymbols); err != nil { @@ -822,7 +820,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(i, lset, mergedChks...); err != nil { + if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil { return errors.Wrap(err, "add series") } @@ -846,9 +844,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } valset.set(l.Value) } - postings.Add(i, lset) - i++ + ref++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") @@ -866,11 +863,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range postings.SortedKeys() { - if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil { - return errors.Wrap(err, "write postings") - } - } return nil } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 91024d2826..db90651099 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -860,6 +860,36 @@ func BenchmarkCompaction(b *testing.B) { } } +func BenchmarkCompactionFromHead(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_compaction_from_head") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + totalSeries := 100000 + for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { + labelValues := totalSeries / labelNames + b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(b, err) + for ln := 0; ln < labelNames; ln++ { + app := h.Appender() + for lv := 0; lv < labelValues; lv++ { + app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) + } + testutil.Ok(b, app.Commit()) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h) + } + h.Close() + }) + } +} + // TestDisableAutoCompactions checks that we can // disable and enable the auto compaction. // This is needed for unit tests that rely on @@ -926,8 +956,8 @@ func TestCancelCompactions(t *testing.T) { }() // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000)) - createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000)) + createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000)) + createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000)) createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. // Copy the db so we have an exact copy to compare compaction times. diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index f2e1037567..82270ce863 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -158,6 +158,14 @@ func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Dec return dec } +// NewDecbufRaw returns a new decoding buffer of the given length. +func NewDecbufRaw(bs ByteSlice, length int) Decbuf { + if bs.Len() < length { + return Decbuf{E: ErrInvalidSize} + } + return Decbuf{B: bs.Range(0, length)} +} + func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) } func (d *Decbuf) Be32int() int { return int(d.Be32()) } func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) } @@ -260,6 +268,18 @@ func (d *Decbuf) Byte() byte { return x } +func (d *Decbuf) ConsumePadding() { + if d.E != nil { + return + } + for len(d.B) > 1 && d.B[0] == '\x00' { + d.B = d.B[1:] + } + if len(d.B) < 1 { + d.E = ErrInvalidSize + } +} + func (d *Decbuf) Err() error { return d.E } func (d *Decbuf) Len() int { return len(d.B) } func (d *Decbuf) Get() []byte { return d.B } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 61b80427c6..e51ff0e8d7 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -15,6 +15,7 @@ package index import ( "bufio" + "context" "encoding/binary" "hash" "hash/crc32" @@ -111,6 +112,7 @@ func newCRC32() hash.Hash32 { // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { + ctx context.Context f *os.File fbuf *bufio.Writer pos uint64 @@ -119,17 +121,18 @@ type Writer struct { stage indexWriterStage // Reusable memory. - buf1 encoding.Encbuf - buf2 encoding.Encbuf - uint32s []uint32 + buf1 encoding.Encbuf + buf2 encoding.Encbuf - symbols map[string]uint32 // symbol offsets - seriesOffsets map[uint64]uint64 // offsets of series - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists offsets + symbols map[string]uint32 // symbol offsets + reverseSymbols map[uint32]string + labelIndexes []labelIndexHashEntry // label index offsets + postings []postingsHashEntry // postings lists offsets + labelNames map[string]uint64 // label names, and their usage // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels + lastRef uint64 crc32 hash.Hash @@ -175,7 +178,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. -func NewWriter(fn string) (*Writer, error) { +func NewWriter(ctx context.Context, fn string) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -197,20 +200,19 @@ func NewWriter(fn string) (*Writer, error) { } iw := &Writer{ + ctx: ctx, f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, stage: idxStageNone, // Reusable memory. - buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - uint32s: make([]uint32, 0, 1<<15), + buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, + buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, // Caches. - symbols: make(map[string]uint32, 1<<13), - seriesOffsets: make(map[uint64]uint64, 1<<16), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -257,6 +259,12 @@ func (w *Writer) addPadding(size int) error { // ensureStage handles transitions between write stages and ensures that IndexWriter // methods are called in an order valid for the implementation. func (w *Writer) ensureStage(s indexWriterStage) error { + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } + if w.stage == s { return nil } @@ -274,10 +282,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error { case idxStageLabelIndex: w.toc.LabelIndices = w.pos - case idxStagePostings: - w.toc.Postings = w.pos - case idxStageDone: + w.toc.Postings = w.pos + if err := w.writePostings(); err != nil { + return err + } + w.toc.LabelIndicesTable = w.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err @@ -312,8 +322,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("out-of-order series added with label set %q", lset) } - if _, ok := w.seriesOffsets[ref]; ok { - return errors.Errorf("series with reference %d already added", ref) + if ref < w.lastRef && len(w.lastSeries) != 0 { + return errors.Errorf("series with reference greater than %d already added", ref) } // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. @@ -324,7 +334,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if w.pos%16 != 0 { return errors.Errorf("series write not 16-byte aligned at %d", w.pos) } - w.seriesOffsets[ref] = w.pos / 16 w.buf2.Reset() w.buf2.PutUvarint(len(lset)) @@ -335,6 +344,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } + w.labelNames[l.Name]++ w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] @@ -374,6 +384,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } w.lastSeries = append(w.lastSeries[:0], lset...) + w.lastRef = ref return nil } @@ -405,9 +416,11 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } w.symbols = make(map[string]uint32, len(symbols)) + w.reverseSymbols = make(map[uint32]string, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) + w.reverseSymbols[uint32(index)] = s w.buf1.Reset() w.buf1.PutUvarintStr(s) w.buf1.WriteToHash(w.crc32) @@ -590,11 +603,122 @@ func (w *Writer) writeTOC() error { return w.write(w.buf1.Get()) } -func (w *Writer) WritePostings(name, value string, it Postings) error { - if err := w.ensureStage(idxStagePostings); err != nil { - return errors.Wrap(err, "ensure stage") +func (w *Writer) writePostings() error { + names := make([]string, 0, len(w.labelNames)) + for n := range w.labelNames { + names = append(names, n) } + sort.Strings(names) + if err := w.fbuf.Flush(); err != nil { + return err + } + f, err := fileutil.OpenMmapFile(w.f.Name()) + if err != nil { + return err + } + defer f.Close() + + // Write out the special all posting. + offsets := []uint32{} + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) + d.Skip(int(w.toc.Series)) + for d.Len() > 0 { + d.ConsumePadding() + startPos := w.toc.LabelIndices - uint64(d.Len()) + if startPos%16 != 0 { + return errors.Errorf("series not 16-byte aligned at %d", startPos) + } + offsets = append(offsets, uint32(startPos/16)) + // Skip to next series. The 4 is for the CRC32. + d.Skip(d.Uvarint() + 4) + if err := d.Err(); err != nil { + return nil + } + } + if err := w.writePosting("", "", offsets); err != nil { + return err + } + maxPostings := uint64(len(offsets)) // No label name can have more postings than this. + + for len(names) > 0 { + batchNames := []string{} + var c uint64 + // Try to bunch up label names into one loop, but avoid + // using more memory than a single label name can. + for len(names) > 0 { + if w.labelNames[names[0]]+c > maxPostings { + break + } + batchNames = append(batchNames, names[0]) + c += w.labelNames[names[0]] + names = names[1:] + } + + nameSymbols := map[uint32]struct{}{} + for _, name := range batchNames { + nameSymbols[w.symbols[name]] = struct{}{} + } + // Label name -> label value -> positions. + postings := map[uint32]map[uint32][]uint32{} + + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) + d.Skip(int(w.toc.Series)) + for d.Len() > 0 { + d.ConsumePadding() + startPos := w.toc.LabelIndices - uint64(d.Len()) + l := d.Uvarint() // Length of this series in bytes. + startLen := d.Len() + + // See if label names we want are in the series. + numLabels := d.Uvarint() + for i := 0; i < numLabels; i++ { + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) + + if _, ok := nameSymbols[lno]; ok { + if _, ok := postings[lno]; !ok { + postings[lno] = map[uint32][]uint32{} + } + if _, ok := postings[lno][lvo]; !ok { + postings[lno][lvo] = []uint32{} + } + postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) + } + } + // Skip to next series. The 4 is for the CRC32. + d.Skip(l - (startLen - d.Len()) + 4) + if err := d.Err(); err != nil { + return nil + } + } + + for _, name := range batchNames { + // Write out postings for this label name. + values := make([]uint32, 0, len(postings[w.symbols[name]])) + for v := range postings[w.symbols[name]] { + values = append(values, v) + + } + // Symbol numbers are in order, so the strings will also be in order. + sort.Sort(uint32slice(values)) + for _, v := range values { + if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil { + return err + } + } + } + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } + + } + return nil +} + +func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. if err := w.addPadding(4); err != nil { return err @@ -606,60 +730,20 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { offset: w.pos, }) - // Order of the references in the postings list does not imply order - // of the series references within the persisted block they are mapped to. - // We have to sort the new references again. - refs := w.uint32s[:0] - - for it.Next() { - offset, ok := w.seriesOffsets[it.At()] - if !ok { - return errors.Errorf("%p series for reference %d not found", w, it.At()) - } - if offset > (1<<32)-1 { - return errors.Errorf("series offset %d exceeds 4 bytes", offset) - } - refs = append(refs, uint32(offset)) - } - if err := it.Err(); err != nil { - return err - } - sort.Sort(uint32slice(refs)) - - startPos := w.pos - // Leave 4 bytes of space for the length, which will be calculated later. - if err := w.write([]byte("alen")); err != nil { - return err - } - w.crc32.Reset() - w.buf1.Reset() - w.buf1.PutBE32int(len(refs)) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err - } + w.buf1.PutBE32int(len(offs)) - for _, r := range refs { - w.buf1.Reset() - w.buf1.PutBE32(r) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err + for _, off := range offs { + if off > (1<<32)-1 { + return errors.Errorf("series offset %d exceeds 4 bytes", off) } - } - w.uint32s = refs - - // Write out the length. - w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) - if err := w.writeAt(w.buf1.Get(), startPos); err != nil { - return err + w.buf1.PutBE32(off) } - w.buf1.Reset() - w.buf1.PutHashSum(w.crc32) - return w.write(w.buf1.Get()) + w.buf2.Reset() + w.buf2.PutBE32int(w.buf1.Len()) + w.buf1.PutHash(w.crc32) + return w.write(w.buf2.Get(), w.buf1.Get()) } type uint32slice []uint32 diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 95611d093e..d60af036cf 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -14,6 +14,7 @@ package index import ( + "context" "fmt" "io/ioutil" "math/rand" @@ -49,6 +50,7 @@ func newMockIndex() mockIndex { postings: make(map[labels.Label][]uint64), symbols: make(map[string]struct{}), } + ix.postings[allPostingsKey] = []uint64{} return ix } @@ -63,7 +65,12 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) for _, lbl := range l { m.symbols[lbl.Name] = struct{}{} m.symbols[lbl.Value] = struct{}{} + if _, ok := m.postings[lbl]; !ok { + m.postings[lbl] = []uint64{} + } + m.postings[lbl] = append(m.postings[lbl], ref) } + m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref) s := series{l: l} // Actual chunk data is not stored in the index. @@ -86,19 +93,6 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error { return nil } -func (m mockIndex) WritePostings(name, value string, it Postings) error { - l := labels.Label{Name: name, Value: value} - if _, ok := m.postings[l]; ok { - return errors.Errorf("postings for %s already added", l) - } - ep, err := ExpandPostings(it) - if err != nil { - return err - } - m.postings[l] = ep - return nil -} - func (m mockIndex) Close() error { return nil } @@ -116,7 +110,7 @@ func (m mockIndex) Postings(name string, values ...string) (Postings, error) { p := []Postings{} for _, value := range values { l := labels.Label{Name: name, Value: value} - p = append(p, NewListPostings(m.postings[l])) + p = append(p, m.SortedPostings(NewListPostings(m.postings[l]))) } return Merge(p...), nil } @@ -162,7 +156,7 @@ func TestIndexRW_Create_Open(t *testing.T) { fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) testutil.Ok(t, iw.Close()) @@ -190,7 +184,7 @@ func TestIndexRW_Postings(t *testing.T) { fn := filepath.Join(dir, indexFilename) - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) series := []labels.Labels{ @@ -217,7 +211,9 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(3, series[2])) testutil.Ok(t, iw.AddSeries(4, series[3])) - err = iw.WritePostings("a", "1", newListPostings(1, 2, 3, 4)) + err = iw.WriteLabelIndex([]string{"a"}, []string{"1"}) + testutil.Ok(t, err) + err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}) testutil.Ok(t, err) testutil.Ok(t, iw.Close()) @@ -252,7 +248,7 @@ func TestPostingsMany(t *testing.T) { fn := filepath.Join(dir, indexFilename) - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) // Create a label in the index which has 999 values. @@ -271,9 +267,8 @@ func TestPostingsMany(t *testing.T) { for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } - for i, s := range series { - testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i)))) - } + err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) + testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -374,7 +369,7 @@ func TestPersistence_index_e2e(t *testing.T) { }) } - iw, err := NewWriter(filepath.Join(dir, indexFilename)) + iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename)) testutil.Ok(t, err) testutil.Ok(t, iw.AddSymbols(symbols)) @@ -414,20 +409,6 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals)) } - all := make([]uint64, len(lbls)) - for i := range all { - all[i] = uint64(i) - } - err = iw.WritePostings("", "", newListPostings(all...)) - testutil.Ok(t, err) - testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...))) - - for _, l := range postings.SortedKeys() { - err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) - testutil.Ok(t, err) - mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) - } - err = iw.Close() testutil.Ok(t, err) @@ -457,7 +438,7 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Equals(t, explset, lset) testutil.Equals(t, expchks, chks) } - testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value) + testutil.Assert(t, expp.Next() == false, "Expected no more postings for %q=%q", p.Name, p.Value) testutil.Ok(t, gotp.Err()) } diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index d9b1056563..c7bb422ed0 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -62,9 +61,8 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk return nil } -func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } -func (mockIndexWriter) WritePostings(name, value string, it index.Postings) error { return nil } -func (mockIndexWriter) Close() error { return nil } +func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } +func (mockIndexWriter) Close() error { return nil } type mockBReader struct { ir IndexReader