Move stats into meta.json file, cleanup, docs

Fabian Reinartz 2017-01-19 11:22:47 +01:00
parent 2f02f86b62
commit 9ddbd64d00
8 changed files with 207 additions and 233 deletions

@ -1,10 +1,11 @@
package tsdb
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors"
@ -12,23 +13,36 @@ import (
// Block handles reads against a Block of time series data.
type Block interface {
// Directory where block data is stored.
Dir() string
Stats() BlockStats
// Meta returns meta information about the block.
Meta() BlockMeta
// Index returns an IndexReader over the block's data.
Index() IndexReader
// Series returns a SeriesReader over the block's data.
Series() SeriesReader
// Persisted returns whether the block is already persisted,
// and no longer being appended to.
Persisted() bool
// Close releases all underlying resources of the block.
Close() error
}
// BlockStats provides stats on a data block.
type BlockStats struct {
MinTime, MaxTime int64 // time range of samples in the block
// BlockMeta provides meta information about a block.
type BlockMeta struct {
MinTime int64 `json:"minTime,omitempty"`
MaxTime int64 `json:"maxTime,omitempty"`
SampleCount uint64
SeriesCount uint64
ChunkCount uint64
mtx sync.RWMutex
Stats struct {
NumSamples uint64 `json:"numSamples,omitempty"`
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
} `json:"stats,omitempty"`
}
const (
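For reference, the BlockMeta above round-trips through encoding/json as a small document. A minimal sketch of the resulting meta.json content, assuming encoding/json and fmt are in scope; all values are illustrative, none are taken from this commit:

	var m BlockMeta
	m.MinTime, m.MaxTime = 1484820000000, 1484827200000
	m.Stats.NumSamples, m.Stats.NumSeries, m.Stats.NumChunks = 12345, 120, 240

	b, err := json.MarshalIndent(&m, "", "\t")
	if err != nil {
		// handle error
	}
	fmt.Println(string(b))
	// Output:
	// {
	// 	"minTime": 1484820000000,
	// 	"maxTime": 1484827200000,
	// 	"stats": {
	// 		"numSamples": 12345,
	// 		"numSeries": 120,
	// 		"numChunks": 240
	// 	}
	// }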
@ -37,8 +51,8 @@ const (
)
type persistedBlock struct {
dir string
stats *BlockStats
dir string
meta BlockMeta
chunksf, indexf *mmapFile
@ -46,6 +60,13 @@ type persistedBlock struct {
indexr *indexReader
}
type blockMeta struct {
Version int `json:"version"`
Meta BlockMeta `json:",inline"`
}
const metaFilename = "meta.json"
func newPersistedBlock(dir string) (*persistedBlock, error) {
// TODO(fabxc): validate match of name and stats time, validate magic.
@ -68,19 +89,22 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
return nil, errors.Wrap(err, "create index reader")
}
stats, err := ir.Stats()
if err != nil {
return nil, errors.Wrap(err, "read stats")
}
pb := &persistedBlock{
dir: dir,
chunksf: chunksf,
indexf: indexf,
chunkr: sr,
indexr: ir,
stats: &stats,
}
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &pb.meta); err != nil {
return nil, err
}
return pb, nil
}
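newPersistedBlock above covers only the read side of meta.json; the write side is not in the hunks shown here. A minimal sketch of what such a helper might look like — the name writeMetaFile, the temp-file suffix, and the atomic rename are assumptions, not code from this commit:

	// writeMetaFile is a hypothetical helper that persists a block's meta.json,
	// mirroring the read path in newPersistedBlock above. Writing to a temp file
	// and renaming keeps readers from ever observing a half-written file.
	func writeMetaFile(dir string, meta *BlockMeta) error {
		b, err := json.MarshalIndent(meta, "", "\t")
		if err != nil {
			return err
		}
		tmp := filepath.Join(dir, metaFilename+".tmp")
		if err := ioutil.WriteFile(tmp, b, 0666); err != nil {
			return err
		}
		return os.Rename(tmp, filepath.Join(dir, metaFilename))
	}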
@ -98,7 +122,7 @@ func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Persisted() bool { return true }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
func (pb *persistedBlock) Stats() BlockStats { return *pb.stats }
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
func chunksFileName(path string) string {
return filepath.Join(path, "chunks-000")

@ -93,11 +93,11 @@ func compactionMatch(blocks []Block) bool {
// Naively check whether the blocks have roughly the same number of samples
// and whether the total sample count doesn't exceed 2GB chunk file size
// by rough approximation.
n := float64(blocks[0].Stats().SampleCount)
n := float64(blocks[0].Meta().Stats.NumSamples)
t := n
for _, b := range blocks[1:] {
m := float64(b.Stats().SampleCount)
m := float64(b.Meta().Stats.NumSamples)
if m < 0.7*n || m > 1.3*n {
return false
@ -109,12 +109,12 @@ func compactionMatch(blocks []Block) bool {
return t < 10*200e6
}
func mergeStats(blocks ...Block) (res BlockStats) {
res.MinTime = blocks[0].Stats().MinTime
res.MaxTime = blocks[len(blocks)-1].Stats().MaxTime
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
res.MinTime = blocks[0].Meta().MinTime
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
for _, b := range blocks {
res.SampleCount += b.Stats().SampleCount
res.Stats.NumSamples += b.Meta().Stats.NumSamples
}
return res
}
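To make the heuristic concrete: with n = 200e6 samples in the first block, only candidates holding between 140e6 and 260e6 samples pass the 0.7n–1.3n similarity check, and the running total t must stay below 10*200e6 = 2e9 samples, the rough stand-in for the 2GB chunk-file limit. mergeBlockMetas then takes MinTime from the first block and MaxTime from the last, so callers are expected to pass blocks in increasing time order.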
@ -147,7 +147,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
return errors.Wrap(err, "create index file")
}
indexw := newIndexWriter(indexf)
indexw, err := newIndexWriter(indexf)
if err != nil {
return errors.Wrap(err, "open index writer")
}
chunkw := newSeriesWriter(chunkf, indexw)
if err = c.write(blocks, indexw, chunkw); err != nil {
@ -204,7 +207,7 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
postings = &memPostings{m: make(map[term][]uint32, 512)}
values = map[string]stringset{}
i = uint32(0)
stats = mergeStats(blocks...)
meta = mergeBlockMetas(blocks...)
)
for set.Next() {
@ -213,8 +216,8 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
return err
}
stats.ChunkCount += uint64(len(chunks))
stats.SeriesCount++
meta.Stats.NumChunks += uint64(len(chunks))
meta.Stats.NumSeries++
for _, l := range lset {
valset, ok := values[l.Name]
@ -232,10 +235,6 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
return set.Err()
}
if err := indexw.WriteStats(stats); err != nil {
return err
}
s := make([]string, 0, 256)
for n, v := range values {
s = s[:0]

db.go

@ -152,7 +152,7 @@ func (db *DB) run() {
select {
case <-db.cutc:
db.mtx.Lock()
err := db.cut()
_, err := db.cut()
db.mtx.Unlock()
if err != nil {
@ -268,34 +268,6 @@ func (db *DB) compact(i, j int) error {
return nil
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
if !strings.HasPrefix(fi.Name(), "b-") {
return false
}
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
return false
}
return true
}
func blockDirs(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var dirs []string
for _, fi := range files {
if isBlockDir(fi) {
dirs = append(dirs, filepath.Join(dir, fi.Name()))
}
}
return dirs, nil
}
func (db *DB) initBlocks() error {
var (
persisted []*persistedBlock
@ -309,7 +281,7 @@ func (db *DB) initBlocks() error {
for _, dir := range dirs {
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := openHeadBlock(dir, db.logger)
h, err := openHeadBlock(dir, db.logger, nil)
if err != nil {
return err
}
@ -327,9 +299,9 @@ func (db *DB) initBlocks() error {
db.heads = heads
if len(heads) == 0 {
return db.cut()
_, err = db.cut()
}
return nil
return err
}
// Close the partition.
@ -418,24 +390,6 @@ func (a *dbAppender) Rollback() error {
return err
}
func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads {
if b.Dir() == dir {
return i, true
}
}
return -1, false
}
func (db *DB) persistedForDir(dir string) (int, bool) {
for i, b := range db.persisted {
if b.Dir() == dir {
return i, true
}
}
return -1, false
}
func (db *DB) compactable() []Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
@ -471,14 +425,14 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block
for _, b := range db.persisted {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
bs = append(bs, b)
}
}
@ -488,23 +442,62 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut() error {
dir, err := db.nextBlockDir()
func (db *DB) cut() (*headBlock, error) {
dir, err := nextBlockDir(db.dir)
if err != nil {
return err
return nil, err
}
newHead, err := openHeadBlock(dir, db.logger)
// If it's not the very first head block, none of its samples may be
// earlier than the max time of the preceding head block plus one.
var minTime *int64
if len(db.heads) > 0 {
cb := db.heads[len(db.heads)-1]
minTime = new(int64)
*minTime = cb.Meta().MaxTime + 1
}
newHead, err := openHeadBlock(dir, db.logger, minTime)
if err != nil {
return err
return nil, err
}
db.heads = append(db.heads, newHead)
db.headGen++
return nil
return newHead, nil
}
func (db *DB) nextBlockDir() (string, error) {
names, err := fileutil.ReadDir(db.dir)
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
}
if !strings.HasPrefix(fi.Name(), "b-") {
return false
}
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil {
return false
}
return true
}
func blockDirs(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var dirs []string
for _, fi := range files {
if isBlockDir(fi) {
dirs = append(dirs, filepath.Join(dir, fi.Name()))
}
}
return dirs, nil
}
func nextBlockDir(dir string) (string, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", err
}
@ -520,7 +513,7 @@ func (db *DB) nextBlockDir() (string, error) {
}
i = j
}
return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil
return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), nil
}
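Block directories follow the b-<sequence> scheme that isBlockDir validates; nextBlockDir scans for the highest existing sequence and formats its successor with %0.6d. A quick sketch of the behavior:

	fmt.Sprintf("b-%0.6d", 3) // "b-000003"
	// Given existing dirs b-000001 and b-000002, nextBlockDir returns
	// <dir>/b-000003. Entries like "wal" or "b-abc" fail isBlockDir's
	// ParseUint check and are ignored.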
// PartitionedDB is a time series storage.
@ -691,14 +684,3 @@ func yoloString(b []byte) string {
}
return *((*string)(unsafe.Pointer(&h)))
}
func yoloBytes(s string) []byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
h := reflect.SliceHeader{
Cap: sh.Len,
Len: sh.Len,
Data: sh.Data,
}
return *((*[]byte)(unsafe.Pointer(&h)))
}

head.go

@ -1,10 +1,13 @@
package tsdb
import (
"errors"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"time"
@ -13,6 +16,7 @@ import (
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
var (
@ -37,6 +41,10 @@ type headBlock struct {
mtx sync.RWMutex
dir string
// Head blocks are initialized with a minimum timestamp if they were preceded
// by another block. Appended samples must not have a smaller timestamp than this.
minTime *int64
// descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID.
series []*memSeries
@ -52,18 +60,24 @@ type headBlock struct {
wal *WAL
stats *BlockStats
metamtx sync.RWMutex
meta BlockMeta
}
// openHeadBlock creates a new empty head block.
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 5*time.Second)
if err != nil {
return nil, err
}
b := &headBlock{
h := &headBlock{
dir: dir,
minTime: minTime,
series: []*memSeries{},
hashes: map[uint64][]*memSeries{},
values: map[string]stringset{},
@ -71,35 +85,46 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
wal: wal,
mapper: newPositionMapper(nil),
}
b.stats = &BlockStats{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
}
err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) {
b.create(lset.Hash(), lset)
b.stats.SeriesCount++
},
sample: func(s refdSample) {
b.series[s.ref].append(s.t, s.v)
if s.t < b.stats.MinTime {
b.stats.MinTime = s.t
}
if s.t > b.stats.MaxTime {
b.stats.MaxTime = s.t
}
b.stats.SampleCount++
},
})
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &h.meta); err != nil {
return nil, err
}
b.updateMapping()
// Replay contents of the write ahead log.
if err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) error {
h.create(lset.Hash(), lset)
h.meta.Stats.NumSeries++
return b, nil
return nil
},
sample: func(s refdSample) error {
h.series[s.ref].append(s.t, s.v)
if h.minTime != nil && s.t < *h.minTime {
return errors.Errorf("sample earlier than minimum timestamp %d", *h.minTime)
}
if s.t < h.meta.MinTime {
h.meta.MinTime = s.t
}
if s.t > h.meta.MaxTime {
h.meta.MaxTime = s.t
}
h.meta.Stats.NumSamples++
return nil
},
}); err != nil {
return nil, err
}
h.updateMapping()
return h, nil
}
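The new third parameter to openHeadBlock threads the lower time bound down into WAL replay: nil means unbounded (the very first head block), while a non-nil bound makes replay reject out-of-order samples. A usage sketch with placeholder values — the directory name and logger are assumptions:

	minT := int64(1484827200001) // previous head block's MaxTime + 1
	h, err := openHeadBlock("data/b-000004", logger, &minT)
	if err != nil {
		// A WAL sample older than minT surfaces here as
		// "sample earlier than minimum timestamp ...".
	}
	_ = h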
// Close syncs all data and closes underlying resources of the head block.
@ -107,19 +132,18 @@ func (h *headBlock) Close() error {
return h.wal.Close()
}
func (h *headBlock) Meta() BlockMeta {
h.metamtx.RLock()
defer h.metamtx.RUnlock()
return h.meta
}
func (h *headBlock) Dir() string { return h.dir }
func (h *headBlock) Persisted() bool { return false }
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
// Stats returns statistics about the indexed data.
func (h *headBlock) Stats() BlockStats {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
return *h.stats
}
func (h *headBlock) Appender() Appender {
h.mtx.RLock()
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
@ -194,7 +218,7 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error
}
func (a *headAppender) Add(ref uint64, t int64, v float64) error {
// We only own the first 5 bytes of the reference. Anything before is
// We only own the last 5 bytes of the reference. Anything before is
// used by higher-order appenders. We erase it to avoid issues.
ref = (ref << 24) >> 24
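The double shift above clears the high-order 24 bits of the 64-bit reference, keeping the low 40 bits (5 bytes); it is equivalent to masking with (1<<40)-1. A quick check, assuming fmt for the printout:

	ref := uint64(0xAABBCC0102030405)
	masked := (ref << 24) >> 24
	fmt.Printf("%#x\n", masked)          // 0x102030405 — top 3 bytes cleared
	fmt.Println(masked == ref&(1<<40-1)) // true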
@ -287,6 +311,7 @@ func (a *headAppender) Commit() error {
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
a.mtx.RUnlock()
return err
}
@ -298,17 +323,17 @@ func (a *headAppender) Commit() error {
a.mtx.RUnlock()
a.stats.mtx.Lock()
defer a.stats.mtx.Unlock()
a.metamtx.Lock()
defer a.metamtx.Unlock()
a.stats.SampleCount += total
a.stats.SeriesCount += uint64(len(a.newSeries))
a.meta.Stats.NumSamples += total
a.meta.Stats.NumSeries += uint64(len(a.newSeries))
if mint < a.stats.MinTime {
a.stats.MinTime = mint
if mint < a.meta.MinTime {
a.meta.MinTime = mint
}
if maxt > a.stats.MaxTime {
a.stats.MaxTime = maxt
if maxt > a.meta.MaxTime {
a.meta.MaxTime = maxt
}
return nil
@ -420,12 +445,6 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
return res, nil
}
func (h *headIndexReader) Stats() (BlockStats, error) {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
return *h.stats, nil
}
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
@ -467,10 +486,10 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
}
func (h *headBlock) fullness() float64 {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
h.metamtx.RLock()
defer h.metamtx.RUnlock()
return float64(h.stats.SampleCount) / float64(h.stats.SeriesCount+1) / 250
return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250
}
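As a worked example of the fullness heuristic: a head block holding 12,345 samples across 120 series yields 12345/(120+1)/250 ≈ 0.41, so fullness reaches 1.0 at roughly 250 samples per series; the +1 merely avoids division by zero on an empty block.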
func (h *headBlock) updateMapping() {

@ -53,9 +53,6 @@ func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
// IndexReader provides reading access to serialized index data.
type IndexReader interface {
// Stats returns statistics about the indexed data.
Stats() (BlockStats, error)
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
@ -195,28 +192,6 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
return yoloString(b), nil
}
func (r *indexReader) Stats() (BlockStats, error) {
flag, b, err := r.section(8)
if err != nil {
return BlockStats{}, err
}
if flag != flagStd {
return BlockStats{}, errInvalidFlag
}
if len(b) != 64 {
return BlockStats{}, errInvalidSize
}
return BlockStats{
MinTime: int64(binary.BigEndian.Uint64(b)),
MaxTime: int64(binary.BigEndian.Uint64(b[8:])),
SeriesCount: binary.BigEndian.Uint64(b[16:]),
ChunkCount: binary.BigEndian.Uint64(b[24:]),
SampleCount: binary.BigEndian.Uint64(b[32:]),
}, nil
}
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
key := strings.Join(names, string(sep))
off, ok := r.labels[key]

wal.go

@ -88,8 +88,8 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error
}
type walHandler struct {
sample func(refdSample)
series func(labels.Labels)
sample func(refdSample) error
series func(labels.Labels) error
}
// ReadAll consumes all entries in the WAL and triggers the registered handlers.
@ -343,7 +343,9 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
b = b[n+int(vl):]
}
d.handler.series(lset)
if err := d.handler.series(lset); err != nil {
return err
}
}
return nil
}
@ -382,7 +384,9 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
b = b[8:]
d.handler.sample(smpl)
if err := d.handler.sample(smpl); err != nil {
return err
}
}
return nil
}

@ -118,8 +118,14 @@ func BenchmarkWALRead(b *testing.B) {
var numSeries, numSamples int
err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) { numSeries++ },
sample: func(smpl refdSample) { numSamples++ },
series: func(lset labels.Labels) error {
numSeries++
return nil
},
sample: func(smpl refdSample) error {
numSamples++
return nil
},
})
require.NoError(b, err)

@ -2,7 +2,6 @@ package tsdb
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"sort"
@ -156,9 +155,6 @@ type IndexWriter interface {
// list iterator. It only has to be available during the write processing.
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta)
// WriteStats writes final stats for the indexed block.
WriteStats(BlockStats) error
// WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names.
WriteLabelIndex(names []string, values []string) error
@ -183,9 +179,10 @@ type indexWriterSeries struct {
// indexWriter implements the IndexWriter interface for the standard
// serialization format.
type indexWriter struct {
ow io.Writer
w *ioutil.PageWriter
n int64
ow io.Writer
w *ioutil.PageWriter
n int64
started bool
series map[uint32]*indexWriterSeries
@ -194,14 +191,15 @@ type indexWriter struct {
postings []hashEntry // postings lists offsets
}
func newIndexWriter(w io.Writer) *indexWriter {
return &indexWriter{
func newIndexWriter(w io.Writer) (*indexWriter, error) {
ix := &indexWriter{
w: ioutil.NewPageWriter(w, compactionPageBytes, 0),
ow: w,
n: 0,
symbols: make(map[string]uint32, 4096),
series: make(map[uint32]*indexWriterSeries, 4096),
}
return ix, ix.writeMeta()
}
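Because the constructor now writes the meta section up front, callers see any header-write failure immediately and WriteStats no longer needs to police call order (see its removal below). A usage sketch mirroring the compact.go change above; dir is assumed to be in scope:

	f, err := os.Create(filepath.Join(dir, "index"))
	if err != nil {
		return err
	}
	iw, err := newIndexWriter(f) // meta section is written here
	if err != nil {
		return errors.Wrap(err, "open index writer")
	}
	_ = iw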
func (w *indexWriter) write(wr io.Writer, b []byte) error {
@ -253,39 +251,6 @@ func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkM
}
}
func (w *indexWriter) WriteStats(stats BlockStats) error {
if w.n != 0 {
return fmt.Errorf("WriteStats must be called first")
}
if err := w.writeMeta(); err != nil {
return err
}
b := [64]byte{}
binary.BigEndian.PutUint64(b[0:], uint64(stats.MinTime))
binary.BigEndian.PutUint64(b[8:], uint64(stats.MaxTime))
binary.BigEndian.PutUint64(b[16:], stats.SeriesCount)
binary.BigEndian.PutUint64(b[24:], stats.ChunkCount)
binary.BigEndian.PutUint64(b[32:], stats.SampleCount)
err := w.section(64, flagStd, func(wr io.Writer) error {
return w.write(wr, b[:])
})
if err != nil {
return err
}
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
return nil
}
func (w *indexWriter) writeSymbols() error {
// Generate sorted list of strings we will store as reference table.
symbols := make([]string, 0, len(w.symbols))