Add per-block state ULID

This commit is contained in:
Fabian Reinartz 2017-02-27 10:46:15 +01:00
parent 9a5dfadb09
commit 306831f151
4 changed files with 46 additions and 23 deletions

View file

@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -24,16 +25,15 @@ type Block interface {
// Series returns a SeriesReader over the block's data. // Series returns a SeriesReader over the block's data.
Chunks() ChunkReader Chunks() ChunkReader
// Persisted returns whether the block is already persisted,
// and no longer being appended to.
Persisted() bool
// Close releases all underlying resources of the block. // Close releases all underlying resources of the block.
Close() error Close() error
} }
// BlockMeta provides meta information about a block. // BlockMeta provides meta information about a block.
type BlockMeta struct { type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
ULID ulid.ULID `json:"ulid"`
// Sequence number of the block. // Sequence number of the block.
Sequence int `json:"sequence"` Sequence int `json:"sequence"`
@ -118,7 +118,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
return nil, err return nil, err
} }
cr, err := newChunkReader(filepath.Join(dir, "chunks")) cr, err := newChunkReader(chunkDir(dir))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -137,28 +137,21 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
} }
func (pb *persistedBlock) Close() error { func (pb *persistedBlock) Close() error {
err0 := pb.chunkr.Close() var merr MultiError
err1 := pb.indexr.Close()
if err0 != nil { merr.Add(pb.chunkr.Close())
return err0 merr.Add(pb.indexr.Close())
}
return err1 return merr.Err()
} }
func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Persisted() bool { return true }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
func chunksFileName(path string) string { func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
return filepath.Join(path, "chunks-000") func walDir(dir string) string { return filepath.Join(dir, "wal") }
}
func indexFileName(path string) string {
return filepath.Join(path, "index-000")
}
type mmapFile struct { type mmapFile struct {
f *os.File f *os.File

View file

@ -1,12 +1,14 @@
package tsdb package tsdb
import ( import (
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -127,12 +129,15 @@ func (c *compactor) match(bs []compactionInfo) bool {
return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
} }
var entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
func mergeBlockMetas(blocks ...Block) (res BlockMeta) { func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
m0 := blocks[0].Meta() m0 := blocks[0].Meta()
res.Sequence = m0.Sequence res.Sequence = m0.Sequence
res.MinTime = m0.MinTime res.MinTime = m0.MinTime
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
res.ULID = ulid.MustNew(ulid.Now(), entropy)
g := m0.Compaction.Generation g := m0.Compaction.Generation
if g == 0 && len(blocks) > 1 { if g == 0 && len(blocks) > 1 {
@ -163,7 +168,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
return err return err
} }
chunkw, err := newChunkWriter(filepath.Join(dir, "chunks")) chunkw, err := newChunkWriter(chunkDir(dir))
if err != nil { if err != nil {
return errors.Wrap(err, "open chunk writer") return errors.Wrap(err, "open chunk writer")
} }

10
db.go
View file

@ -4,6 +4,7 @@ package tsdb
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math" "math"
"os" "os"
@ -889,3 +890,12 @@ func yoloString(b []byte) string {
} }
return *((*string)(unsafe.Pointer(&h))) return *((*string)(unsafe.Pointer(&h)))
} }
func closeAll(cs ...io.Closer) error {
var merr MultiError
for _, c := range cs {
merr.Add(c.Close())
}
return merr.Err()
}

21
head.go
View file

@ -14,6 +14,7 @@ import (
"github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -62,11 +63,16 @@ type headBlock struct {
} }
func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
if err := os.MkdirAll(dir, 0755); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
ulid, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return nil, err return nil, err
} }
if err := writeMetaFile(dir, &BlockMeta{ if err := writeMetaFile(dir, &BlockMeta{
ULID: ulid,
Sequence: seq, Sequence: seq,
MinTime: mint, MinTime: mint,
MaxTime: maxt, MaxTime: maxt,
@ -133,10 +139,19 @@ func (h *headBlock) inBounds(t int64) bool {
// Close syncs all data and closes underlying resources of the head block. // Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error { func (h *headBlock) Close() error {
if err := writeMetaFile(h.dir, &h.meta); err != nil { if err := h.wal.Close(); err != nil {
return err return err
} }
return h.wal.Close() // Check whether the head block still exists in the underlying dir
// or has already been replaced with a compacted version
meta, err := readMetaFile(h.dir)
if err != nil {
return err
}
if meta.ULID == h.meta.ULID {
return writeMetaFile(h.dir, &h.meta)
}
return nil
} }
func (h *headBlock) Meta() BlockMeta { func (h *headBlock) Meta() BlockMeta {