Segment chunk file

This adds write path support for segmented chunk data files.
Files of 512MB are pre-allocated and written to. When a write would
exceed the current file's size, the next file is started. On completion,
files are truncated to their final size.
Fabian Reinartz 2017-02-23 10:50:22 +01:00
parent a3b47c4929
commit 78780cd2ba
7 changed files with 209 additions and 107 deletions
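The commit message above describes the mechanism; the following minimal sketch shows the shape of such a write path: pre-allocate a fixed-size segment, cut to the next one when a write would not fit, and truncate the finished segment to its true length. This is an illustration only — segmentedWriter and its fields are made-up names, not part of this commit, and error handling and buffering are abbreviated.

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // segmentedWriter is a toy illustration of the scheme: fixed-size,
    // pre-allocated segment files that are cut when full and truncated
    // to their final size when finished.
    type segmentedWriter struct {
        dir   string
        f     *os.File // currently open segment
        n     int64    // bytes written into the current segment
        limit int64    // segment size, e.g. 512 * 1024 * 1024
        seq   int      // sequence number of the current segment
    }

    func (w *segmentedWriter) write(b []byte) error {
        // Start the next segment if this write would exceed the limit.
        if w.f == nil || w.n+int64(len(b)) > w.limit {
            if err := w.cut(); err != nil {
                return err
            }
        }
        n, err := w.f.Write(b)
        w.n += int64(n)
        return err
    }

    func (w *segmentedWriter) cut() error {
        if err := w.finalize(); err != nil {
            return err
        }
        p := filepath.Join(w.dir, fmt.Sprintf("%06d", w.seq))
        w.seq++
        f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
        if err != nil {
            return err
        }
        // The real code additionally pre-allocates the full segment here
        // (via fileutil.Preallocate) so later writes don't grow the file.
        w.f, w.n = f, 0
        return nil
    }

    // finalize truncates superfluous pre-allocated bytes and closes the segment.
    func (w *segmentedWriter) finalize() error {
        if w.f == nil {
            return nil
        }
        if err := w.f.Truncate(w.n); err != nil {
            return err
        }
        return w.f.Close()
    }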


@@ -22,7 +22,7 @@ type Block interface {
 	Index() IndexReader
 	// Series returns a SeriesReader over the block's data.
-	Series() SeriesReader
+	Chunks() ChunkReader
 	// Persisted returns whether the block is already persisted,
 	// and no longer being appended to.

@@ -64,9 +64,9 @@ type persistedBlock struct {
 	dir  string
 	meta BlockMeta

-	chunksf, indexf *mmapFile
+	indexf *mmapFile

-	chunkr *seriesReader
+	chunkr *chunkReader
 	indexr *indexReader
 }

@@ -120,19 +120,19 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 		return nil, err
 	}

-	chunksf, err := openMmapFile(chunksFileName(dir))
+	cr, err := newChunkReader(filepath.Join(dir, "chunks"))
 	if err != nil {
-		return nil, errors.Wrap(err, "open chunk file")
+		return nil, err
 	}
+	// ir, err := newIndexReader(dir)
+	// if err != nil {
+	// 	return nil, err
+	// }
 	indexf, err := openMmapFile(indexFileName(dir))
 	if err != nil {
 		return nil, errors.Wrap(err, "open index file")
 	}

-	sr, err := newSeriesReader([][]byte{chunksf.b})
-	if err != nil {
-		return nil, errors.Wrap(err, "create series reader")
-	}
 	ir, err := newIndexReader(indexf.b)
 	if err != nil {
 		return nil, errors.Wrap(err, "create index reader")

@@ -141,16 +141,15 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 	pb := &persistedBlock{
 		dir:    dir,
 		meta:   *meta,
-		chunksf: chunksf,
 		indexf: indexf,
-		chunkr: sr,
+		chunkr: cr,
 		indexr: ir,
 	}
 	return pb, nil
 }

 func (pb *persistedBlock) Close() error {
-	err0 := pb.chunksf.Close()
+	err0 := pb.chunkr.Close()
 	err1 := pb.indexf.Close()

 	if err0 != nil {

@@ -162,7 +161,7 @@ func (pb *persistedBlock) Close() error {
 func (pb *persistedBlock) Dir() string          { return pb.dir }
 func (pb *persistedBlock) Persisted() bool      { return true }
 func (pb *persistedBlock) Index() IndexReader   { return pb.indexr }
-func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
+func (pb *persistedBlock) Chunks() ChunkReader  { return pb.chunkr }
 func (pb *persistedBlock) Meta() BlockMeta      { return pb.meta }

 func chunksFileName(path string) string {


@@ -64,7 +64,7 @@ type compactionInfo struct {
 	mint, maxt int64
 }

-const compactionBlocksLen = 4
+const compactionBlocksLen = 3

 // pick returns a range [i, j) in the blocks that are suitable to be compacted
 // into a single block at position i.

@@ -114,9 +114,6 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
 func (c *compactor) match(bs []compactionInfo) bool {
 	g := bs[0].generation
-	if g >= 5 {
-		return false
-	}

 	for _, b := range bs {
 		if b.generation == 0 {

@@ -166,17 +163,16 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 		return err
 	}

-	chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
-	if err != nil {
-		return errors.Wrap(err, "create chunk file")
-	}
 	indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666)
 	if err != nil {
 		return errors.Wrap(err, "create index file")
 	}

 	indexw := newIndexWriter(indexf)
-	chunkw := newChunkWriter(chunkf)
+	chunkw, err := newChunkWriter(filepath.Join(dir, "chunks"))
+	if err != nil {
+		return errors.Wrap(err, "open chunk writer")
+	}

 	if err = c.write(dir, blocks, indexw, chunkw); err != nil {
 		return errors.Wrap(err, "write compaction")

@@ -188,15 +184,9 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 	if err = indexw.Close(); err != nil {
 		return errors.Wrap(err, "close index writer")
 	}
-	if err = fileutil.Fsync(chunkf); err != nil {
-		return errors.Wrap(err, "fsync chunk file")
-	}
 	if err = fileutil.Fsync(indexf); err != nil {
 		return errors.Wrap(err, "fsync index file")
 	}
-	if err = chunkf.Close(); err != nil {
-		return errors.Wrap(err, "close chunk file")
-	}
 	if err = indexf.Close(); err != nil {
 		return errors.Wrap(err, "close index file")
 	}

@@ -215,7 +205,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
 		if hb, ok := b.(*headBlock); ok {
 			all = hb.remapPostings(all)
 		}
-		s := newCompactionSeriesSet(b.Index(), b.Series(), all)
+		s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)

 		if i == 0 {
 			set = s

@@ -300,17 +290,17 @@ type compactionSet interface {
 type compactionSeriesSet struct {
 	p      Postings
 	index  IndexReader
-	series SeriesReader
+	chunks ChunkReader

 	l   labels.Labels
 	c   []ChunkMeta
 	err error
 }

-func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet {
+func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet {
 	return &compactionSeriesSet{
 		index: i,
-		series: s,
+		chunks: c,
 		p:     p,
 	}
 }

@@ -327,7 +317,7 @@ func (c *compactionSeriesSet) Next() bool {
 	for i := range c.c {
 		chk := &c.c[i]

-		chk.Chunk, c.err = c.series.Chunk(chk.Ref)
+		chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
 		if c.err != nil {
 			return false
 		}

db.go

@@ -153,8 +153,8 @@ func Open(dir string, l log.Logger, opts *Options) (db *DB, err error) {
 		l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
 	}

-	var r prometheus.Registerer
-	// r := prometheus.DefaultRegisterer
+	// var r prometheus.Registerer
+	r := prometheus.DefaultRegisterer

 	if opts == nil {
 		opts = DefaultOptions

@@ -307,7 +307,11 @@ func (db *DB) compact(i, j int) error {
 			return errors.Wrap(err, "removing old block")
 		}
 	}

-	return db.retentionCutoff()
+	if err := db.retentionCutoff(); err != nil {
+		return err
+	}
+
+	return nil
 }

 func (db *DB) retentionCutoff() error {


@@ -149,7 +149,7 @@ func (h *headBlock) Meta() BlockMeta {
 func (h *headBlock) Dir() string        { return h.dir }
 func (h *headBlock) Persisted() bool    { return false }
 func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
-func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
+func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }

 func (h *headBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)

@@ -359,12 +359,12 @@ func (a *headAppender) Rollback() error {
 	return nil
 }

-type headSeriesReader struct {
+type headChunkReader struct {
 	*headBlock
 }

 // Chunk returns the chunk for the reference number.
-func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()


@@ -59,7 +59,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 			mint:  mint,
 			maxt:  maxt,
 			index: b.Index(),
-			series: b.Series(),
+			chunks: b.Chunks(),
 		}

 		// TODO(fabxc): find nicer solution.

@@ -123,19 +123,19 @@ func (q *querier) Close() error {
 // blockQuerier provides querying access to a single block database.
 type blockQuerier struct {
 	index  IndexReader
-	series SeriesReader
+	chunks ChunkReader

 	postingsMapper func(Postings) Postings

 	mint, maxt int64
 }

-func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
+func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier {
 	return &blockQuerier{
 		mint:  mint,
 		maxt:  maxt,
 		index: ix,
-		series: s,
+		chunks: c,
 	}
 }

@@ -162,7 +162,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 	return &blockSeriesSet{
 		index:  q.index,
-		chunks: q.series,
+		chunks: q.chunks,
 		it:     p,
 		absent: absent,
 		mint:   q.mint,

@@ -425,7 +425,7 @@ func (s *partitionSeriesSet) Next() bool {
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
 	index  IndexReader
-	chunks SeriesReader
+	chunks ChunkReader
 	it     Postings // postings list referencing series
 	absent []string // labels that must not be set for result series

 	mint, maxt int64 // considered time range


@@ -3,6 +3,7 @@ package tsdb
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
 	"strings"

 	"github.com/fabxc/tsdb/chunks"

@@ -10,23 +11,42 @@ import (
 	"github.com/pkg/errors"
 )

-// SeriesReader provides reading access of serialized time series data.
-type SeriesReader interface {
+// ChunkReader provides reading access of serialized time series data.
+type ChunkReader interface {
 	// Chunk returns the series data chunk with the given reference.
 	Chunk(ref uint64) (chunks.Chunk, error)
+
+	// Close releases all underlying resources of the reader.
+	Close() error
 }

-// seriesReader implements a SeriesReader for a serialized byte stream
+// chunkReader implements a SeriesReader for a serialized byte stream
 // of series data.
-type seriesReader struct {
+type chunkReader struct {
 	// The underlying bytes holding the encoded series data.
 	bs [][]byte
+	cs []io.Closer
 }

-func newSeriesReader(bs [][]byte) (*seriesReader, error) {
-	s := &seriesReader{bs: bs}
+// newChunkReader returns a new chunkReader based on mmaped files found in dir.
+func newChunkReader(dir string) (*chunkReader, error) {
+	files, err := sequenceFiles(dir, "")
+	if err != nil {
+		return nil, err
+	}
+	var cr chunkReader

-	for i, b := range bs {
+	for _, fn := range files {
+		f, err := openMmapFile(fn)
+		if err != nil {
+			return nil, errors.Wrapf(err, "mmap files")
+		}
+		cr.cs = append(cr.cs, f)
+		cr.bs = append(cr.bs, f.b)
+	}
+
+	for i, b := range cr.bs {
 		if len(b) < 4 {
 			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
 		}

@@ -35,10 +55,14 @@ func newSeriesReader(bs [][]byte) (*seriesReader, error) {
 			return nil, fmt.Errorf("invalid magic number %x", m)
 		}
 	}
-	return s, nil
+	return &cr, nil
 }

-func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (s *chunkReader) Close() error {
+	return closeAll(s.cs...)
+}
+
+func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	var (
 		seq = int(ref >> 32)
 		off = int((ref << 32) >> 32)
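Note the reference scheme visible above and in writer.go below: a chunk reference now packs the segment sequence number into the upper 32 bits and the byte offset within that segment into the lower 32 bits. A small round-trip sketch — packRef and unpackRef are hypothetical helpers for illustration, not part of the commit:

    // packRef mirrors chk.Ref = seq | uint64(w.n) in the writer below,
    // where seq is the segment number shifted left by 32 bits.
    func packRef(seq, off uint64) uint64 { return seq<<32 | off }

    // unpackRef mirrors the decoding in chunkReader.Chunk above.
    func unpackRef(ref uint64) (seq, off int) {
        return int(ref >> 32), int((ref << 32) >> 32)
    }

    // Example: the first chunk written to segment 2 starts right after the
    // 8-byte header, so its reference is packRef(2, 8) = 0x0000000200000008.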

writer.go

@@ -6,10 +6,12 @@ import (
 	"hash"
 	"hash/crc32"
 	"io"
+	"os"
 	"sort"
 	"strings"

 	"github.com/bradfitz/slice"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/pkg/errors"
@@ -44,20 +46,109 @@ type ChunkWriter interface {
 // chunkWriter implements the ChunkWriter interface for the standard
 // serialization format.
 type chunkWriter struct {
-	ow    io.Writer
-	w     *bufio.Writer
+	dirFile *os.File
+	files   []*os.File
+	wbuf    *bufio.Writer
 	n     int64
-	c     int
 	crc32 hash.Hash
+
+	segmentSize int64
 }

-func newChunkWriter(w io.Writer) *chunkWriter {
-	return &chunkWriter{
-		ow:    w,
-		w:     bufio.NewWriterSize(w, 1*1024*1024),
+const (
+	defaultChunkSegmentSize = 512 * 1024 * 1024
+
+	chunksFormatV1 = 1
+	indexFormatV1  = 1
+)
+
+func newChunkWriter(dir string) (*chunkWriter, error) {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
+	}
+	dirFile, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	cw := &chunkWriter{
+		dirFile: dirFile,
 		n:     0,
 		crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+		segmentSize: defaultChunkSegmentSize,
 	}
+	return cw, nil
+}
+
+func (w *chunkWriter) tail() *os.File {
+	if len(w.files) == 0 {
+		return nil
+	}
+	return w.files[len(w.files)-1]
+}
+
+// finalizeTail writes all pending data to the current tail file,
+// truncates its size, and closes it.
+func (w *chunkWriter) finalizeTail() error {
+	tf := w.tail()
+	if tf == nil {
+		return nil
+	}
+
+	if err := w.wbuf.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(tf); err != nil {
+		return err
+	}
+	// As the file was pre-allocated, we truncate any superfluous zero bytes.
+	off, err := tf.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	if err := tf.Truncate(off); err != nil {
+		return err
+	}
+	return tf.Close()
+}
+
+func (w *chunkWriter) cut() error {
+	// Sync current tail to disk and close.
+	w.finalizeTail()
+
+	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
+	if err != nil {
+		return err
+	}
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
+		return err
+	}
+	if err = w.dirFile.Sync(); err != nil {
+		return err
+	}
+
+	// Write header metadata for new file.
+	metab := make([]byte, 8)
+	binary.BigEndian.PutUint32(metab[:4], MagicSeries)
+	metab[4] = chunksFormatV1
+
+	if _, err := f.Write(metab); err != nil {
+		return err
+	}
+
+	w.files = append(w.files, f)
+	if w.wbuf != nil {
+		w.wbuf.Reset(f)
+	} else {
+		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
+	}
+	w.n = 8
+
+	return nil
 }

 func (w *chunkWriter) write(wr io.Writer, b []byte) error {
@@ -66,44 +157,40 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
 		return err
 	}

-func (w *chunkWriter) writeMeta() error {
-	b := [8]byte{}
-	binary.BigEndian.PutUint32(b[:4], MagicSeries)
-	b[4] = flagStd
-
-	return w.write(w.w, b[:])
-}

 func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
-	// Initialize with meta data.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
+	// Calculate maximum space we need and cut a new segment in case
+	// we don't fit into the current one.
+	maxLen := int64(binary.MaxVarintLen32)
+	for _, c := range chks {
+		maxLen += binary.MaxVarintLen32 + 1
+		maxLen += int64(len(c.Chunk.Bytes()))
+	}
+	newsz := w.n + maxLen
+
+	if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
+		if err := w.cut(); err != nil {
 			return err
 		}
 	}

+	// Write chunks sequentially and set the reference field in the ChunkMeta.
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.w)
+	wr := io.MultiWriter(w.crc32, w.wbuf)

-	// For normal reads we don't need the number of the chunk section but
-	// it allows us to verify checksums without reading the index file.
-	// The offsets are also technically enough to calculate chunk size. but
-	// holding the length of each chunk could later allow for adding padding
-	// between chunks.
-	b := [binary.MaxVarintLen32]byte{}
-	n := binary.PutUvarint(b[:], uint64(len(chks)))
+	b := make([]byte, binary.MaxVarintLen32)
+	n := binary.PutUvarint(b, uint64(len(chks)))

 	if err := w.write(wr, b[:n]); err != nil {
 		return err
 	}
+	seq := uint64(w.seq()) << 32

 	for i := range chks {
 		chk := &chks[i]

-		chk.Ref = uint64(w.n)
+		chk.Ref = seq | uint64(w.n)

-		n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
+		n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))

 		if err := w.write(wr, b[:n]); err != nil {
 			return err
@@ -117,24 +204,22 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
 		chk.Chunk = nil
 	}

-	if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
+	if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
 		return err
 	}
 	return nil
 }

+func (w *chunkWriter) seq() int {
+	return len(w.files) - 1
+}
+
 func (w *chunkWriter) Size() int64 {
 	return w.n
 }

 func (w *chunkWriter) Close() error {
-	// Initialize block in case no data was written to it.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
-			return err
-		}
-	}
-	return w.w.Flush()
+	return w.finalizeTail()
 }

 // ChunkMeta holds information about a chunk of data.
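The condition guarding cut() in WriteChunks above is worth unpacking. maxLen is an upper bound on the encoded size of the batch; a new segment is cut when no segment is open yet, when the current segment has already overrun its nominal size, or when the batch would overflow the current segment but could still fit into a fresh one. A batch larger than a whole segment (maxLen > segmentSize) is deliberately allowed to spill over rather than trigger a futile cut. Spelled out with explicit parentheses (this is only a paraphrase of the diff's condition; Go's && binds tighter than ||):

    needCut := w.wbuf == nil || // no segment open yet
        w.n > w.segmentSize || // current segment already over-full
        (newsz > w.segmentSize && maxLen <= w.segmentSize) // batch would fit a fresh segment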