From 5d75a3dc7ba5a1401a2c5806007e7cef7ccb8818 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 2 Jan 2017 10:34:55 +0100
Subject: [PATCH] Add basic compaction

This adds a basic compactor that will merge two persisted blocks into
one. It simply fully rewrites the index and concatenates the chunk
lists.
It just writes into the current working dir and doesn't properly handle
which blocks to compact for now.
---
 chunks/xor.go |  23 ++--
 compact.go    | 290 ++++++++++++++++++++++++++++++++++++++++++++++++++
 db.go         |  20 +++-
 reader.go     |   3 +-
 writer.go     |   6 +-
 5 files changed, 320 insertions(+), 22 deletions(-)
 create mode 100644 compact.go

diff --git a/chunks/xor.go b/chunks/xor.go
index 6e750c0971..c0b8e83cc2 100644
--- a/chunks/xor.go
+++ b/chunks/xor.go
@@ -16,11 +16,7 @@ type XORChunk struct {
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
 func NewXORChunk() *XORChunk {
 	b := make([]byte, 2, 128)
-
-	return &XORChunk{
-		b:   &bstream{stream: b, count: 0},
-		num: 0,
-	}
+	return &XORChunk{b: &bstream{stream: b, count: 0}}
 }
 
 func (c *XORChunk) Encoding() Encoding {
@@ -29,11 +25,7 @@ func (c *XORChunk) Encoding() Encoding {
 
 // Bytes returns the underlying byte slice of the chunk.
 func (c *XORChunk) Bytes() []byte {
-	b := c.b.bytes()
-	// Lazily populate length bytes – probably not necessary to have the
-	// cache value in struct.
-	binary.BigEndian.PutUint16(b[:2], c.num)
-	return b
+	return c.b.bytes()
 }
 
 // Appender implements the Chunk interface.
@@ -58,7 +50,7 @@ func (c *XORChunk) Appender() (Appender, error) {
 		leading:  it.leading,
 		trailing: it.trailing,
 	}
-	if c.num == 0 {
+	if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
 		a.leading = 0xff
 	}
 	return a, nil
@@ -70,7 +62,7 @@ func (c *XORChunk) iterator() *xorIterator {
 	// Could only copy data if the chunk is not completed yet.
 	return &xorIterator{
 		br:       newBReader(c.b.bytes()[2:]),
-		numTotal: c.num,
+		numTotal: binary.BigEndian.Uint16(c.b.bytes()),
 	}
 }
 
@@ -93,15 +85,16 @@ type xorAppender struct {
 
 func (a *xorAppender) Append(t int64, v float64) {
 	var tDelta uint64
+	num := binary.BigEndian.Uint16(a.b.bytes())
 
-	if a.c.num == 0 {
+	if num == 0 {
 		buf := make([]byte, binary.MaxVarintLen64)
 		for _, b := range buf[:binary.PutVarint(buf, t)] {
 			a.b.writeByte(b)
 		}
 		a.b.writeBits(math.Float64bits(v), 64)
 
-	} else if a.c.num == 1 {
+	} else if num == 1 {
 		tDelta = uint64(t - a.t)
 
 		buf := make([]byte, binary.MaxVarintLen64)
@@ -139,7 +132,7 @@ func (a *xorAppender) Append(t int64, v float64) {
 
 	a.t = t
 	a.v = v
-	a.c.num++
+	binary.BigEndian.PutUint16(a.b.bytes(), num+1)
 
 	a.tDelta = tDelta
 }
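Note: the xor.go change above drops the cached num field and instead keeps the
sample count in the two length bytes the chunk already reserves at the start of
its stream, so the byte slice returned by Bytes() is always self-describing and
no longer needs to be lazily patched. A minimal runnable sketch of that
count-in-prefix technique follows; the chunk type and method names here are
illustrative stand-ins, not taken from the patch.

package main

import (
	"encoding/binary"
	"fmt"
)

// chunk keeps its sample count in the first two bytes of b, mirroring the
// XORChunk layout after this patch.
type chunk struct{ b []byte }

// newChunk reserves two leading bytes for the big-endian sample count.
func newChunk() *chunk { return &chunk{b: make([]byte, 2, 128)} }

// num decodes the count from the prefix instead of a struct field.
func (c *chunk) num() uint16 { return binary.BigEndian.Uint16(c.b) }

// add appends one encoded byte and bumps the count in place.
func (c *chunk) add(sample byte) {
	c.b = append(c.b, sample)
	binary.BigEndian.PutUint16(c.b[:2], c.num()+1)
}

func main() {
	c := newChunk()
	c.add(0x2a)
	c.add(0x2b)
	fmt.Println(c.num(), c.b) // 2 [0 2 42 43]
}

The trade-off is a two-byte decode on every Append in exchange for never having
two copies of the count that can drift apart.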
diff --git a/compact.go b/compact.go
new file mode 100644
index 0000000000..3ef779361c
--- /dev/null
+++ b/compact.go
@@ -0,0 +1,290 @@
+package tsdb
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/fabxc/tsdb/labels"
+	"github.com/go-kit/kit/log"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+)
+
+type compactor struct {
+	shard  *Shard
+	logger log.Logger
+
+	triggerc chan struct{}
+	donec    chan struct{}
+}
+
+func newCompactor(s *Shard, l log.Logger) (*compactor, error) {
+	c := &compactor{
+		triggerc: make(chan struct{}, 1),
+		donec:    make(chan struct{}),
+		shard:    s,
+		logger:   l,
+	}
+	go c.run()
+
+	return c, nil
+}
+
+func (c *compactor) trigger() {
+	select {
+	case c.triggerc <- struct{}{}:
+	default:
+	}
+}
+
+func (c *compactor) run() {
+	for range c.triggerc {
+		if len(c.shard.persisted) < 2 {
+			continue
+		}
+		dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
+
+		if err := c.compact(dir, c.shard.persisted[0], c.shard.persisted[1]); err != nil {
+			c.logger.Log("msg", "compaction failed", "err", err)
+		}
+	}
+	close(c.donec)
+}
+
+func (c *compactor) close() error {
+	close(c.triggerc)
+	<-c.donec
+	return nil
+}
+
+func (c *compactor) compact(dir string, a, b *persistedBlock) error {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return err
+	}
+
+	cf, err := os.Create(chunksFileName(dir))
+	if err != nil {
+		return err
+	}
+	xf, err := os.Create(indexFileName(dir))
+	if err != nil {
+		return err
+	}
+
+	index := newIndexWriter(xf)
+	series := newSeriesWriter(cf, index)
+
+	defer index.Close()
+	defer series.Close()
+
+	aall, err := a.index.Postings("", "")
+	if err != nil {
+		return err
+	}
+	ball, err := b.index.Postings("", "")
+	if err != nil {
+		return err
+	}
+
+	set, err := newCompactionMerger(
+		newCompactionSeriesSet(a.index, a.chunks, aall),
+		newCompactionSeriesSet(b.index, b.chunks, ball),
+	)
+	if err != nil {
+		return err
+	}
+
+	// We fully rebuild the postings list index from merged series.
+	var (
+		postings = &memPostings{m: make(map[term][]uint32, 512)}
+		values   = map[string]stringset{}
+		i        = uint32(0)
+	)
+	stats := BlockStats{
+		MinTime:     a.stats.MinTime,
+		MaxTime:     b.stats.MaxTime,
+		SampleCount: a.stats.SampleCount + b.stats.SampleCount,
+	}
+
+	for set.Next() {
+		lset, chunks := set.At()
+		if err := series.WriteSeries(i, lset, chunks); err != nil {
+			return err
+		}
+
+		stats.ChunkCount += uint32(len(chunks))
+		stats.SeriesCount++
+
+		for _, l := range lset {
+			valset, ok := values[l.Name]
+			if !ok {
+				valset = stringset{}
+				values[l.Name] = valset
+			}
+			valset.set(l.Value)
+
+			postings.add(i, term{name: l.Name, value: l.Value})
+		}
+		i++
+	}
+	if set.Err() != nil {
+		return set.Err()
+	}
+
+	if err := index.WriteStats(stats); err != nil {
+		return err
+	}
+
+	s := make([]string, 0, 256)
+	for n, v := range values {
+		s = s[:0]
+
+		for x := range v {
+			s = append(s, x)
+		}
+		if err := index.WriteLabelIndex([]string{n}, s); err != nil {
+			return err
+		}
+	}
+
+	for t := range postings.m {
+		if err := index.WritePostings(t.name, t.value, postings.get(t)); err != nil {
+			return err
+		}
+	}
+	// Write a postings list containing all series.
+	all := make([]uint32, i)
+	for i := range all {
+		all[i] = uint32(i)
+	}
+	if err := index.WritePostings("", "", newListPostings(all)); err != nil {
+		return err
+	}
+
+	return nil
+}
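Note: compact() above never merges the two old inverted indexes directly. It
assigns fresh, dense series IDs in merge order and rebuilds the postings lists
and label-value sets from scratch; because IDs increase monotonically over the
label-sorted merged stream, each per-term postings list comes out sorted by
construction, which is presumably why the patch can hand them straight to
WritePostings. A hedged sketch of that rebuild loop, with memPostings and term
reduced to simplified stand-ins for the patch's types:

package main

import "fmt"

// term is a label name/value pair, as in the patch.
type term struct{ name, value string }

// memPostings maps each term to the IDs of series carrying it.
type memPostings struct{ m map[term][]uint32 }

func (p *memPostings) add(id uint32, t term) { p.m[t] = append(p.m[t], id) }

func main() {
	// Two series as they would arrive from the merged, sorted set.
	series := [][]term{
		{{"__name__", "up"}, {"job", "a"}},
		{{"__name__", "up"}, {"job", "b"}},
	}
	postings := &memPostings{m: make(map[term][]uint32)}

	// IDs are assigned in merge order, so every postings list stays sorted.
	for i, lset := range series {
		for _, t := range lset {
			postings.add(uint32(i), t)
		}
	}
	fmt.Println(postings.m[term{"__name__", "up"}]) // [0 1]
}

The iterators that feed this loop follow below.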
+
+type compactionSeriesSet struct {
+	p      Postings
+	index  IndexReader
+	series SeriesReader
+
+	l   labels.Labels
+	c   []ChunkMeta
+	err error
+}
+
+func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet {
+	return &compactionSeriesSet{
+		index:  i,
+		series: s,
+		p:      p,
+	}
+}
+
+func (c *compactionSeriesSet) Next() bool {
+	if !c.p.Next() {
+		return false
+	}
+
+	c.l, c.c, c.err = c.index.Series(c.p.Value())
+	if c.err != nil {
+		return false
+	}
+	for i := range c.c {
+		chk := &c.c[i]
+
+		chk.Chunk, c.err = c.series.Chunk(chk.Ref)
+		if c.err != nil {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (c *compactionSeriesSet) Err() error {
+	if c.err != nil {
+		return c.err
+	}
+	return c.p.Err()
+}
+
+func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
+	return c.l, c.c
+}
+
+type compactionMerger struct {
+	a, b *compactionSeriesSet
+
+	adone, bdone bool
+	l            labels.Labels
+	c            []ChunkMeta
+}
+
+type compactionSeries struct {
+	labels labels.Labels
+	chunks []ChunkMeta
+}
+
+func newCompactionMerger(a, b *compactionSeriesSet) (*compactionMerger, error) {
+	c := &compactionMerger{
+		a: a,
+		b: b,
+	}
+	// Initialize first elements of both sets as Next() needs
+	// one element look-ahead.
+	c.adone = !c.a.Next()
+	c.bdone = !c.b.Next()
+
+	return c, c.Err()
+}
+
+func (c *compactionMerger) compare() int {
+	if c.adone {
+		return 1
+	}
+	if c.bdone {
+		return -1
+	}
+	a, _ := c.a.At()
+	b, _ := c.b.At()
+	return labels.Compare(a, b)
+}
+
+func (c *compactionMerger) Next() bool {
+	if c.adone && c.bdone || c.Err() != nil {
+		return false
+	}
+
+	d := c.compare()
+	if d > 0 {
+		c.l, c.c = c.b.At()
+		c.bdone = !c.b.Next()
+	} else if d < 0 {
+		c.l, c.c = c.a.At()
+		c.adone = !c.a.Next()
+	} else {
+		// Both sets contain the current series. Chain them into a single one.
+		l, ca := c.a.At()
+		_, cb := c.b.At()
+
+		c.l = l
+		c.c = append(ca, cb...)
+
+		c.adone = !c.a.Next()
+		c.bdone = !c.b.Next()
+	}
+	return true
+}
+
+func (c *compactionMerger) Err() error {
+	if c.a.Err() != nil {
+		return c.a.Err()
+	}
+	return c.b.Err()
+}
+
+func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
+	return c.l, c.c
+}
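Note: compactionMerger is a classic two-way sorted merge with one-element
look-ahead: compare the heads of the two label-sorted series sets, emit the
smaller, and on equal label sets emit one series whose chunk lists are chained.
The same control flow on plain sorted int slices, as a self-contained sketch
(the slices stand in for the label-sorted sets; names are illustrative):

package main

import "fmt"

// merge walks two sorted slices with look-ahead, emitting equal keys once,
// just as compactionMerger emits a single series for matching label sets.
func merge(a, b []int) []int {
	var out []int
	i, j := 0, 0
	for i < len(a) || j < len(b) {
		switch {
		case j == len(b) || (i < len(a) && a[i] < b[j]):
			out = append(out, a[i]) // only a has this key, or a is smaller
			i++
		case i == len(a) || b[j] < a[i]:
			out = append(out, b[j]) // only b has this key, or b is smaller
			j++
		default:
			// Equal keys: both sides contain the series; emit once,
			// advance both (where the patch concatenates chunk lists).
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}

func main() {
	fmt.Println(merge([]int{1, 3, 5}, []int{3, 4})) // [1 3 4 5]
}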
diff --git a/db.go b/db.go
index 1521b97604..07da446155 100644
--- a/db.go
+++ b/db.go
@@ -44,7 +44,7 @@ type DB struct {
 
 // TODO(fabxc): make configurable
 const (
-	shardShift   = 2
+	shardShift   = 0
 	numShards    = 1 << shardShift
 	maxChunkSize = 1024
 )
@@ -83,8 +83,6 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
 		c.shards = append(c.shards, s)
 	}
 
-	// TODO(fabxc): run background compaction + GC.
-
 	return c, nil
 }
 
@@ -187,6 +185,7 @@ type Shard struct {
 	mtx       sync.RWMutex
 	persisted persistedBlocks
 	head      *HeadBlock
+	compactor *compactor
 }
 
 type shardMetrics struct {
@@ -265,6 +264,11 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
 		head:      head,
 		persisted: pbs,
 	}
+	s.compactor, err = newCompactor(s, logger)
+	if err != nil {
+		return nil, err
+	}
+
 	return s, nil
 }
 
@@ -275,6 +279,8 @@ func (s *Shard) Close() error {
 
 	var e MultiError
 
+	e.Add(s.compactor.close())
+
 	for _, pb := range s.persisted {
 		e.Add(pb.Close())
 	}
@@ -402,6 +408,8 @@ func (s *Shard) persist() error {
 	s.persisted = append(s.persisted, pb)
 	s.mtx.Unlock()
 
+	s.compactor.trigger()
+
 	return nil
 }
 
@@ -474,9 +482,11 @@ func (es MultiError) Err() error {
 }
 
 func yoloString(b []byte) string {
+	sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+
 	h := reflect.StringHeader{
-		Data: uintptr(unsafe.Pointer(&b[0])),
-		Len:  len(b),
+		Data: sh.Data,
+		Len:  sh.Len,
 	}
 	return *((*string)(unsafe.Pointer(&h)))
 }
diff --git a/reader.go b/reader.go
index 5a429d37b4..06a0cab77e 100644
--- a/reader.go
+++ b/reader.go
@@ -191,8 +191,9 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	if end > len(r.b) {
 		return "", fmt.Errorf("invalid length")
 	}
+	b := r.b[int(o)+n : end]
 
-	return yoloString(r.b[int(o)+n : end]), nil
+	return yoloString(b), nil
 }
 
 func (r *indexReader) Stats() (BlockStats, error) {
diff --git a/writer.go b/writer.go
index e3f24f370c..a05d43586a 100644
--- a/writer.go
+++ b/writer.go
@@ -424,7 +424,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
 	var refs []uint32
 
 	for it.Next() {
-		refs = append(refs, w.series[it.Value()].offset)
+		s, ok := w.series[it.Value()]
+		if !ok {
+			return errors.Errorf("series for reference %d not found", it.Value())
+		}
+		refs = append(refs, s.offset)
 	}
 	if err := it.Err(); err != nil {
 		return err
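Note: the section cuts off mid-hunk in writer.go, but the db.go change above
deserves a closing remark: yoloString now reads the slice's Data and Len
through a *reflect.SliceHeader rather than taking &b[0], so the zero-copy
[]byte-to-string conversion no longer indexes into a possibly empty slice,
which would panic. A self-contained sketch of the patched helper as it appears
in the diff:

package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

// yoloString aliases the byte slice's backing array as a string without
// copying. The caller must keep b alive and unmodified for the string's
// lifetime; reading Data/Len via the slice header avoids the &b[0] panic
// for empty input.
func yoloString(b []byte) string {
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))

	h := reflect.StringHeader{
		Data: sh.Data,
		Len:  sh.Len,
	}
	return *((*string)(unsafe.Pointer(&h)))
}

func main() {
	fmt.Printf("%q\n", yoloString([]byte("series")))
	fmt.Printf("%q\n", yoloString(nil)) // "" rather than a panic
}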