Consolidate persistence and compaction

This commit is contained in:
Fabian Reinartz 2017-01-03 15:43:26 +01:00
parent e7f04d14d5
commit ac49f8c15e
4 changed files with 434 additions and 173 deletions

View file

@ -8,6 +8,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"github.com/bradfitz/slice"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -15,11 +16,11 @@ import (
// Block handles reads against a block of time series data. // Block handles reads against a block of time series data.
type block interface { type block interface {
dir() string dir() string
// stats() BlockStats stats() BlockStats
interval() (int64, int64) interval() (int64, int64)
index() IndexReader index() IndexReader
series() SeriesReader series() SeriesReader
// persisted() bool persisted() bool
} }
type BlockStats struct { type BlockStats struct {
@ -37,7 +38,7 @@ const (
type persistedBlock struct { type persistedBlock struct {
d string d string
stats BlockStats bstats BlockStats
chunksf, indexf *mmapFile chunksf, indexf *mmapFile
@ -51,25 +52,25 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
// mmap files belonging to the block. // mmap files belonging to the block.
chunksf, err := openMmapFile(chunksFileName(p)) chunksf, err := openMmapFile(chunksFileName(p))
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "open chunk file")
} }
indexf, err := openMmapFile(indexFileName(p)) indexf, err := openMmapFile(indexFileName(p))
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "open index file")
} }
sr, err := newSeriesReader(chunksf.b) sr, err := newSeriesReader(chunksf.b)
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "create series reader")
} }
ir, err := newIndexReader(sr, indexf.b) ir, err := newIndexReader(sr, indexf.b)
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "create index reader")
} }
stats, err := ir.Stats() stats, err := ir.Stats()
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "read stats")
} }
pb := &persistedBlock{ pb := &persistedBlock{
@ -78,7 +79,7 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
indexf: indexf, indexf: indexf,
chunkr: sr, chunkr: sr,
indexr: ir, indexr: ir,
stats: stats, bstats: stats,
} }
return pb, nil return pb, nil
} }
@ -94,44 +95,40 @@ func (pb *persistedBlock) Close() error {
} }
func (pb *persistedBlock) dir() string { return pb.d } func (pb *persistedBlock) dir() string { return pb.d }
func (pb *persistedBlock) persisted() bool { return true }
func (pb *persistedBlock) index() IndexReader { return pb.indexr } func (pb *persistedBlock) index() IndexReader { return pb.indexr }
func (pb *persistedBlock) series() SeriesReader { return pb.chunkr } func (pb *persistedBlock) series() SeriesReader { return pb.chunkr }
func (pb *persistedBlock) stats() BlockStats { return pb.bstats }
func (pb *persistedBlock) interval() (int64, int64) { func (pb *persistedBlock) interval() (int64, int64) {
return pb.stats.MinTime, pb.stats.MaxTime return pb.bstats.MinTime, pb.bstats.MaxTime
} }
type persistedBlocks []*persistedBlock
func (p persistedBlocks) Len() int { return len(p) }
func (p persistedBlocks) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime }
// findBlocks finds time-ordered persisted blocks within a directory. // findBlocks finds time-ordered persisted blocks within a directory.
func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) { func findBlocks(path string) ([]*persistedBlock, []*HeadBlock, error) {
var pbs persistedBlocks var (
pbs []*persistedBlock
heads []*HeadBlock
)
files, err := ioutil.ReadDir(path) files, err := ioutil.ReadDir(path)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
var head *HeadBlock
for _, fi := range files { for _, fi := range files {
p := filepath.Join(path, fi.Name()) p := filepath.Join(path, fi.Name())
if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) { if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) {
if head != nil {
return nil, nil, errors.Errorf("found two head blocks")
}
ts, err := strconv.Atoi(filepath.Base(p)) ts, err := strconv.Atoi(filepath.Base(p))
if err != nil { if err != nil {
return nil, nil, errors.Errorf("invalid directory name") return nil, nil, errors.Errorf("invalid directory name")
} }
head, err = OpenHeadBlock(p, int64(ts)) head, err := OpenHeadBlock(p, int64(ts))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
heads = append(heads, head)
continue continue
} }
@ -144,9 +141,9 @@ func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) {
// Order blocks by their base time so they represent a continous // Order blocks by their base time so they represent a continous
// range of time. // range of time.
sort.Sort(pbs) slice.Sort(pbs, func(i, j int) bool { return pbs[i].bstats.MinTime < pbs[j].bstats.MinTime })
return pbs, head, nil return pbs, heads, nil
} }
func chunksFileName(path string) string { func chunksFileName(path string) string {

View file

@ -1,16 +1,22 @@
package tsdb package tsdb
import ( import (
"fmt"
"math"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
) )
type compactor struct { type compactor struct {
shard *Shard metrics *compactorMetrics
blocks compactableBlocks blocks compactableBlocks
logger log.Logger logger log.Logger
@ -18,17 +24,57 @@ type compactor struct {
donec chan struct{} donec chan struct{}
} }
type compactableBlocks interface { type compactorMetrics struct {
compactable() []block triggered prometheus.Counter
set([]block) ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
} }
func newCompactor(s *Shard, l log.Logger) (*compactor, error) { func newCompactorMetrics(i int) *compactorMetrics {
shardLabel := prometheus.Labels{
"shard": fmt.Sprintf("%d", i),
}
m := &compactorMetrics{}
m.triggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_triggered_total",
Help: "Total number of triggered compactions for the shard.",
ConstLabels: shardLabel,
})
m.ran = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_total",
Help: "Total number of compactions that were executed for the shard.",
ConstLabels: shardLabel,
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_compactions_failed_total",
Help: "Total number of compactions that failed for the shard.",
ConstLabels: shardLabel,
})
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_shard_compaction_duration",
Help: "Duration of compaction runs.",
ConstLabels: shardLabel,
})
return m
}
type compactableBlocks interface {
lock() sync.Locker
compactable() []block
reinit(dir string) error
}
func newCompactor(i int, blocks compactableBlocks, l log.Logger) (*compactor, error) {
c := &compactor{ c := &compactor{
triggerc: make(chan struct{}, 1), triggerc: make(chan struct{}, 1),
donec: make(chan struct{}), donec: make(chan struct{}),
shard: s,
logger: l, logger: l,
blocks: blocks,
metrics: newCompactorMetrics(i),
} }
go c.run() go c.run()
@ -44,35 +90,59 @@ func (c *compactor) trigger() {
func (c *compactor) run() { func (c *compactor) run() {
for range c.triggerc { for range c.triggerc {
// continue c.metrics.triggered.Inc()
// bs := c.blocks.get()
// if len(bs) < 2 { bs := c.pick()
// continue if len(bs) == 0 {
// } continue
}
// var ( start := time.Now()
// dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now())) err := c.compact(bs...)
// a = bs[0]
// b = bs[1]
// )
// c.blocks.Lock() c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(start).Seconds())
// if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error { if err != nil {
// return c.compact(indexw, chunkw, a, b) c.logger.Log("msg", "compaction failed", "err", err)
// }); err != nil { c.metrics.failed.Inc()
// c.logger.Log("msg", "compaction failed", "err", err) continue
// continue }
// }
// c.blocks.Unlock() // Drain channel of signals triggered during compaction.
select {
case <-c.triggerc:
default:
}
} }
close(c.donec) close(c.donec)
} }
func (c *compactor) pick() []block { func (c *compactor) pick() []block {
bs := c.blocks.compactable()
if len(bs) == 0 {
return nil return nil
}
if !bs[len(bs)-1].persisted() {
// TODO(fabxc): double check scoring function here or only do it here
// and trigger every X scrapes?
return bs[len(bs)-1:]
}
candidate := []block{}
trange := int64(math.MaxInt64)
for i, b := range bs[:len(bs)-1] {
r := bs[i+1].stats().MaxTime - b.stats().MinTime
if r < trange {
trange = r
candidate = bs[i : i+1]
}
}
return candidate
} }
func (c *compactor) Close() error { func (c *compactor) Close() error {
@ -81,44 +151,105 @@ func (c *compactor) Close() error {
return nil return nil
} }
func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block) error { func mergeStats(blocks ...block) (res BlockStats) {
aall, err := a.index().Postings("", "") res.MinTime = blocks[0].stats().MinTime
if err != nil { res.MaxTime = blocks[len(blocks)-1].stats().MaxTime
for _, b := range blocks {
res.SampleCount += b.stats().SampleCount
}
return res
}
func (c *compactor) compact(blocks ...block) error {
tmpdir := blocks[0].dir() + ".tmp"
// Write to temporary directory to make persistence appear atomic.
if fileutil.Exist(tmpdir) {
if err := os.RemoveAll(tmpdir); err != nil {
return err return err
} }
ball, err := b.index().Postings("", "") }
if err != nil { if err := fileutil.CreateDirAll(tmpdir); err != nil {
return err return err
} }
set, err := newCompactionMerger( chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
newCompactionSeriesSet(a.index(), a.series(), aall),
newCompactionSeriesSet(b.index(), b.series(), ball),
)
if err != nil { if err != nil {
return err return errors.Wrap(err, "create chunk file")
}
indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return errors.Wrap(err, "create index file")
} }
astats, err := a.index().Stats() indexw := newIndexWriter(indexf)
chunkw := newSeriesWriter(chunkf, indexw)
if err := c.write(blocks, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}
if err := chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
if err := indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer")
}
if err := fileutil.Fsync(chunkf.File); err != nil {
return errors.Wrap(err, "fsync chunk file")
}
if err := fileutil.Fsync(indexf.File); err != nil {
return errors.Wrap(err, "fsync index file")
}
if err := chunkf.Close(); err != nil {
return errors.Wrap(err, "close chunk file")
}
if err := indexf.Close(); err != nil {
return errors.Wrap(err, "close index file")
}
c.blocks.lock().Lock()
defer c.blocks.lock().Unlock()
if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
return errors.Wrap(err, "rename dir")
}
var merr MultiError
for _, b := range blocks {
merr.Add(errors.Wrapf(c.blocks.reinit(b.dir()), "reinit block at %q", b.dir()))
}
return merr.Err()
}
func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error {
var set compactionSet
for i, b := range blocks {
all, err := b.index().Postings("", "")
if err != nil { if err != nil {
return err return err
} }
bstats, err := a.index().Stats() s := newCompactionSeriesSet(b.index(), b.series(), all)
if i == 0 {
set = s
continue
}
set, err = newCompactionMerger(set, s)
if err != nil { if err != nil {
return err return err
} }
}
// We fully rebuild the postings list index from merged series. // We fully rebuild the postings list index from merged series.
var ( var (
postings = &memPostings{m: make(map[term][]uint32, 512)} postings = &memPostings{m: make(map[term][]uint32, 512)}
values = map[string]stringset{} values = map[string]stringset{}
i = uint32(0) i = uint32(0)
stats = mergeStats(blocks...)
) )
stats := BlockStats{
MinTime: astats.MinTime,
MaxTime: bstats.MaxTime,
SampleCount: astats.SampleCount + bstats.SampleCount,
}
for set.Next() { for set.Next() {
lset, chunks := set.At() lset, chunks := set.At()
@ -174,10 +305,15 @@ func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block)
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return err return err
} }
return nil return nil
} }
type compactionSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta)
Err() error
}
type compactionSeriesSet struct { type compactionSeriesSet struct {
p Postings p Postings
index IndexReader index IndexReader
@ -229,7 +365,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta) {
} }
type compactionMerger struct { type compactionMerger struct {
a, b *compactionSeriesSet a, b compactionSet
adone, bdone bool adone, bdone bool
l labels.Labels l labels.Labels
@ -241,7 +377,7 @@ type compactionSeries struct {
chunks []ChunkMeta chunks []ChunkMeta
} }
func newCompactionMerger(a, b *compactionSeriesSet) (*compactionMerger, error) { func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
c := &compactionMerger{ c := &compactionMerger{
a: a, a: a,
b: b, b: b,

255
db.go
View file

@ -14,9 +14,11 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -180,12 +182,12 @@ type Shard struct {
metrics *shardMetrics metrics *shardMetrics
mtx sync.RWMutex mtx sync.RWMutex
persisted persistedBlocks persisted []*persistedBlock
head *HeadBlock heads []*HeadBlock
compactor *compactor compactor *compactor
donec chan struct{} donec chan struct{}
persistc chan struct{} cutc chan struct{}
} }
type shardMetrics struct { type shardMetrics struct {
@ -199,24 +201,24 @@ func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics {
"shard": fmt.Sprintf("%d", i), "shard": fmt.Sprintf("%d", i),
} }
m := &shardMetrics{ m := &shardMetrics{}
persistences: prometheus.NewCounter(prometheus.CounterOpts{
m.persistences = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_persistences_total", Name: "tsdb_shard_persistences_total",
Help: "Total number of head persistances that ran so far.", Help: "Total number of head persistances that ran so far.",
ConstLabels: shardLabel, ConstLabels: shardLabel,
}), })
persistenceDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_shard_persistence_duration_seconds", Name: "tsdb_shard_persistence_duration_seconds",
Help: "Duration of persistences in seconds.", Help: "Duration of persistences in seconds.",
ConstLabels: shardLabel, ConstLabels: shardLabel,
Buckets: prometheus.ExponentialBuckets(0.25, 2, 5), Buckets: prometheus.ExponentialBuckets(0.25, 2, 5),
}), })
samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{ m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_shard_samples_appended_total", Name: "tsdb_shard_samples_appended_total",
Help: "Total number of appended samples for the shard.", Help: "Total number of appended samples for the shard.",
ConstLabels: shardLabel, ConstLabels: shardLabel,
}), })
}
if r != nil { if r != nil {
r.MustRegister( r.MustRegister(
@ -238,33 +240,34 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
} }
// Initialize previously persisted blocks. // Initialize previously persisted blocks.
pbs, head, err := findBlocks(path) persisted, heads, err := findBlocks(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(fabxc): get time from client-defined `now` function. // TODO(fabxc): get time from client-defined `now` function.
baset := time.Unix(0, 0).UnixNano() / int64(time.Millisecond) baset := time.Unix(0, 0).UnixNano() / int64(time.Millisecond)
if len(pbs) > 0 { if len(persisted) > 0 {
baset = pbs[len(pbs)-1].stats.MaxTime baset = persisted[len(persisted)-1].bstats.MaxTime
} }
if head == nil { if len(heads) == 0 {
head, err = OpenHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset) head, err := OpenHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
if err != nil { if err != nil {
return nil, err return nil, err
} }
heads = []*HeadBlock{head}
} }
s := &Shard{ s := &Shard{
path: path, path: path,
logger: logger, logger: logger,
metrics: newShardMetrics(prometheus.DefaultRegisterer, i), metrics: newShardMetrics(prometheus.DefaultRegisterer, i),
head: head, heads: heads,
persisted: pbs, persisted: persisted,
persistc: make(chan struct{}, 1), cutc: make(chan struct{}, 1),
donec: make(chan struct{}), donec: make(chan struct{}),
} }
if s.compactor, err = newCompactor(s, logger); err != nil { if s.compactor, err = newCompactor(i, s, logger); err != nil {
return nil, err return nil, err
} }
go s.run() go s.run()
@ -273,22 +276,29 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) {
} }
func (s *Shard) run() { func (s *Shard) run() {
for range s.persistc { for range s.cutc {
start := time.Now() // if err := s.cut(); err != nil {
// s.logger.Log("msg", "cut error", "err", err)
// }
// select {
// case <-s.cutc:
// default:
// }
// start := time.Now()
if err := s.persist(); err != nil { // if err := s.persist(); err != nil {
s.logger.Log("msg", "persistence error", "err", err) // s.logger.Log("msg", "persistence error", "err", err)
} // }
s.metrics.persistenceDuration.Observe(time.Since(start).Seconds()) // s.metrics.persistenceDuration.Observe(time.Since(start).Seconds())
s.metrics.persistences.Inc() // s.metrics.persistences.Inc()
} }
close(s.donec) close(s.donec)
} }
// Close the shard. // Close the shard.
func (s *Shard) Close() error { func (s *Shard) Close() error {
close(s.persistc) close(s.cutc)
<-s.donec <-s.donec
var merr MultiError var merr MultiError
@ -300,7 +310,9 @@ func (s *Shard) Close() error {
for _, pb := range s.persisted { for _, pb := range s.persisted {
merr.Add(pb.Close()) merr.Add(pb.Close())
} }
merr.Add(s.head.Close()) for _, hb := range s.heads {
merr.Add(hb.Close())
}
return merr.Err() return merr.Err()
} }
@ -312,25 +324,118 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
head := s.heads[len(s.heads)-1]
// TODO(fabxc): distinguish samples between concurrent heads for // TODO(fabxc): distinguish samples between concurrent heads for
// different time blocks. Those may occurr during transition to still // different time blocks. Those may occurr during transition to still
// allow late samples to arrive for a previous block. // allow late samples to arrive for a previous block.
err := s.head.appendBatch(samples) err := head.appendBatch(samples)
if err == nil { if err == nil {
s.metrics.samplesAppended.Add(float64(len(samples))) s.metrics.samplesAppended.Add(float64(len(samples)))
} }
// TODO(fabxc): randomize over time and use better scoring function. // TODO(fabxc): randomize over time and use better scoring function.
if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 400 {
select { if err := s.cut(); err != nil {
case s.persistc <- struct{}{}: s.logger.Log("msg", "cut failed", "err", err)
default:
} }
} }
return err return err
} }
func (s *Shard) lock() sync.Locker {
return &s.mtx
}
func (s *Shard) headForDir(dir string) (int, bool) {
for i, b := range s.heads {
if b.dir() == dir {
return i, true
}
}
return -1, false
}
func (s *Shard) persistedForDir(dir string) (int, bool) {
for i, b := range s.persisted {
if b.dir() == dir {
return i, true
}
}
return -1, false
}
func (s *Shard) reinit(dir string) error {
if !fileutil.Exist(dir) {
if i, ok := s.headForDir(dir); ok {
if err := s.heads[i].Close(); err != nil {
return err
}
s.heads = append(s.heads[:i], s.heads[i+1:]...)
}
if i, ok := s.persistedForDir(dir); ok {
if err := s.persisted[i].Close(); err != nil {
return err
}
s.persisted = append(s.persisted[:i], s.persisted[i+1:]...)
}
return nil
}
// If a block dir has to be reinitialized and it wasn't a deletion,
// it has to be a newly persisted or compacted one.
if !fileutil.Exist(chunksFileName(dir)) {
return errors.New("no chunk file for new block dir")
}
// Remove a previous head block.
if i, ok := s.headForDir(dir); ok {
if err := s.heads[i].Close(); err != nil {
return err
}
s.heads = append(s.heads[:i], s.heads[i+1:]...)
}
// Close an old persisted block.
i, ok := s.persistedForDir(dir)
if ok {
if err := s.persisted[i].Close(); err != nil {
return err
}
}
pb, err := newPersistedBlock(dir)
if err != nil {
return errors.Wrap(err, "open persisted block")
}
if i >= 0 {
s.persisted[i] = pb
} else {
s.persisted = append(s.persisted, pb)
}
return nil
}
func (s *Shard) compactable() []block {
var blocks []block
for _, pb := range s.persisted {
blocks = append(blocks, pb)
}
// threshold := s.heads[len(s.heads)-1].bstats.MaxTime - headGracePeriod
// for _, hb := range s.heads {
// if hb.bstats.MaxTime < threshold {
// blocks = append(blocks, hb)
// }
// }
for _, hb := range s.heads[:len(s.heads)-1] {
blocks = append(blocks, hb)
}
return blocks
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool { func intervalOverlap(amin, amax, bmin, bmax int64) bool {
if bmin >= amin && bmin <= amax { if bmin >= amin && bmin <= amax {
return true return true
@ -357,57 +462,75 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block {
bs = append(bs, b) bs = append(bs, b)
} }
} }
for _, b := range s.heads {
bmin, bmax := b.interval()
hmin, hmax := s.head.interval() if intervalOverlap(mint, maxt, bmin, bmax) {
bs = append(bs, b)
if intervalOverlap(mint, maxt, hmin, hmax) { }
bs = append(bs, s.head)
} }
return bs return bs
} }
// TODO(fabxc): make configurable. // TODO(fabxc): make configurable.
const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
func (s *Shard) persist() error {
s.mtx.Lock()
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (s *Shard) cut() error {
// Set new head block. // Set new head block.
head := s.head head := s.heads[len(s.heads)-1]
newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MaxTime)), head.stats.MaxTime)
if err != nil {
s.mtx.Unlock()
return err
}
s.head = newHead
s.mtx.Unlock() newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime)
// TODO(fabxc): add grace period where we can still append to old head shard
// before actually persisting it.
dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
if err := persist(dir, head.persist); err != nil {
return err
}
s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
// Reopen block as persisted block for querying.
pb, err := newPersistedBlock(dir)
if err != nil { if err != nil {
return err return err
} }
s.heads = append(s.heads, newHead)
s.mtx.Lock()
s.persisted = append(s.persisted, pb)
s.mtx.Unlock()
s.compactor.trigger() s.compactor.trigger()
return nil return nil
} }
// func (s *Shard) persist() error {
// s.mtx.Lock()
// // Set new head block.
// head := s.head
// newHead, err := OpenHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.bstats.MaxTime)), head.bstats.MaxTime)
// if err != nil {
// s.mtx.Unlock()
// return err
// }
// s.head = newHead
// s.mtx.Unlock()
// // TODO(fabxc): add grace period where we can still append to old head shard
// // before actually persisting it.
// dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
// if err := persist(dir, head.persist); err != nil {
// return err
// }
// s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
// // Reopen block as persisted block for querying.
// pb, err := newPersistedBlock(dir)
// if err != nil {
// return err
// }
// s.mtx.Lock()
// s.persisted = append(s.persisted, pb)
// s.mtx.Unlock()
// s.compactor.trigger()
// return nil
// }
// chunkDesc wraps a plain data chunk and provides cached meta data about it. // chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct { type chunkDesc struct {
ref uint32 ref uint32

