diff --git a/chunks/xor.go b/chunks/xor.go
index 6e750c0971..c0b8e83cc2 100644
--- a/chunks/xor.go
+++ b/chunks/xor.go
@@ -16,11 +16,7 @@ type XORChunk struct {
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
 func NewXORChunk() *XORChunk {
 	b := make([]byte, 2, 128)
-
-	return &XORChunk{
-		b:   &bstream{stream: b, count: 0},
-		num: 0,
-	}
+	return &XORChunk{b: &bstream{stream: b, count: 0}}
 }
 
 func (c *XORChunk) Encoding() Encoding {
@@ -29,11 +25,7 @@ func (c *XORChunk) Encoding() Encoding {
 
 // Bytes returns the underlying byte slice of the chunk.
 func (c *XORChunk) Bytes() []byte {
-	b := c.b.bytes()
-	// Lazily populate length bytes – probably not necessary to have the
-	// cache value in struct.
-	binary.BigEndian.PutUint16(b[:2], c.num)
-	return b
+	return c.b.bytes()
 }
 
 // Appender implements the Chunk interface.
@@ -58,7 +50,7 @@ func (c *XORChunk) Appender() (Appender, error) {
 		leading:  it.leading,
 		trailing: it.trailing,
 	}
-	if c.num == 0 {
+	if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
 		a.leading = 0xff
 	}
 	return a, nil
@@ -70,7 +62,7 @@ func (c *XORChunk) iterator() *xorIterator {
 	// Could only copy data if the chunk is not completed yet.
 	return &xorIterator{
 		br:       newBReader(c.b.bytes()[2:]),
-		numTotal: c.num,
+		numTotal: binary.BigEndian.Uint16(c.b.bytes()),
 	}
 }
 
@@ -93,15 +85,16 @@ type xorAppender struct {
 
 func (a *xorAppender) Append(t int64, v float64) {
 	var tDelta uint64
+	num := binary.BigEndian.Uint16(a.b.bytes())
 
-	if a.c.num == 0 {
+	if num == 0 {
 		buf := make([]byte, binary.MaxVarintLen64)
 		for _, b := range buf[:binary.PutVarint(buf, t)] {
 			a.b.writeByte(b)
 		}
 		a.b.writeBits(math.Float64bits(v), 64)
 
-	} else if a.c.num == 1 {
+	} else if num == 1 {
 		tDelta = uint64(t - a.t)
 
 		buf := make([]byte, binary.MaxVarintLen64)
@@ -139,7 +132,7 @@ func (a *xorAppender) Append(t int64, v float64) {
 
 	a.t = t
 	a.v = v
-	a.c.num++
+	binary.BigEndian.PutUint16(a.b.bytes(), num+1)
 
 	a.tDelta = tDelta
 }
diff --git a/compact.go b/compact.go
new file mode 100644
index 0000000000..3ef779361c
--- /dev/null
+++ b/compact.go
@@ -0,0 +1,290 @@
+package tsdb
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/fabxc/tsdb/labels"
+	"github.com/go-kit/kit/log"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+)
+
+type compactor struct {
+	shard  *Shard
+	logger log.Logger
+
+	triggerc chan struct{}
+	donec    chan struct{}
+}
+
+func newCompactor(s *Shard, l log.Logger) (*compactor, error) {
+	c := &compactor{
+		triggerc: make(chan struct{}, 1),
+		donec:    make(chan struct{}),
+		shard:    s,
+		logger:   l,
+	}
+	go c.run()
+
+	return c, nil
+}
+
+func (c *compactor) trigger() {
+	select {
+	case c.triggerc <- struct{}{}:
+	default:
+	}
+}
+
+func (c *compactor) run() {
+	for range c.triggerc {
+		if len(c.shard.persisted) < 2 {
+			continue
+		}
+		dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
+
+		if err := c.compact(dir, c.shard.persisted[0], c.shard.persisted[1]); err != nil {
+			c.logger.Log("msg", "compaction failed", "err", err)
+		}
+	}
+	close(c.donec)
+}
+
+func (c *compactor) close() error {
+	close(c.triggerc)
+	<-c.donec
+	return nil
+}
+
+func (c *compactor) compact(dir string, a, b *persistedBlock) error {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return err
+	}
+
+	cf, err := os.Create(chunksFileName(dir))
+	if err != nil {
+		return err
+	}
+	xf, err := os.Create(indexFileName(dir))
+	if err != nil {
+		return err
+	}
+
+	index := newIndexWriter(xf)
+	series := newSeriesWriter(cf, index)
+
+	defer index.Close()
+	defer series.Close()
+
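+	// An empty label pair selects the special all-series postings list;
+	// this function writes one for the new block further down.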
+	aall, err := a.index.Postings("", "")
+	if err != nil {
+		return err
+	}
+	ball, err := b.index.Postings("", "")
+	if err != nil {
+		return err
+	}
+
+	set, err := newCompactionMerger(
+		newCompactionSeriesSet(a.index, a.chunks, aall),
+		newCompactionSeriesSet(b.index, b.chunks, ball),
+	)
+	if err != nil {
+		return err
+	}
+
+	// We fully rebuild the postings list index from merged series.
+	var (
+		postings = &memPostings{m: make(map[term][]uint32, 512)}
+		values   = map[string]stringset{}
+		i        = uint32(0)
+	)
+	stats := BlockStats{
+		MinTime:     a.stats.MinTime,
+		MaxTime:     b.stats.MaxTime,
+		SampleCount: a.stats.SampleCount + b.stats.SampleCount,
+	}
+
+	for set.Next() {
+		lset, chunks := set.At()
+		if err := series.WriteSeries(i, lset, chunks); err != nil {
+			return err
+		}
+
+		stats.ChunkCount += uint32(len(chunks))
+		stats.SeriesCount++
+
+		for _, l := range lset {
+			valset, ok := values[l.Name]
+			if !ok {
+				valset = stringset{}
+				values[l.Name] = valset
+			}
+			valset.set(l.Value)
+
+			postings.add(i, term{name: l.Name, value: l.Value})
+		}
+		i++
+	}
+	if set.Err() != nil {
+		return set.Err()
+	}
+
+	if err := index.WriteStats(stats); err != nil {
+		return err
+	}
+
+	s := make([]string, 0, 256)
+	for n, v := range values {
+		s = s[:0]
+
+		for x := range v {
+			s = append(s, x)
+		}
+		if err := index.WriteLabelIndex([]string{n}, s); err != nil {
+			return err
+		}
+	}
+
+	for t := range postings.m {
+		if err := index.WritePostings(t.name, t.value, postings.get(t)); err != nil {
+			return err
+		}
+	}
+	// Write a postings list containing all series.
+	all := make([]uint32, i)
+	for i := range all {
+		all[i] = uint32(i)
+	}
+	if err := index.WritePostings("", "", newListPostings(all)); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type compactionSeriesSet struct {
+	p      Postings
+	index  IndexReader
+	series SeriesReader
+
+	l   labels.Labels
+	c   []ChunkMeta
+	err error
+}
+
+func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet {
+	return &compactionSeriesSet{
+		index:  i,
+		series: s,
+		p:      p,
+	}
+}
+
+func (c *compactionSeriesSet) Next() bool {
+	if !c.p.Next() {
+		return false
+	}
+
+	c.l, c.c, c.err = c.index.Series(c.p.Value())
+	if c.err != nil {
+		return false
+	}
+	for i := range c.c {
+		chk := &c.c[i]
+
+		chk.Chunk, c.err = c.series.Chunk(chk.Ref)
+		if c.err != nil {
+			return false
+		}
+	}
+
+	return true
+}
+
+func (c *compactionSeriesSet) Err() error {
+	if c.err != nil {
+		return c.err
+	}
+	return c.p.Err()
+}
+
+func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
+	return c.l, c.c
+}
+
+type compactionMerger struct {
+	a, b *compactionSeriesSet
+
+	adone, bdone bool
+	l            labels.Labels
+	c            []ChunkMeta
+}
+
+type compactionSeries struct {
+	labels labels.Labels
+	chunks []ChunkMeta
+}
+
+func newCompactionMerger(a, b *compactionSeriesSet) (*compactionMerger, error) {
+	c := &compactionMerger{
+		a: a,
+		b: b,
+	}
+	// Initialize first elements of both sets as Next() needs
+	// one element look-ahead.
+	c.adone = !c.a.Next()
+	c.bdone = !c.b.Next()
+
+	return c, c.Err()
+}
+
+func (c *compactionMerger) compare() int {
+	if c.adone {
+		return 1
+	}
+	if c.bdone {
+		return -1
+	}
+	a, _ := c.a.At()
+	b, _ := c.b.At()
+	return labels.Compare(a, b)
+}
+
+func (c *compactionMerger) Next() bool {
+	if c.adone && c.bdone || c.Err() != nil {
+		return false
+	}
+
+	d := c.compare()
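+	// compare() sorts an exhausted set last: d > 0 selects b's current
+	// series, d < 0 selects a's, and d == 0 means both sets are
+	// positioned on the same label set.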
+	if d > 0 {
+		c.l, c.c = c.b.At()
+		c.bdone = !c.b.Next()
+	} else if d < 0 {
+		c.l, c.c = c.a.At()
+		c.adone = !c.a.Next()
+	} else {
+		// Both sets contain the current series. Chain them into a single one.
+		l, ca := c.a.At()
+		_, cb := c.b.At()
+
+		c.l = l
+		c.c = append(ca, cb...)
+
+		c.adone = !c.a.Next()
+		c.bdone = !c.b.Next()
+	}
+	return true
+}
+
+func (c *compactionMerger) Err() error {
+	if c.a.Err() != nil {
+		return c.a.Err()
+	}
+	return c.b.Err()
+}
+
+func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
+	return c.l, c.c
+}
diff --git a/db.go b/db.go
index 1521b97604..07da446155 100644
--- a/db.go
+++ b/db.go
@@ -44,7 +44,7 @@ type DB struct {
 
 // TODO(fabxc): make configurable
 const (
-	shardShift   = 2
+	shardShift   = 0
 	numShards    = 1 << shardShift
 	maxChunkSize = 1024
 )
@@ -83,8 +83,6 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
 		c.shards = append(c.shards, s)
 	}
 
-	// TODO(fabxc): run background compaction + GC.
-
 	return c, nil
 }
@@ -187,6 +185,7 @@ type Shard struct {
 	mtx       sync.RWMutex
 	persisted persistedBlocks
 	head      *HeadBlock
+	compactor *compactor
 }
 
 type shardMetrics struct {
@@ -265,6 +264,11 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
 		head:      head,
 		persisted: pbs,
 	}
+	s.compactor, err = newCompactor(s, logger)
+	if err != nil {
+		return nil, err
+	}
+
 	return s, nil
 }
@@ -275,6 +279,8 @@ func (s *Shard) Close() error {
 
 	var e MultiError
 
+	e.Add(s.compactor.close())
+
 	for _, pb := range s.persisted {
 		e.Add(pb.Close())
 	}
@@ -402,6 +408,8 @@ func (s *Shard) persist() error {
 	s.persisted = append(s.persisted, pb)
 	s.mtx.Unlock()
 
+	s.compactor.trigger()
+
 	return nil
 }
@@ -474,9 +482,11 @@ func (es MultiError) Err() error {
 }
 
 func yoloString(b []byte) string {
+	sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+
 	h := reflect.StringHeader{
-		Data: uintptr(unsafe.Pointer(&b[0])),
-		Len:  len(b),
+		Data: sh.Data,
+		Len:  sh.Len,
 	}
 	return *((*string)(unsafe.Pointer(&h)))
 }
diff --git a/reader.go b/reader.go
index 5a429d37b4..06a0cab77e 100644
--- a/reader.go
+++ b/reader.go
@@ -191,8 +191,9 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	if end > len(r.b) {
 		return "", fmt.Errorf("invalid length")
 	}
+	b := r.b[int(o)+n : end]
 
-	return yoloString(r.b[int(o)+n : end]), nil
+	return yoloString(b), nil
 }
 
 func (r *indexReader) Stats() (BlockStats, error) {
diff --git a/writer.go b/writer.go
index e3f24f370c..a05d43586a 100644
--- a/writer.go
+++ b/writer.go
@@ -424,7 +424,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
 	var refs []uint32
 
 	for it.Next() {
-		refs = append(refs, w.series[it.Value()].offset)
+		s, ok := w.series[it.Value()]
+		if !ok {
+			return errors.Errorf("series for reference %d not found", it.Value())
+		}
+		refs = append(refs, s.offset)
 	}
 	if err := it.Err(); err != nil {
 		return err
 	}
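
Reviewer note, not part of the patch: compactionMerger above is a plain two-way merge over series sets that are each sorted by label set, with one element of look-ahead per side. The sketch below shows the same strategy on a simplified model, assuming string keys in place of labels.Labels and string chunk IDs in place of ChunkMeta; the names series and mergeSorted are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// series is a stand-in for one labeled series: a sorted label-set key
// plus the chunk metadata that belongs to it.
type series struct {
	labels string
	chunks []string
}

// mergeSorted combines two series lists that are each sorted by label
// set. When the same series appears in both inputs, the chunk lists
// are chained, older block first, mirroring compactionMerger.Next.
func mergeSorted(a, b []series) []series {
	out := make([]series, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch d := strings.Compare(a[i].labels, b[j].labels); {
		case d < 0:
			out = append(out, a[i])
			i++
		case d > 0:
			out = append(out, b[j])
			j++
		default:
			// Same series in both blocks: emit one entry with the
			// chunks of a followed by the chunks of b.
			merged := series{labels: a[i].labels}
			merged.chunks = append(append(merged.chunks, a[i].chunks...), b[j].chunks...)
			out = append(out, merged)
			i++
			j++
		}
	}
	// At most one of the two tails is non-empty.
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

func main() {
	a := []series{{`{job="app"}`, []string{"a1"}}, {`{job="db"}`, []string{"a2"}}}
	b := []series{{`{job="db"}`, []string{"b1"}}, {`{job="web"}`, []string{"b2"}}}
	for _, s := range mergeSorted(a, b) {
		fmt.Println(s.labels, s.chunks)
	}
}

For the overlapping series {job="db"} this prints chunks [a2 b1], the chained list from both inputs with a's chunks first, exactly what the else branch of compactionMerger.Next produces.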