Properly track and write meta file

This commit is contained in:
Fabian Reinartz 2017-01-19 14:01:38 +01:00
parent 9ddbd64d00
commit d4779b374c
6 changed files with 152 additions and 80 deletions

View file

@ -35,8 +35,11 @@ type Block interface {
// BlockMeta provides meta information about a block.
type BlockMeta struct {
MinTime int64 `json:"minTime,omitempty"`
MaxTime int64 `json:"maxTime,omitempty"`
// MinTime and MaxTime specify the time range all samples
// in the block must be in. If unset, samples can be appended
// freely until they are set.
MinTime *int64 `json:"minTime,omitempty"`
MaxTime *int64 `json:"maxTime,omitempty"`
Stats struct {
NumSamples uint64 `json:"numSamples,omitempty"`
@ -61,12 +64,48 @@ type persistedBlock struct {
}
type blockMeta struct {
*BlockMeta
Version int `json:"version"`
Meta BlockMeta `json:",inline"`
}
const metaFilename = "meta.json"
func readMetaFile(dir string) (*BlockMeta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
var m blockMeta
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return m.BlockMeta, nil
}
func writeMetaFile(dir string, meta *BlockMeta) error {
f, err := os.Create(filepath.Join(dir, metaFilename))
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", "\t")
if err := enc.Encode(&blockMeta{Version: 1, BlockMeta: meta}); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return nil
}
func newPersistedBlock(dir string) (*persistedBlock, error) {
// TODO(fabxc): validate match of name and stats time, validate magic.
@ -89,22 +128,20 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
return nil, errors.Wrap(err, "create index reader")
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
pb := &persistedBlock{
dir: dir,
meta: *meta,
chunksf: chunksf,
indexf: indexf,
chunkr: sr,
indexr: ir,
}
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &pb.meta); err != nil {
return nil, err
}
return pb, nil
}

View file

@ -147,13 +147,10 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
return errors.Wrap(err, "create index file")
}
indexw, err := newIndexWriter(indexf)
if err != nil {
return errors.Wrap(err, "open index writer")
}
indexw := newIndexWriter(indexf)
chunkw := newSeriesWriter(chunkf, indexw)
if err = c.write(blocks, indexw, chunkw); err != nil {
if err = c.write(dir, blocks, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}
@ -178,7 +175,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
return nil
}
func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error {
func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error {
var set compactionSet
for i, b := range blocks {
@ -260,7 +257,8 @@ func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWrite
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return err
}
return nil
return writeMetaFile(dir, &meta)
}
type compactionSet interface {

43
db.go
View file

@ -281,7 +281,7 @@ func (db *DB) initBlocks() error {
for _, dir := range dirs {
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := openHeadBlock(dir, db.logger, nil)
h, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
}
@ -365,7 +365,6 @@ func (a *dbAppender) Add(ref uint64, t int64, v float64) error {
if gen != a.gen {
return ErrNotFound
}
a.db.metrics.samplesAppended.Inc()
return a.head.Add(ref, t, v)
}
@ -426,13 +425,12 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
for _, b := range db.persisted {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if intervalOverlap(mint, maxt, *m.MinTime, *m.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if intervalOverlap(mint, maxt, b.mint, b.maxt) {
bs = append(bs, b)
}
}
@ -443,22 +441,33 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut() (*headBlock, error) {
var mint *int64
// If a previous block exists, fix its max time and and take the
// timestamp after as the minimum for the new head.
if len(db.heads) > 0 {
cur := db.heads[len(db.heads)-1]
cur.metamtx.Lock()
if cur.meta.MinTime == nil {
mt := cur.mint
cur.meta.MinTime = &mt
}
cur.meta.MaxTime = new(int64)
mt := cur.maxt + 1
cur.meta.MaxTime = &mt
mint = &mt
cur.metamtx.Unlock()
}
dir, err := nextBlockDir(db.dir)
if err != nil {
return nil, err
}
// If its not the very first head block, all its samples must not be
// larger than
var minTime *int64
if len(db.heads) > 0 {
cb := db.heads[len(db.heads)-1]
minTime = new(int64)
*minTime = cb.Meta().MaxTime + 1
}
newHead, err := openHeadBlock(dir, db.logger, minTime)
newHead, err := createHeadBlock(dir, db.logger, mint)
if err != nil {
return nil, err
}

89
head.go
View file

@ -1,13 +1,10 @@
package tsdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"time"
@ -40,10 +37,7 @@ var (
type headBlock struct {
mtx sync.RWMutex
dir string
// Head blocks are initialized with a minimum timestamp if they were preceded
// by another block. Appended samples must not have a smaller timestamp than this.
minTime *int64
wal *WAL
// descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID.
@ -58,14 +52,24 @@ type headBlock struct {
values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms
wal *WAL
metamtx sync.RWMutex
meta BlockMeta
mint, maxt int64 // timestamp range of current samples
}
func createHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
if err := writeMetaFile(dir, &BlockMeta{MinTime: minTime}); err != nil {
return nil, err
}
return openHeadBlock(dir, l)
}
// openHeadBlock creates a new empty head block.
func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error) {
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
@ -74,24 +78,22 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error)
if err != nil {
return nil, err
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
h := &headBlock{
dir: dir,
minTime: minTime,
wal: wal,
series: []*memSeries{},
hashes: map[uint64][]*memSeries{},
values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)},
wal: wal,
mapper: newPositionMapper(nil),
}
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &h.meta); err != nil {
return nil, err
meta: *meta,
mint: math.MaxInt64,
maxt: math.MinInt64,
}
// Replay contents of the write ahead log.
@ -99,23 +101,22 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error)
series: func(lset labels.Labels) error {
h.create(lset.Hash(), lset)
h.meta.Stats.NumSeries++
return nil
},
sample: func(s refdSample) error {
h.series[s.ref].append(s.t, s.v)
if h.minTime != nil && s.t < *h.minTime {
return errors.Errorf("sample earlier than minimum timestamp %d", *h.minTime)
if !h.inBounds(s.t) {
return ErrOutOfBounds
}
if s.t < h.meta.MinTime {
h.meta.MinTime = s.t
if s.t < h.mint {
h.mint = s.t
}
if s.t > h.meta.MaxTime {
h.meta.MaxTime = s.t
if s.t > h.maxt {
h.maxt = s.t
}
h.meta.Stats.NumSamples++
return nil
},
}); err != nil {
@ -127,6 +128,12 @@ func openHeadBlock(dir string, l log.Logger, minTime *int64) (*headBlock, error)
return h, nil
}
// inBounds returns true if the given timestamp is within the valid
// time bounds of the block.
func (h *headBlock) inBounds(t int64) bool {
return h.meta.MinTime != nil && t < *h.meta.MinTime
}
// Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error {
return h.wal.Close()
@ -294,12 +301,6 @@ func (a *headAppender) Commit() error {
a.createSeries()
var (
total = uint64(len(a.samples))
mint = int64(math.MaxInt64)
maxt = int64(math.MinInt64)
)
for i := range a.samples {
s := &a.samples[i]
@ -315,10 +316,22 @@ func (a *headAppender) Commit() error {
return err
}
var (
total = uint64(len(a.samples))
mint = int64(math.MaxInt64)
maxt = int64(math.MinInt64)
)
for _, s := range a.samples {
if !a.series[s.ref].append(s.t, s.v) {
total--
}
if s.t < mint {
mint = s.t
}
if s.t > maxt {
maxt = s.t
}
}
a.mtx.RUnlock()
@ -329,11 +342,11 @@ func (a *headAppender) Commit() error {
a.meta.Stats.NumSamples += total
a.meta.Stats.NumSeries += uint64(len(a.newSeries))
if mint < a.meta.MinTime {
a.meta.MinTime = mint
if mint < a.mint {
a.mint = mint
}
if maxt > a.meta.MaxTime {
a.meta.MaxTime = maxt
if maxt > a.maxt {
a.maxt = maxt
}
return nil

View file

@ -178,14 +178,17 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
}
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
if int(o) > len(r.b) {
return "", errors.Errorf("invalid symbol offset %d", o)
}
l, n := binary.Uvarint(r.b[o:])
if n < 0 {
return "", fmt.Errorf("reading symbol length failed")
return "", errors.New("reading symbol length failed")
}
end := int(o) + n + int(l)
if end > len(r.b) {
return "", fmt.Errorf("invalid length")
return "", errors.New("invalid length")
}
b := r.b[int(o)+n : end]

View file

@ -191,15 +191,14 @@ type indexWriter struct {
postings []hashEntry // postings lists offsets
}
func newIndexWriter(w io.Writer) (*indexWriter, error) {
ix := &indexWriter{
func newIndexWriter(w io.Writer) *indexWriter {
return &indexWriter{
w: ioutil.NewPageWriter(w, compactionPageBytes, 0),
ow: w,
n: 0,
symbols: make(map[string]uint32, 4096),
series: make(map[uint32]*indexWriterSeries, 4096),
}
return ix, ix.writeMeta()
}
func (w *indexWriter) write(wr io.Writer, b []byte) error {
@ -336,6 +335,19 @@ func (w *indexWriter) writeSeries() error {
}
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if !w.started {
if err := w.writeMeta(); err != nil {
return err
}
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
w.started = true
}
valt, err := newStringTuples(values, len(names))
if err != nil {
return err