41
head.go
View file

@ -26,7 +26,7 @@ type HeadBlock struct {
wal *WAL wal *WAL
stats BlockStats bstats BlockStats
} }
// OpenHeadBlock creates a new empty head block. // OpenHeadBlock creates a new empty head block.
@ -44,7 +44,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint32)},
wal: wal, wal: wal,
} }
b.stats.MinTime = baseTime b.bstats.MinTime = baseTime
err = wal.ReadAll(&walHandler{ err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) { series: func(lset labels.Labels) {
@ -52,7 +52,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
}, },
sample: func(s hashedSample) { sample: func(s hashedSample) {
b.descs[s.ref].append(s.t, s.v) b.descs[s.ref].append(s.t, s.v)
b.stats.SampleCount++ b.bstats.SampleCount++
}, },
}) })
if err != nil { if err != nil {
@ -68,8 +68,10 @@ func (h *HeadBlock) Close() error {
} }
func (h *HeadBlock) dir() string { return h.d } func (h *HeadBlock) dir() string { return h.d }
func (h *HeadBlock) persisted() bool { return false }
func (h *HeadBlock) index() IndexReader { return h } func (h *HeadBlock) index() IndexReader { return h }
func (h *HeadBlock) series() SeriesReader { return h } func (h *HeadBlock) series() SeriesReader { return h }
func (h *HeadBlock) stats() BlockStats { return h.bstats }
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
@ -80,12 +82,12 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
} }
func (h *HeadBlock) interval() (int64, int64) { func (h *HeadBlock) interval() (int64, int64) {
return h.stats.MinTime, h.stats.MaxTime return h.bstats.MinTime, h.bstats.MaxTime
} }
// Stats returns statisitics about the indexed data. // Stats returns statisitics about the indexed data.
func (h *HeadBlock) Stats() (BlockStats, error) { func (h *HeadBlock) Stats() (BlockStats, error) {
return h.stats, nil return h.bstats, nil
} }
// LabelValues returns the possible label values // LabelValues returns the possible label values
@ -119,7 +121,12 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
} }
cd := h.descs[ref] cd := h.descs[ref]
return cd.lset, []ChunkMeta{{MinTime: h.stats.MinTime, Ref: ref}}, nil meta := ChunkMeta{
MinTime: cd.firsTimestamp,
MaxTime: cd.lastTimestamp,
Ref: ref,
}
return cd.lset, []ChunkMeta{meta}, nil
} }
func (h *HeadBlock) LabelIndices() ([][]string, error) { func (h *HeadBlock) LabelIndices() ([][]string, error) {
@ -162,24 +169,22 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
h.descs = append(h.descs, cd) h.descs = append(h.descs, cd)
h.hashes[hash] = append(h.hashes[hash], cd) h.hashes[hash] = append(h.hashes[hash], cd)
// Add each label pair as a term to the inverted index.
terms := make([]term, 0, len(lset))
for _, l := range lset { for _, l := range lset {
terms = append(terms, term{name: l.Name, value: l.Value})
valset, ok := h.values[l.Name] valset, ok := h.values[l.Name]
if !ok { if !ok {
valset = stringset{} valset = stringset{}
h.values[l.Name] = valset h.values[l.Name] = valset
} }
valset.set(l.Value) valset.set(l.Value)
h.postings.add(cd.ref, term{name: l.Name, value: l.Value})
} }
h.postings.add(cd.ref, terms...)
h.postings.add(cd.ref, term{})
// For the head block there's exactly one chunk per series. // For the head block there's exactly one chunk per series.
h.stats.ChunkCount++ h.bstats.ChunkCount++
h.stats.SeriesCount++ h.bstats.SeriesCount++
return cd return cd
} }
@ -254,10 +259,10 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
} }
cd.append(s.t, s.v) cd.append(s.t, s.v)
if s.t > h.stats.MaxTime { if s.t > h.bstats.MaxTime {
h.stats.MaxTime = s.t h.bstats.MaxTime = s.t
} }
h.stats.SampleCount++ h.bstats.SampleCount++
} }
return nil return nil
@ -280,7 +285,7 @@ func (h *HeadBlock) persist(indexw IndexWriter, chunkw SeriesWriter) error {
} }
} }
if err := indexw.WriteStats(h.stats); err != nil { if err := indexw.WriteStats(h.bstats); err != nil {
return err return err
} }
for n, v := range h.values { for n, v := range h.values {