diff --git a/block.go b/block.go index ef6d71a739..3cbbcd41a6 100644 --- a/block.go +++ b/block.go @@ -59,7 +59,7 @@ type seriesMeta struct { } type blockStats struct { - series uint32 + chunks uint32 samples uint64 _ [4]byte // padding/reserved } diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index ecbe662b3c..b5e0984c10 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "fmt" "io" "io/ioutil" @@ -31,6 +32,8 @@ func main() { NewBenchCommand(), ) + flag.CommandLine.Set("log.level", "debug") + root.Execute() } @@ -131,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { measureTime("ingestScrapes", func() { b.startProfiling() - if err := b.ingestScrapes(metrics, 1000); err != nil { + if err := b.ingestScrapes(metrics, 3000); err != nil { exitWithError(err) } }) @@ -193,7 +196,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount } for i := 0; i < scrapeCount; i++ { - ts += int64(10000) + ts += int64(30000) sc.Reset() for _, s := range scrape { @@ -372,9 +375,9 @@ func readPrometheusLabels(r io.Reader, n int) ([]model.Metric, error) { if dups > 0 { fmt.Println("dropped duplicate metrics:", dups) } - fmt.Println("read metrics", len(mets)) + fmt.Println("read metrics", len(mets[:n])) - return mets, nil + return mets[:n], nil } func exitWithError(err error) { diff --git a/db.go b/db.go index 0b28409d86..059764726f 100644 --- a/db.go +++ b/db.go @@ -61,7 +61,7 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { // for the bitshift-modulo when finding the right shard. for i := 0; i < numSeriesShards; i++ { path := filepath.Join(path, fmt.Sprintf("%d", i)) - c.shards = append(c.shards, NewSeriesShard(path)) + c.shards = append(c.shards, NewSeriesShard(path, l.With("shard", i))) } // TODO(fabxc): run background compaction + GC. @@ -73,30 +73,11 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { func (db *DB) Close() error { var wg sync.WaitGroup - start := time.Now() - for i, shard := range db.shards { - fmt.Println("shard", i) - fmt.Println(" num chunks", shard.head.stats().series) - fmt.Println(" num samples", shard.head.stats().samples) - wg.Add(1) go func(i int, shard *SeriesShard) { - f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i))) - if err != nil { - panic(err) - } - bw := &blockWriter{block: shard.head} - n, err := bw.writeSeries(f) - if err != nil { - panic(err) - } - fmt.Println(" wrote bytes", n) - if err := f.Sync(); err != nil { - panic(err) - } - - if err := f.Close(); err != nil { + if err := shard.Close(); err != nil { + // TODO(fabxc): handle with multi error. panic(err) } wg.Done() @@ -104,8 +85,6 @@ func (db *DB) Close() error { } wg.Wait() - fmt.Println("final serialization took", time.Since(start)) - return nil } @@ -114,6 +93,21 @@ func (db *DB) Querier(start, end int64) Querier { return nil } +// AppendVector adds values for a list of label sets for the given timestamp +// in milliseconds. +func (db *DB) AppendVector(ts int64, v *Vector) error { + // Sequentially add samples to shards. + for s, bkt := range v.Buckets { + shard := db.shards[s] + if err := shard.appendBatch(ts, bkt); err != nil { + // TODO(fabxc): handle gracefully and collect multi-error. + return err + } + } + + return nil +} + // Matcher matches a string. type Matcher interface { // Match returns true if the matcher applies to the string value. @@ -173,7 +167,10 @@ const sep = '\xff' // SeriesShard handles reads and writes of time series falling into // a hashed shard of a series. type SeriesShard struct { - path string + path string + persistCh chan struct{} + done chan struct{} + logger log.Logger mtx sync.RWMutex blocks *Block @@ -181,13 +178,42 @@ type SeriesShard struct { } // NewSeriesShard returns a new SeriesShard. -func NewSeriesShard(path string) *SeriesShard { - return &SeriesShard{ - path: path, +func NewSeriesShard(path string, logger log.Logger) *SeriesShard { + s := &SeriesShard{ + path: path, + persistCh: make(chan struct{}, 1), + done: make(chan struct{}), + logger: logger, // TODO(fabxc): restore from checkpoint. - head: NewHeadBlock(), // TODO(fabxc): provide access to persisted blocks. } + // TODO(fabxc): get base time from pre-existing blocks. Otherwise + // it should come from a user defined start timestamp. + // Use actual time for now. + s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond)) + + go s.run() + + return s +} + +func (s *SeriesShard) run() { + // for { + // select { + // case <-s.done: + // return + // case <-s.persistCh: + // if err := s.persist(); err != nil { + // s.logger.With("err", err).Error("persistence failed") + // } + // } + // } +} + +// Close the series shard. +func (s *SeriesShard) Close() error { + close(s.done) + return nil } // blockFor returns the block of shard series that contains the given timestamp. @@ -195,6 +221,88 @@ func (s *SeriesShard) blockFor(ts int64) block { return nil } +func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error { + // TODO(fabxc): make configurable. + const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms + + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, smpl := range samples { + if err := s.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil { + // TODO(fabxc): handle gracefully and collect multi-error. + return err + } + } + + if ts > s.head.highTimestamp { + s.head.highTimestamp = ts + } + + // TODO(fabxc): randomize over time + if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 { + select { + case s.persistCh <- struct{}{}: + s.logger.Debug("trigger persistence") + go s.persist() + default: + } + } + + return nil +} + +// TODO(fabxc): make configurable. +const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale + +func (s *SeriesShard) persist() error { + s.mtx.Lock() + + // Set new head block. + head := s.head + s.head = NewHeadBlock(head.highTimestamp) + + s.mtx.Unlock() + + defer func() { + <-s.persistCh + }() + + // TODO(fabxc): add grace period where we can still append to old head shard + // before actually persisting it. + p := filepath.Join(s.path, fmt.Sprintf("%d", head.baseTimestamp)) + + if err := os.MkdirAll(p, 0777); err != nil { + return err + } + + f, err := os.Create(filepath.Join(p, "series")) + if err != nil { + return err + } + + bw := &blockWriter{block: head} + n, err := bw.writeSeries(f) + if err != nil { + return err + } + + if err := f.Sync(); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + sz := fmt.Sprintf("%fMiB", float64(n)/1024/1024) + + s.logger.With("size", sz). + With("samples", head.samples). + With("chunks", head.stats().chunks). + Debug("persisted head") + + return nil +} + // chunkDesc wraps a plain data chunk and provides cached meta data about it. type chunkDesc struct { lset Labels @@ -348,27 +456,3 @@ func (v *Vector) Add(lset Labels, val float64) { Value: val, }) } - -// AppendVector adds values for a list of label sets for the given timestamp -// in milliseconds. -func (db *DB) AppendVector(ts int64, v *Vector) error { - // Sequentially add samples to shards. - for s, bkt := range v.Buckets { - shard := db.shards[s] - - // TODO(fabxc): benchmark whether grouping into shards and submitting to - // shards in batches is more efficient. - shard.head.mtx.Lock() - - for _, smpl := range bkt { - if err := shard.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil { - shard.head.mtx.Unlock() - // TODO(fabxc): handle gracefully and collect multi-error. - return err - } - } - shard.head.mtx.Unlock() - } - - return nil -} diff --git a/head.go b/head.go index 7b191fb629..a07743be21 100644 --- a/head.go +++ b/head.go @@ -13,13 +13,17 @@ type HeadBlock struct { descs map[uint64][]*chunkDesc // labels hash to possible chunks descs index *memIndex - samples uint64 // total samples in the block. + samples uint64 // total samples in the block + highTimestamp int64 // highest timestamp of any sample + baseTimestamp int64 // all samples are strictly later } -func NewHeadBlock() *HeadBlock { +// NewHeadBlock creates a new empty head block. +func NewHeadBlock(baseTime int64) *HeadBlock { return &HeadBlock{ - descs: make(map[uint64][]*chunkDesc, 2048), - index: newMemIndex(), + descs: make(map[uint64][]*chunkDesc, 2048), + index: newMemIndex(), + baseTimestamp: baseTime, } } @@ -54,7 +58,7 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error func (h *HeadBlock) stats() *blockStats { return &blockStats{ - series: uint32(h.index.numSeries()), + chunks: uint32(h.index.numSeries()), samples: h.samples, } }