Persist blocks periodically

This commit is contained in:
Fabian Reinartz 2016-12-09 13:41:38 +01:00
parent 0cf8bb9e53
commit 74f8dfd95d
4 changed files with 154 additions and 63 deletions

View file

@ -59,7 +59,7 @@ type seriesMeta struct {
} }
type blockStats struct { type blockStats struct {
series uint32 chunks uint32
samples uint64 samples uint64
_ [4]byte // padding/reserved _ [4]byte // padding/reserved
} }

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"flag"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -31,6 +32,8 @@ func main() {
NewBenchCommand(), NewBenchCommand(),
) )
flag.CommandLine.Set("log.level", "debug")
root.Execute() root.Execute()
} }
@ -131,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
measureTime("ingestScrapes", func() { measureTime("ingestScrapes", func() {
b.startProfiling() b.startProfiling()
if err := b.ingestScrapes(metrics, 1000); err != nil { if err := b.ingestScrapes(metrics, 3000); err != nil {
exitWithError(err) exitWithError(err)
} }
}) })
@ -193,7 +196,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
} }
for i := 0; i < scrapeCount; i++ { for i := 0; i < scrapeCount; i++ {
ts += int64(10000) ts += int64(30000)
sc.Reset() sc.Reset()
for _, s := range scrape { for _, s := range scrape {
@ -372,9 +375,9 @@ func readPrometheusLabels(r io.Reader, n int) ([]model.Metric, error) {
if dups > 0 { if dups > 0 {
fmt.Println("dropped duplicate metrics:", dups) fmt.Println("dropped duplicate metrics:", dups)
} }
fmt.Println("read metrics", len(mets)) fmt.Println("read metrics", len(mets[:n]))
return mets, nil return mets[:n], nil
} }
func exitWithError(err error) { func exitWithError(err error) {

190
db.go
View file

@ -61,7 +61,7 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
// for the bitshift-modulo when finding the right shard. // for the bitshift-modulo when finding the right shard.
for i := 0; i < numSeriesShards; i++ { for i := 0; i < numSeriesShards; i++ {
path := filepath.Join(path, fmt.Sprintf("%d", i)) path := filepath.Join(path, fmt.Sprintf("%d", i))
c.shards = append(c.shards, NewSeriesShard(path)) c.shards = append(c.shards, NewSeriesShard(path, l.With("shard", i)))
} }
// TODO(fabxc): run background compaction + GC. // TODO(fabxc): run background compaction + GC.
@ -73,30 +73,11 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
func (db *DB) Close() error { func (db *DB) Close() error {
var wg sync.WaitGroup var wg sync.WaitGroup
start := time.Now()
for i, shard := range db.shards { for i, shard := range db.shards {
fmt.Println("shard", i)
fmt.Println(" num chunks", shard.head.stats().series)
fmt.Println(" num samples", shard.head.stats().samples)
wg.Add(1) wg.Add(1)
go func(i int, shard *SeriesShard) { go func(i int, shard *SeriesShard) {
f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i))) if err := shard.Close(); err != nil {
if err != nil { // TODO(fabxc): handle with multi error.
panic(err)
}
bw := &blockWriter{block: shard.head}
n, err := bw.writeSeries(f)
if err != nil {
panic(err)
}
fmt.Println(" wrote bytes", n)
if err := f.Sync(); err != nil {
panic(err)
}
if err := f.Close(); err != nil {
panic(err) panic(err)
} }
wg.Done() wg.Done()
@ -104,8 +85,6 @@ func (db *DB) Close() error {
} }
wg.Wait() wg.Wait()
fmt.Println("final serialization took", time.Since(start))
return nil return nil
} }
@ -114,6 +93,21 @@ func (db *DB) Querier(start, end int64) Querier {
return nil return nil
} }
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error {
// Sequentially add samples to shards.
for s, bkt := range v.Buckets {
shard := db.shards[s]
if err := shard.appendBatch(ts, bkt); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
}
return nil
}
// Matcher matches a string. // Matcher matches a string.
type Matcher interface { type Matcher interface {
// Match returns true if the matcher applies to the string value. // Match returns true if the matcher applies to the string value.
@ -173,7 +167,10 @@ const sep = '\xff'
// SeriesShard handles reads and writes of time series falling into // SeriesShard handles reads and writes of time series falling into
// a hashed shard of a series. // a hashed shard of a series.
type SeriesShard struct { type SeriesShard struct {
path string path string
persistCh chan struct{}
done chan struct{}
logger log.Logger
mtx sync.RWMutex mtx sync.RWMutex
blocks *Block blocks *Block
@ -181,13 +178,42 @@ type SeriesShard struct {
} }
// NewSeriesShard returns a new SeriesShard. // NewSeriesShard returns a new SeriesShard.
func NewSeriesShard(path string) *SeriesShard { func NewSeriesShard(path string, logger log.Logger) *SeriesShard {
return &SeriesShard{ s := &SeriesShard{
path: path, path: path,
persistCh: make(chan struct{}, 1),
done: make(chan struct{}),
logger: logger,
// TODO(fabxc): restore from checkpoint. // TODO(fabxc): restore from checkpoint.
head: NewHeadBlock(),
// TODO(fabxc): provide access to persisted blocks. // TODO(fabxc): provide access to persisted blocks.
} }
// TODO(fabxc): get base time from pre-existing blocks. Otherwise
// it should come from a user defined start timestamp.
// Use actual time for now.
s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond))
go s.run()
return s
}
func (s *SeriesShard) run() {
// for {
// select {
// case <-s.done:
// return
// case <-s.persistCh:
// if err := s.persist(); err != nil {
// s.logger.With("err", err).Error("persistence failed")
// }
// }
// }
}
// Close the series shard.
func (s *SeriesShard) Close() error {
close(s.done)
return nil
} }
// blockFor returns the block of shard series that contains the given timestamp. // blockFor returns the block of shard series that contains the given timestamp.
@ -195,6 +221,88 @@ func (s *SeriesShard) blockFor(ts int64) block {
return nil return nil
} }
func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
// TODO(fabxc): make configurable.
const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
s.mtx.Lock()
defer s.mtx.Unlock()
for _, smpl := range samples {
if err := s.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
}
if ts > s.head.highTimestamp {
s.head.highTimestamp = ts
}
// TODO(fabxc): randomize over time
if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 {
select {
case s.persistCh <- struct{}{}:
s.logger.Debug("trigger persistence")
go s.persist()
default:
}
}
return nil
}
// TODO(fabxc): make configurable.
const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
func (s *SeriesShard) persist() error {
s.mtx.Lock()
// Set new head block.
head := s.head
s.head = NewHeadBlock(head.highTimestamp)
s.mtx.Unlock()
defer func() {
<-s.persistCh
}()
// TODO(fabxc): add grace period where we can still append to old head shard
// before actually persisting it.
p := filepath.Join(s.path, fmt.Sprintf("%d", head.baseTimestamp))
if err := os.MkdirAll(p, 0777); err != nil {
return err
}
f, err := os.Create(filepath.Join(p, "series"))
if err != nil {
return err
}
bw := &blockWriter{block: head}
n, err := bw.writeSeries(f)
if err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
sz := fmt.Sprintf("%fMiB", float64(n)/1024/1024)
s.logger.With("size", sz).
With("samples", head.samples).
With("chunks", head.stats().chunks).
Debug("persisted head")
return nil
}
// chunkDesc wraps a plain data chunk and provides cached meta data about it. // chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct { type chunkDesc struct {
lset Labels lset Labels
@ -348,27 +456,3 @@ func (v *Vector) Add(lset Labels, val float64) {
Value: val, Value: val,
}) })
} }
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error {
// Sequentially add samples to shards.
for s, bkt := range v.Buckets {
shard := db.shards[s]
// TODO(fabxc): benchmark whether grouping into shards and submitting to
// shards in batches is more efficient.
shard.head.mtx.Lock()
for _, smpl := range bkt {
if err := shard.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
shard.head.mtx.Unlock()
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
}
shard.head.mtx.Unlock()
}
return nil
}

14
head.go
View file

@ -13,13 +13,17 @@ type HeadBlock struct {
descs map[uint64][]*chunkDesc // labels hash to possible chunks descs descs map[uint64][]*chunkDesc // labels hash to possible chunks descs
index *memIndex index *memIndex
samples uint64 // total samples in the block. samples uint64 // total samples in the block
highTimestamp int64 // highest timestamp of any sample
baseTimestamp int64 // all samples are strictly later
} }
func NewHeadBlock() *HeadBlock { // NewHeadBlock creates a new empty head block.
func NewHeadBlock(baseTime int64) *HeadBlock {
return &HeadBlock{ return &HeadBlock{
descs: make(map[uint64][]*chunkDesc, 2048), descs: make(map[uint64][]*chunkDesc, 2048),
index: newMemIndex(), index: newMemIndex(),
baseTimestamp: baseTime,
} }
} }
@ -54,7 +58,7 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
func (h *HeadBlock) stats() *blockStats { func (h *HeadBlock) stats() *blockStats {
return &blockStats{ return &blockStats{
series: uint32(h.index.numSeries()), chunks: uint32(h.index.numSeries()),
samples: h.samples, samples: h.samples,
} }
} }