diff --git a/block.go b/block.go
index bbb2facee..48855308f 100644
--- a/block.go
+++ b/block.go
@@ -8,6 +8,7 @@ import (
 	"sort"
 	"strconv"
 
+	"github.com/bradfitz/slice"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/pkg/errors"
 )
@@ -15,11 +16,11 @@ import (
 // Block handles reads against a block of time series data.
 type block interface {
 	dir() string
-	// stats() BlockStats
+	stats() BlockStats
 	interval() (int64, int64)
 	index() IndexReader
 	series() SeriesReader
-	// persisted() bool
+	persisted() bool
 }
 
 type BlockStats struct {
@@ -36,8 +37,8 @@ const (
 )
 
 type persistedBlock struct {
-	d     string
-	stats BlockStats
+	d      string
+	bstats BlockStats
 
 	chunksf, indexf *mmapFile
 
@@ -51,25 +52,25 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
 	// mmap files belonging to the block.
 	chunksf, err := openMmapFile(chunksFileName(p))
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "open chunk file")
 	}
 	indexf, err := openMmapFile(indexFileName(p))
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "open index file")
 	}
 
 	sr, err := newSeriesReader(chunksf.b)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "create series reader")
 	}
 	ir, err := newIndexReader(sr, indexf.b)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "create index reader")
 	}
 
 	stats, err := ir.Stats()
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "read stats")
 	}
 
 	pb := &persistedBlock{
@@ -78,7 +79,7 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
 		indexf: indexf,
 		chunkr: sr,
 		indexr: ir,
-		stats:  stats,
+		bstats: stats,
 	}
 	return pb, nil
 }
@@ -94,44 +95,40 @@ func (pb *persistedBlock) Close() error {
 }
 
 func (pb *persistedBlock) dir() string          { return pb.d }
+func (pb *persistedBlock) persisted() bool      { return true }
 func (pb *persistedBlock) index() IndexReader   { return pb.indexr }
 func (pb *persistedBlock) series() SeriesReader { return pb.chunkr }
+func (pb *persistedBlock) stats() BlockStats    { return pb.bstats }
 
 func (pb *persistedBlock) interval() (int64, int64) {
-	return pb.stats.MinTime, pb.stats.MaxTime
+	return pb.bstats.MinTime, pb.bstats.MaxTime
 }
 
-type persistedBlocks []*persistedBlock
-
-func (p persistedBlocks) Len() int           { return len(p) }
-func (p persistedBlocks) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
-func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime }
-
 // findBlocks finds time-ordered persisted blocks within a directory.
-func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) {
-	var pbs persistedBlocks
+func findBlocks(path string) ([]*persistedBlock, []*HeadBlock, error) {
+	var (
+		pbs   []*persistedBlock
+		heads []*HeadBlock
+	)
 
 	files, err := ioutil.ReadDir(path)
 	if err != nil {
 		return nil, nil, err
 	}
-	var head *HeadBlock
 
 	for _, fi := range files {
 		p := filepath.Join(path, fi.Name())
 
 		if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) {
-			if head != nil {
-				return nil, nil, errors.Errorf("found two head blocks")
-			}
 			ts, err := strconv.Atoi(filepath.Base(p))
 			if err != nil {
 				return nil, nil, errors.Errorf("invalid directory name")
 			}
-			head, err = OpenHeadBlock(p, int64(ts))
+			head, err := OpenHeadBlock(p, int64(ts))
 			if err != nil {
 				return nil, nil, err
 			}
+			heads = append(heads, head)
 			continue
 		}
 
@@ -144,9 +141,9 @@ func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) {
 
 	// Order blocks by their base time so they represent a continuous
 	// range of time.
-	sort.Sort(pbs)
+	slice.Sort(pbs, func(i, j int) bool { return pbs[i].bstats.MinTime < pbs[j].bstats.MinTime })
 
-	return pbs, head, nil
+	return pbs, heads, nil
 }
 
 func chunksFileName(path string) string {
diff --git a/compact.go b/compact.go
index 819778ea1..5b207fc34 100644
--- a/compact.go
+++ b/compact.go
@@ -1,34 +1,80 @@
 package tsdb
 
 import (
+	"fmt"
+	"math"
 	"os"
 	"path/filepath"
+	"sync"
+	"time"
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 type compactor struct {
-	shard  *Shard
-	blocks compactableBlocks
-	logger log.Logger
+	metrics *compactorMetrics
+	blocks  compactableBlocks
+	logger  log.Logger
 
 	triggerc chan struct{}
 	donec    chan struct{}
 }
 
-type compactableBlocks interface {
-	compactable() []block
-	set([]block)
+type compactorMetrics struct {
+	triggered prometheus.Counter
+	ran       prometheus.Counter
+	failed    prometheus.Counter
+	duration  prometheus.Histogram
 }
 
-func newCompactor(s *Shard, l log.Logger) (*compactor, error) {
+func newCompactorMetrics(i int) *compactorMetrics {
+	shardLabel := prometheus.Labels{
+		"shard": fmt.Sprintf("%d", i),
+	}
+
+	m := &compactorMetrics{}
+
+	m.triggered = prometheus.NewCounter(prometheus.CounterOpts{
+		Name:        "tsdb_shard_compactions_triggered_total",
+		Help:        "Total number of triggered compactions for the shard.",
+		ConstLabels: shardLabel,
+	})
+	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
+		Name:        "tsdb_shard_compactions_total",
+		Help:        "Total number of compactions that were executed for the shard.",
+		ConstLabels: shardLabel,
+	})
+	m.failed = prometheus.NewCounter(prometheus.CounterOpts{
+		Name:        "tsdb_shard_compactions_failed_total",
+		Help:        "Total number of compactions that failed for the shard.",
+		ConstLabels: shardLabel,
+	})
+	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:        "tsdb_shard_compaction_duration",
+		Help:        "Duration of compaction runs.",
+		ConstLabels: shardLabel,
+	})
+
+	return m
+}
+
+type compactableBlocks interface {
+	lock() sync.Locker
+	compactable() []block
+	reinit(dir string) error
+}
+
+func newCompactor(i int, blocks compactableBlocks, l log.Logger) (*compactor, error) {
 	c := &compactor{
 		triggerc: make(chan struct{}, 1),
 		donec:    make(chan struct{}),
-		shard:    s,
 		logger:   l,
+		blocks:   blocks,
+		metrics:  newCompactorMetrics(i),
 	}
 	go c.run()
 
@@ -44,35 +90,59 @@ func (c *compactor) trigger() {
 
 func (c *compactor) run() {
 	for range c.triggerc {
-		// continue
-		// bs := c.blocks.get()
+		c.metrics.triggered.Inc()
 
-		// if len(bs) < 2 {
-		// 	continue
-		// }
+		bs := c.pick()
+		if len(bs) == 0 {
+			continue
+		}
 
-		// var (
-		// 	dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
-		// 	a   = bs[0]
-		// 	b   = bs[1]
-		// )
+		start := time.Now()
+		err := c.compact(bs...)
 
-		// c.blocks.Lock()
+		c.metrics.ran.Inc()
+		c.metrics.duration.Observe(time.Since(start).Seconds())
 
-		// if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error {
-		// 	return c.compact(indexw, chunkw, a, b)
-		// }); err != nil {
-		// 	c.logger.Log("msg", "compaction failed", "err", err)
-		// 	continue
-		// }
+		if err != nil {
+			c.logger.Log("msg", "compaction failed", "err", err)
+			c.metrics.failed.Inc()
+			continue
+		}
 
-		// c.blocks.Unlock()
+		// Drain channel of signals triggered during compaction.
+		select {
+		case <-c.triggerc:
+		default:
+		}
 	}
 	close(c.donec)
 }
 
 func (c *compactor) pick() []block {
-	return nil
+	bs := c.blocks.compactable()
+	if len(bs) == 0 {
+		return nil
+	}
+
+	if !bs[len(bs)-1].persisted() {
+		// TODO(fabxc): double check scoring function here or only do it here
+		// and trigger every X scrapes?
+		return bs[len(bs)-1:]
+	}
+
+	candidate := []block{}
+	trange := int64(math.MaxInt64)
+
+	for i, b := range bs[:len(bs)-1] {
+		r := bs[i+1].stats().MaxTime - b.stats().MinTime
+
+		if r < trange {
+			trange = r
+			candidate = bs[i : i+2]
+		}
+	}
+
+	return candidate
 }
 
 func (c *compactor) Close() error {
@@ -81,31 +151,96 @@ func (c *compactor) Close() error {
 	return nil
 }
 
-func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block) error {
-	aall, err := a.index().Postings("", "")
-	if err != nil {
-		return err
+func mergeStats(blocks ...block) (res BlockStats) {
+	res.MinTime = blocks[0].stats().MinTime
+	res.MaxTime = blocks[len(blocks)-1].stats().MaxTime
+
+	for _, b := range blocks {
+		res.SampleCount += b.stats().SampleCount
 	}
-	ball, err := b.index().Postings("", "")
-	if err != nil {
+	return res
+}
+
+func (c *compactor) compact(blocks ...block) error {
+	tmpdir := blocks[0].dir() + ".tmp"
+
+	// Write to temporary directory to make persistence appear atomic.
+	if fileutil.Exist(tmpdir) {
+		if err := os.RemoveAll(tmpdir); err != nil {
+			return err
+		}
+	}
+	if err := fileutil.CreateDirAll(tmpdir); err != nil {
 		return err
 	}
 
-	set, err := newCompactionMerger(
-		newCompactionSeriesSet(a.index(), a.series(), aall),
-		newCompactionSeriesSet(b.index(), b.series(), ball),
-	)
+	chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "create chunk file")
+	}
+	indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return errors.Wrap(err, "create index file")
 	}
 
-	astats, err := a.index().Stats()
-	if err != nil {
-		return err
+	indexw := newIndexWriter(indexf)
+	chunkw := newSeriesWriter(chunkf, indexw)
+
+	if err := c.write(blocks, indexw, chunkw); err != nil {
+		return errors.Wrap(err, "write compaction")
 	}
-	bstats, err := a.index().Stats()
-	if err != nil {
-		return err
+
+	if err := chunkw.Close(); err != nil {
+		return errors.Wrap(err, "close chunk writer")
+	}
+	if err := indexw.Close(); err != nil {
+		return errors.Wrap(err, "close index writer")
+	}
+	if err := fileutil.Fsync(chunkf.File); err != nil {
+		return errors.Wrap(err, "fsync chunk file")
+	}
+	if err := fileutil.Fsync(indexf.File); err != nil {
+		return errors.Wrap(err, "fsync index file")
+	}
+	if err := chunkf.Close(); err != nil {
+		return errors.Wrap(err, "close chunk file")
+	}
+	if err := indexf.Close(); err != nil {
+		return errors.Wrap(err, "close index file")
+	}
+
+	c.blocks.lock().Lock()
+	defer c.blocks.lock().Unlock()
+
+	if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
+		return errors.Wrap(err, "rename dir")
+	}
+
+	var merr MultiError
+
+	for _, b := range blocks {
+		merr.Add(errors.Wrapf(c.blocks.reinit(b.dir()), "reinit block at %q", b.dir()))
+	}
+	return merr.Err()
+}
+
+func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error {
+	var set compactionSet
+
+	for i, b := range blocks {
+		all, err := b.index().Postings("", "")
+		if err != nil {
+			return err
+		}
+		s := newCompactionSeriesSet(b.index(), b.series(), all)
+
+		if i == 0 {
+			set = s
+			continue
+		}
+		set, err = newCompactionMerger(set, s)
+		if err != nil {
+			return err
+		}
 	}
 
 	// We fully rebuild the postings list index from merged series.
@@ -113,12 +248,8 @@ func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block)
 		postings = &memPostings{m: make(map[term][]uint32, 512)}
 		values   = map[string]stringset{}
 		i        = uint32(0)
+		stats    = mergeStats(blocks...)
 	)
-	stats := BlockStats{
-		MinTime:     astats.MinTime,
-		MaxTime:     bstats.MaxTime,
-		SampleCount: astats.SampleCount + bstats.SampleCount,
-	}
 
 	for set.Next() {
 		lset, chunks := set.At()
@@ -174,10 +305,15 @@ func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block)
 	if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
 		return err
 	}
-
 	return nil
 }
 
+type compactionSet interface {
+	Next() bool
+	At() (labels.Labels, []ChunkMeta)
+	Err() error
+}
+
 type compactionSeriesSet struct {
 	p     Postings
 	index IndexReader
@@ -229,7 +365,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
 }
 
 type compactionMerger struct {
-	a, b *compactionSeriesSet
+	a, b compactionSet
 
 	adone, bdone bool
 	l            labels.Labels
@@ -241,7 +377,7 @@ type compactionSeries struct {
 	chunks []ChunkMeta
 }
 
-func newCompactionMerger(a, b *compactionSeriesSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,
diff --git a/db.go b/db.go
index 70830ebb3..2491ce145 100644
--- a/db.go
+++ b/db.go
@@ -14,9 +14,11 @@ import (
 
 	"golang.org/x/sync/errgroup"
 
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -180,12 +182,12 @@ type Shard struct {
 	metrics *shardMetrics
 
 	mtx       sync.RWMutex
-	persisted persistedBlocks
-	head      *HeadBlock
+	persisted []*persistedBlock
+	heads     []*HeadBlock
 	compactor *compactor
 
-	donec    chan struct{}
-	persistc chan struct{}
+	donec chan struct{}
+	cutc  chan struct{}
 }
 
 type shardMetrics struct {
@@ -199,24 +201,24 @@ func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics {
 		"shard": fmt.Sprintf("%d", i),
 	}
 
-	m := &shardMetrics{
-		persistences: prometheus.NewCounter(prometheus.CounterOpts{
-			Name:        "tsdb_shard_persistences_total",
-			Help:        "Total number of head persistances that ran so far.",
-			ConstLabels: shardLabel,
-		}),
-		persistenceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
-			Name:        "tsdb_shard_persistence_duration_seconds",
-			Help:        "Duration of persistences in seconds.",
-			ConstLabels: shardLabel,
-			Buckets:     prometheus.ExponentialBuckets(0.25, 2, 5),
-		}),
-		samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
-			Name:        "tsdb_shard_samples_appended_total",
-			Help:        "Total number of appended samples for the shard.",
-			ConstLabels: shardLabel,
-		}),
-	}
+	m := &shardMetrics{}
+
+	m.persistences = prometheus.NewCounter(prometheus.CounterOpts{
+		Name:        "tsdb_shard_persistences_total",
+		Help:        "Total number of head persistences that ran so far.",
+		ConstLabels: shardLabel,
+	})
+	m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:        "tsdb_shard_persistence_duration_seconds",
+		Help:        "Duration of persistences in seconds.",
+		ConstLabels: shardLabel,
+		Buckets:     prometheus.ExponentialBuckets(0.25, 2, 5),
+	})
+	m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
+		Name:        "tsdb_shard_samples_appended_total",
+		Help:        "Total number of appended samples for the shard.",
+		ConstLabels: shardLabel,
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -238,33 +240,34 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
 	}
 
 	// Initialize previously persisted blocks.
-	pbs, head, err := findBlocks(path)
+	persisted, heads, err := findBlocks(path)
 	if err != nil {
 		return nil, err
 	}
 
 	// TODO(fabxc): get time from client-defined `now` function.
 	baset := time.Unix(0, 0).UnixNano() / int64(time.Millisecond)
-	if len(pbs) > 0 {
-		baset = pbs[len(pbs)-1].stats.MaxTime
+	if len(persisted) > 0 {
+		baset = persisted[len(persisted)-1].bstats.MaxTime
 	}
-	if head == nil {
-		head, err = OpenHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
+	if len(heads) == 0 {
+		head, err := OpenHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
 		if err != nil {
 			return nil, err
 		}
+		heads = []*HeadBlock{head}
 	}
 
 	s := &Shard{
 		path:      path,
 		logger:    logger,
 		metrics:   newShardMetrics(prometheus.DefaultRegisterer, i),
-		head:      head,
-		persisted: pbs,
-		persistc:  make(chan struct{}, 1),
+		heads:     heads,
+		persisted: persisted,
+		cutc:      make(chan struct{}, 1),
 		donec:     make(chan struct{}),
 	}
-	if s.compactor, err = newCompactor(s, logger); err != nil {
+	if s.compactor, err = newCompactor(i, s, logger); err != nil {
 		return nil, err
 	}
 	go s.run()
@@ -273,22 +276,29 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
 }
 
 func (s *Shard) run() {
-	for range s.persistc {
-		start := time.Now()
+	for range s.cutc {
+		// if err := s.cut(); err != nil {
+		// 	s.logger.Log("msg", "cut error", "err", err)
+		// }
+		// select {
+		// case <-s.cutc:
+		// default:
+		// }
+		// start := time.Now()
 
-		if err := s.persist(); err != nil {
-			s.logger.Log("msg", "persistence error", "err", err)
-		}
+		// if err := s.persist(); err != nil {
+		// 	s.logger.Log("msg", "persistence error", "err", err)
+		// }
 
-		s.metrics.persistenceDuration.Observe(time.Since(start).Seconds())
-		s.metrics.persistences.Inc()
+		// s.metrics.persistenceDuration.Observe(time.Since(start).Seconds())
+		// s.metrics.persistences.Inc()
 	}
 	close(s.donec)
}
 
 // Close the shard.
 func (s *Shard) Close() error {
-	close(s.persistc)
+	close(s.cutc)
 	<-s.donec
 
 	var merr MultiError
@@ -300,7 +310,9 @@ func (s *Shard) Close() error {
 	for _, pb := range s.persisted {
 		merr.Add(pb.Close())
 	}
-	merr.Add(s.head.Close())
+	for _, hb := range s.heads {
+		merr.Add(hb.Close())
+	}
 
 	return merr.Err()
 }
@@ -312,25 +324,118 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
+	head := s.heads[len(s.heads)-1]
+
 	// TODO(fabxc): distinguish samples between concurrent heads for
 	// different time blocks. Those may occur during transition to still
 	// allow late samples to arrive for a previous block.
-	err := s.head.appendBatch(samples)
+	err := head.appendBatch(samples)
 
 	if err == nil {
 		s.metrics.samplesAppended.Add(float64(len(samples)))
 	}
 
 	// TODO(fabxc): randomize over time and use better scoring function.
-	if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 {
-		select {
-		case s.persistc <- struct{}{}:
-		default:
+	if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 400 {
+		if err := s.cut(); err != nil {
+			s.logger.Log("msg", "cut failed", "err", err)
 		}
 	}
 
 	return err
 }
 
+func (s *Shard) lock() sync.Locker {
+	return &s.mtx
+}
+
+func (s *Shard) headForDir(dir string) (int, bool) {
+	for i, b := range s.heads {
+		if b.dir() == dir {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+func (s *Shard) persistedForDir(dir string) (int, bool) {
+	for i, b := range s.persisted {
+		if b.dir() == dir {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+func (s *Shard) reinit(dir string) error {
+	if !fileutil.Exist(dir) {
+		if i, ok := s.headForDir(dir); ok {
+			if err := s.heads[i].Close(); err != nil {
+				return err
+			}
+			s.heads = append(s.heads[:i], s.heads[i+1:]...)
+		}
+		if i, ok := s.persistedForDir(dir); ok {
+			if err := s.persisted[i].Close(); err != nil {
+				return err
+			}
+			s.persisted = append(s.persisted[:i], s.persisted[i+1:]...)
+		}
+		return nil
+	}
+
+	// If a block dir has to be reinitialized and it wasn't a deletion,
+	// it has to be a newly persisted or compacted one.
+	if !fileutil.Exist(chunksFileName(dir)) {
+		return errors.New("no chunk file for new block dir")
+	}
+
+	// Remove a previous head block.
+	if i, ok := s.headForDir(dir); ok {
+		if err := s.heads[i].Close(); err != nil {
+			return err
+		}
+		s.heads = append(s.heads[:i], s.heads[i+1:]...)
+	}
+	// Close an old persisted block.
+	i, ok := s.persistedForDir(dir)
+	if ok {
+		if err := s.persisted[i].Close(); err != nil {
+			return err
+		}
+	}
+	pb, err := newPersistedBlock(dir)
+	if err != nil {
+		return errors.Wrap(err, "open persisted block")
+	}
+	if i >= 0 {
+		s.persisted[i] = pb
+	} else {
+		s.persisted = append(s.persisted, pb)
+	}
+
+	return nil
+}
+
+func (s *Shard) compactable() []block {
+	var blocks []block
+	for _, pb := range s.persisted {
+		blocks = append(blocks, pb)
+	}
+
+	// threshold := s.heads[len(s.heads)-1].bstats.MaxTime - headGracePeriod
+
+	// for _, hb := range s.heads {
+	// 	if hb.bstats.MaxTime < threshold {
+	// 		blocks = append(blocks, hb)
+	// 	}
+	// }
+	for _, hb := range s.heads[:len(s.heads)-1] {
+		blocks = append(blocks, hb)
+	}
+
+	return blocks
+}
+
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	if bmin >= amin && bmin <= amax {
 		return true
@@ -357,57 +462,75 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block {
 			bs = append(bs, b)
 		}
 	}
+	for _, b := range s.heads {
+		bmin, bmax := b.interval()
 
-	hmin, hmax := s.head.interval()
-
-	if intervalOverlap(mint, maxt, hmin, hmax) {
-		bs = append(bs, s.head)
+		if intervalOverlap(mint, maxt, bmin, bmax) {
+			bs = append(bs, b)
+		}
 	}
 
 	return bs
 }
 
 // TODO(fabxc): make configurable.
-const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
+const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
 
-func (s *Shard) persist() error {
-	s.mtx.Lock()
+// cut starts a new head block to append to. The completed head block
+// will still be appendable for the configured grace period.
+func (s *Shard) cut() error {
 	// Set new head block.
-	head := s.head
-	newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MaxTime)), head.stats.MaxTime)
-	if err != nil {
-		s.mtx.Unlock()
-		return err
-	}
-	s.head = newHead
+	head := s.heads[len(s.heads)-1]
 
-	s.mtx.Unlock()
-
-	// TODO(fabxc): add grace period where we can still append to old head shard
-	// before actually persisting it.
-	dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
-
-	if err := persist(dir, head.persist); err != nil {
-		return err
-	}
-	s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
-
-	// Reopen block as persisted block for querying.
-	pb, err := newPersistedBlock(dir)
+	newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime)
 	if err != nil {
 		return err
 	}
-
-	s.mtx.Lock()
-	s.persisted = append(s.persisted, pb)
-	s.mtx.Unlock()
+	s.heads = append(s.heads, newHead)
 
 	s.compactor.trigger()
 
 	return nil
 }
 
+// func (s *Shard) persist() error {
+// 	s.mtx.Lock()
+
+// 	// Set new head block.
+// 	head := s.head
+// 	newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime)
+// 	if err != nil {
+// 		s.mtx.Unlock()
+// 		return err
+// 	}
+// 	s.head = newHead
+
+// 	s.mtx.Unlock()
+
+// 	// TODO(fabxc): add grace period where we can still append to old head shard
+// 	// before actually persisting it.
+// 	dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
+
+// 	if err := persist(dir, head.persist); err != nil {
+// 		return err
+// 	}
+// 	s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
+
+// 	// Reopen block as persisted block for querying.
+// 	pb, err := newPersistedBlock(dir)
+// 	if err != nil {
+// 		return err
+// 	}
+
+// 	s.mtx.Lock()
+// 	s.persisted = append(s.persisted, pb)
+// 	s.mtx.Unlock()
+
+// 	s.compactor.trigger()
+
+// 	return nil
+// }
+
 // chunkDesc wraps a plain data chunk and provides cached meta data about it.
 type chunkDesc struct {
 	ref uint32
diff --git a/head.go b/head.go
index 0b3ab0880..aee02f69f 100644
--- a/head.go
+++ b/head.go
@@ -26,7 +26,7 @@ type HeadBlock struct {
 
 	wal *WAL
 
-	stats BlockStats
+	bstats BlockStats
 }
 
 // OpenHeadBlock creates a new empty head block.
@@ -44,7 +44,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
 		postings: &memPostings{m: make(map[term][]uint32)},
 		wal:      wal,
 	}
-	b.stats.MinTime = baseTime
+	b.bstats.MinTime = baseTime
 
 	err = wal.ReadAll(&walHandler{
 		series: func(lset labels.Labels) {
@@ -52,7 +52,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
 		},
 		sample: func(s hashedSample) {
 			b.descs[s.ref].append(s.t, s.v)
-			b.stats.SampleCount++
+			b.bstats.SampleCount++
 		},
 	})
 	if err != nil {
@@ -68,8 +68,10 @@ func (h *HeadBlock) Close() error {
 }
 
 func (h *HeadBlock) dir() string          { return h.d }
+func (h *HeadBlock) persisted() bool      { return false }
 func (h *HeadBlock) index() IndexReader   { return h }
 func (h *HeadBlock) series() SeriesReader { return h }
+func (h *HeadBlock) stats() BlockStats    { return h.bstats }
 
 // Chunk returns the chunk for the reference number.
 func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
@@ -80,12 +82,12 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
 }
 
 func (h *HeadBlock) interval() (int64, int64) {
-	return h.stats.MinTime, h.stats.MaxTime
+	return h.bstats.MinTime, h.bstats.MaxTime
 }
 
 // Stats returns statistics about the indexed data.
 func (h *HeadBlock) Stats() (BlockStats, error) {
-	return h.stats, nil
+	return h.bstats, nil
 }
 
 // LabelValues returns the possible label values
@@ -119,7 +121,12 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
 	}
 	cd := h.descs[ref]
 
-	return cd.lset, []ChunkMeta{{MinTime: h.stats.MinTime, Ref: ref}}, nil
+	meta := ChunkMeta{
+		MinTime: cd.firsTimestamp,
+		MaxTime: cd.lastTimestamp,
+		Ref:     ref,
+	}
+	return cd.lset, []ChunkMeta{meta}, nil
 }
 
 func (h *HeadBlock) LabelIndices() ([][]string, error) {
@@ -162,24 +169,22 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
 	h.descs = append(h.descs, cd)
 	h.hashes[hash] = append(h.hashes[hash], cd)
 
-	// Add each label pair as a term to the inverted index.
-	terms := make([]term, 0, len(lset))
-
 	for _, l := range lset {
-		terms = append(terms, term{name: l.Name, value: l.Value})
-
 		valset, ok := h.values[l.Name]
 		if !ok {
 			valset = stringset{}
 			h.values[l.Name] = valset
 		}
 		valset.set(l.Value)
+
+		h.postings.add(cd.ref, term{name: l.Name, value: l.Value})
 	}
-	h.postings.add(cd.ref, terms...)
+
+	h.postings.add(cd.ref, term{})
 
 	// For the head block there's exactly one chunk per series.
-	h.stats.ChunkCount++
-	h.stats.SeriesCount++
+	h.bstats.ChunkCount++
+	h.bstats.SeriesCount++
 
 	return cd
 }
@@ -254,10 +259,10 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
 		}
 
 		cd.append(s.t, s.v)
-		if s.t > h.stats.MaxTime {
-			h.stats.MaxTime = s.t
+		if s.t > h.bstats.MaxTime {
+			h.bstats.MaxTime = s.t
 		}
-		h.stats.SampleCount++
+		h.bstats.SampleCount++
 	}
 
 	return nil
@@ -280,7 +285,7 @@ func (h *HeadBlock) persist(indexw IndexWriter, chunkw SeriesWriter) error {
 		}
 	}
 
-	if err := indexw.WriteStats(h.stats); err != nil {
+	if err := indexw.WriteStats(h.bstats); err != nil {
 		return err
 	}
 	for n, v := range h.values {
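
A minimal, self-contained sketch of the scoring rule pick() applies once every compactable block is persisted: walk the time-ordered blocks and select the adjacent pair whose merged result would span the smallest time range. The blockStats and pickPair names below are illustrative stand-ins for this note, not the package's API.

```go
package main

import (
	"fmt"
	"math"
)

// blockStats mirrors just the MinTime/MaxTime fields of BlockStats.
type blockStats struct {
	MinTime, MaxTime int64
}

// pickPair chooses the adjacent pair of time-ordered blocks whose
// merged block would cover the smallest time range.
func pickPair(bs []blockStats) []blockStats {
	if len(bs) < 2 {
		return nil
	}
	var (
		candidate []blockStats
		trange    = int64(math.MaxInt64)
	)
	for i := 0; i < len(bs)-1; i++ {
		// Time range the merge of blocks i and i+1 would cover.
		r := bs[i+1].MaxTime - bs[i].MinTime
		if r < trange {
			trange = r
			candidate = bs[i : i+2]
		}
	}
	return candidate
}

func main() {
	blocks := []blockStats{
		{MinTime: 0, MaxTime: 1000},
		{MinTime: 1000, MaxTime: 1200}, // small block...
		{MinTime: 1200, MaxTime: 1350}, // ...next to another small block
		{MinTime: 1350, MaxTime: 5000},
	}
	fmt.Println(pickPair(blocks)) // [{1000 1200} {1200 1350}]
}
```

By this metric the two small mid-range blocks are the cheapest merge; wide neighbors are left for later runs.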
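compact() also adopts the usual crash-safe persistence sequence: build the block in a ".tmp" sibling directory, fsync the files, then rename over the destination, so readers never observe a half-written block. Below is a stripped-down sketch of that sequence using plain os calls instead of the fileutil helpers; writeBlockAtomic and its arguments are hypothetical.

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

// writeBlockAtomic writes chunk and index data into dir atomically:
// everything goes into a ".tmp" sibling first, is fsynced, and the
// temporary directory is then renamed over the destination.
func writeBlockAtomic(dir string, chunks, index []byte) error {
	tmpdir := dir + ".tmp"

	// Start from a clean temporary directory.
	if err := os.RemoveAll(tmpdir); err != nil {
		return err
	}
	if err := os.MkdirAll(tmpdir, 0777); err != nil {
		return err
	}
	for name, b := range map[string][]byte{"chunks": chunks, "index": index} {
		f, err := os.Create(filepath.Join(tmpdir, name))
		if err != nil {
			return err
		}
		if _, err := f.Write(b); err != nil {
			f.Close()
			return err
		}
		// Flush contents to stable storage before the rename.
		if err := f.Sync(); err != nil {
			f.Close()
			return err
		}
		if err := f.Close(); err != nil {
			return err
		}
	}
	// Replace the destination in one step.
	if err := os.RemoveAll(dir); err != nil {
		return err
	}
	return os.Rename(tmpdir, dir)
}

func main() {
	if err := writeBlockAtomic("000000", []byte("chunk data"), []byte("index data")); err != nil {
		log.Fatal(err)
	}
}
```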
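Finally, note the head.go change in create(): every new series is now registered under the zero-value term{} in addition to its label pairs. That is what makes Postings("", "") an "all series" listing, which the compactor's write() reads from each block and WritePostings("", "", ...) rebuilds for the output block. A tiny sketch, with pared-down stand-ins for the package's term and memPostings types:

```go
package main

import "fmt"

// term and memPostings are simplified reimplementations of the index
// types touched by this diff; the real ones live in the tsdb package.
type term struct{ name, value string }

type memPostings struct{ m map[term][]uint32 }

func (p *memPostings) add(ref uint32, terms ...term) {
	for _, t := range terms {
		p.m[t] = append(p.m[t], ref)
	}
}

func main() {
	p := &memPostings{m: map[term][]uint32{}}

	// Each series is indexed under its label pairs plus the empty term{},
	// as HeadBlock.create now does.
	p.add(1, term{"job", "api"}, term{})
	p.add(2, term{"job", "db"}, term{})

	// A lookup with empty name and value therefore yields every series.
	fmt.Println(p.m[term{}]) // [1 2]
}
```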