From 097a2c1e5980651f85e84592ee802ecadaf359e6 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Tue, 28 Feb 2017 09:34:30 +0100
Subject: [PATCH] vendor: re-vendor storage

---
 vendor/github.com/fabxc/tsdb/block.go   |  70 +++---
 vendor/github.com/fabxc/tsdb/compact.go |  52 ++---
 vendor/github.com/fabxc/tsdb/db.go      |  39 ++--
 vendor/github.com/fabxc/tsdb/head.go    |  33 +++-
 vendor/github.com/fabxc/tsdb/querier.go |  12 +-
 vendor/github.com/fabxc/tsdb/reader.go  |  84 +++++---
 vendor/github.com/fabxc/tsdb/writer.go  | 246 ++++++++++++++++--------
 vendor/vendor.json                      |   6 +-
 8 files changed, 332 insertions(+), 210 deletions(-)

diff --git a/vendor/github.com/fabxc/tsdb/block.go b/vendor/github.com/fabxc/tsdb/block.go
index db63e14f2..ad369a4a1 100644
--- a/vendor/github.com/fabxc/tsdb/block.go
+++ b/vendor/github.com/fabxc/tsdb/block.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"sort"
 
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 )
 
@@ -22,11 +23,7 @@ type Block interface {
 	Index() IndexReader
 
 	// Series returns a SeriesReader over the block's data.
-	Series() SeriesReader
-
-	// Persisted returns whether the block is already persisted,
-	// and no longer being appended to.
-	Persisted() bool
+	Chunks() ChunkReader
 
 	// Close releases all underlying resources of the block.
 	Close() error
@@ -34,6 +31,9 @@
 
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
+	// Unique identifier for the block and its contents. Changes on compaction.
+	ULID ulid.ULID `json:"ulid"`
+
 	// Sequence number of the block.
 	Sequence int `json:"sequence"`
 
@@ -64,9 +64,7 @@ type persistedBlock struct {
 	dir  string
 	meta BlockMeta
 
-	chunksf, indexf *mmapFile
-
-	chunkr *seriesReader
+	chunkr *chunkReader
 	indexr *indexReader
 }
 
@@ -120,58 +118,40 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
 		return nil, err
 	}
 
-	chunksf, err := openMmapFile(chunksFileName(dir))
+	cr, err := newChunkReader(chunkDir(dir))
 	if err != nil {
-		return nil, errors.Wrap(err, "open chunk file")
+		return nil, err
 	}
-	indexf, err := openMmapFile(indexFileName(dir))
+	ir, err := newIndexReader(dir)
 	if err != nil {
-		return nil, errors.Wrap(err, "open index file")
-	}
-
-	sr, err := newSeriesReader([][]byte{chunksf.b})
-	if err != nil {
-		return nil, errors.Wrap(err, "create series reader")
-	}
-	ir, err := newIndexReader(indexf.b)
-	if err != nil {
-		return nil, errors.Wrap(err, "create index reader")
+		return nil, err
 	}
 
 	pb := &persistedBlock{
-		dir:     dir,
-		meta:    *meta,
-		chunksf: chunksf,
-		indexf:  indexf,
-		chunkr:  sr,
-		indexr:  ir,
+		dir:    dir,
+		meta:   *meta,
+		chunkr: cr,
+		indexr: ir,
 	}
 	return pb, nil
 }
 
 func (pb *persistedBlock) Close() error {
-	err0 := pb.chunksf.Close()
-	err1 := pb.indexf.Close()
+	var merr MultiError
 
-	if err0 != nil {
-		return err0
-	}
-	return err1
+	merr.Add(pb.chunkr.Close())
+	merr.Add(pb.indexr.Close())
+
+	return merr.Err()
 }
 
-func (pb *persistedBlock) Dir() string          { return pb.dir }
-func (pb *persistedBlock) Persisted() bool      { return true }
-func (pb *persistedBlock) Index() IndexReader   { return pb.indexr }
-func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
-func (pb *persistedBlock) Meta() BlockMeta      { return pb.meta }
+func (pb *persistedBlock) Dir() string         { return pb.dir }
+func (pb *persistedBlock) Index() IndexReader  { return pb.indexr }
+func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
+func (pb *persistedBlock) Meta() BlockMeta     { return pb.meta }
 
-func chunksFileName(path string) string {
-	return filepath.Join(path, "chunks-000")
-}
-
-func indexFileName(path string) string {
-	return filepath.Join(path, "index-000")
-}
+func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
+func walDir(dir string) string   { return filepath.Join(dir, "wal") }
 
 type mmapFile struct {
 	f *os.File
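The ULID added to BlockMeta above gives every block an identity that changes whenever its contents are rewritten by compaction; head.go later relies on it to detect that a head block has been replaced on disk. A minimal sketch of producing such an identifier with the oklog/ulid package, using the same entropy setup this patch introduces in compact.go (the snippet itself is illustrative, not part of the patch):

    package main

    import (
    	"fmt"
    	"math/rand"
    	"time"

    	"github.com/oklog/ulid"
    )

    func main() {
    	// ULIDs encode their creation time in the high bits, so they sort
    	// lexicographically by age — convenient for block directories.
    	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
    	id := ulid.MustNew(ulid.Now(), entropy)
    	fmt.Println(id) // e.g. 01ARZ3NDEKTSV4RRFFQ69G5FAV
    }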
"chunks-000") -} - -func indexFileName(path string) string { - return filepath.Join(path, "index-000") -} +func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } +func walDir(dir string) string { return filepath.Join(dir, "wal") } type mmapFile struct { f *os.File diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index 6a0d67499..9ae18f198 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -1,12 +1,14 @@ package tsdb import ( + "math/rand" "os" "path/filepath" "time" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -60,11 +62,12 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { } type compactionInfo struct { + seq int generation int mint, maxt int64 } -const compactionBlocksLen = 4 +const compactionBlocksLen = 3 // pick returns a range [i, j) in the blocks that are suitable to be compacted // into a single block at position i. @@ -103,8 +106,8 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { } // Then we care about compacting multiple blocks, starting with the oldest. - for i := 0; i < len(bs)-compactionBlocksLen; i += compactionBlocksLen { - if c.match(bs[i : i+2]) { + for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen { + if c.match(bs[i : i+3]) { return i, i + compactionBlocksLen, true } } @@ -114,28 +117,24 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { func (c *compactor) match(bs []compactionInfo) bool { g := bs[0].generation - if g >= 5 { - return false - } for _, b := range bs { - if b.generation == 0 { - continue - } if b.generation != g { return false } } - return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange } +var entropy = rand.New(rand.NewSource(time.Now().UnixNano())) + func mergeBlockMetas(blocks ...Block) (res BlockMeta) { m0 := blocks[0].Meta() res.Sequence = m0.Sequence res.MinTime = m0.MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime + res.ULID = ulid.MustNew(ulid.Now(), entropy) g := m0.Compaction.Generation if g == 0 && len(blocks) > 1 { @@ -166,18 +165,15 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return err } - chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + chunkw, err := newChunkWriter(chunkDir(dir)) if err != nil { - return errors.Wrap(err, "create chunk file") + return errors.Wrap(err, "open chunk writer") } - indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) + indexw, err := newIndexWriter(dir) if err != nil { - return errors.Wrap(err, "create index file") + return errors.Wrap(err, "open index writer") } - indexw := newIndexWriter(indexf) - chunkw := newChunkWriter(chunkf) - if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } @@ -188,18 +184,6 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err = fileutil.Fsync(chunkf); err != nil { - return errors.Wrap(err, "fsync chunk file") - } - if err = fileutil.Fsync(indexf); err != nil { - return errors.Wrap(err, "fsync index file") - } - if err = chunkf.Close(); err != nil { - return errors.Wrap(err, "close chunk file") - } - if err = indexf.Close(); err != nil { - return errors.Wrap(err, "close index 
file") - } return nil } @@ -215,7 +199,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw if hb, ok := b.(*headBlock); ok { all = hb.remapPostings(all) } - s := newCompactionSeriesSet(b.Index(), b.Series(), all) + s := newCompactionSeriesSet(b.Index(), b.Chunks(), all) if i == 0 { set = s @@ -300,17 +284,17 @@ type compactionSet interface { type compactionSeriesSet struct { p Postings index IndexReader - series SeriesReader + chunks ChunkReader l labels.Labels c []ChunkMeta err error } -func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet { return &compactionSeriesSet{ index: i, - series: s, + chunks: c, p: p, } } @@ -327,7 +311,7 @@ func (c *compactionSeriesSet) Next() bool { for i := range c.c { chk := &c.c[i] - chk.Chunk, c.err = c.series.Chunk(chk.Ref) + chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) if c.err != nil { return false } diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index 10253ca1d..001509d5d 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -4,6 +4,7 @@ package tsdb import ( "bytes" "fmt" + "io" "io/ioutil" "math" "os" @@ -131,7 +132,7 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { } // Open returns a new DB in the given directory. -func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { +func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -148,8 +149,10 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { return nil, errors.Wrapf(err, "open DB in %s", dir) } - var r prometheus.Registerer - // r := prometheus.DefaultRegisterer + if l == nil { + l = log.NewLogfmtLogger(os.Stdout) + l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + } if opts == nil { opts = DefaultOptions @@ -161,7 +164,7 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { db = &DB{ dir: dir, lockf: lockf, - logger: logger, + logger: l, metrics: newDBMetrics(r), opts: opts, compactc: make(chan struct{}, 1), @@ -189,6 +192,7 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() + var seqs []int var infos []compactionInfo for _, b := range db.compactable() { m := b.Meta() @@ -197,17 +201,16 @@ func (db *DB) run() { generation: m.Compaction.Generation, mint: m.MinTime, maxt: m.MaxTime, + seq: m.Sequence, }) + seqs = append(seqs, m.Sequence) } i, j, ok := db.compactor.pick(infos) if !ok { continue } - db.logger.Log("msg", "picked", "i", i, "j", j) - for k := i; k < j; k++ { - db.logger.Log("k", k, "generation", infos[k].generation) - } + db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j])) if err := db.compact(i, j); err != nil { db.logger.Log("msg", "compaction failed", "err", err) @@ -298,11 +301,16 @@ func (db *DB) compact(i, j int) error { db.persisted = append(db.persisted, pb) for _, b := range blocks[1:] { + db.logger.Log("msg", "remove old dir", "dir", b.Dir()) if err := os.RemoveAll(b.Dir()); err != nil { return errors.Wrap(err, "removing old block") } } - return db.retentionCutoff() + if err := db.retentionCutoff(); err != nil { + return err + } + + return nil } func (db *DB) retentionCutoff() error { @@ -726,7 +734,7 @@ func isPowTwo(x int) bool { } // OpenPartitioned or 
diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go
index d9ab2944c..260fb962e 100644
--- a/vendor/github.com/fabxc/tsdb/head.go
+++ b/vendor/github.com/fabxc/tsdb/head.go
@@ -14,6 +14,7 @@ import (
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/go-kit/kit/log"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 )
@@ -62,11 +63,16 @@ type headBlock struct {
 }
 
 func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
-	if err := os.MkdirAll(dir, 0755); err != nil {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
+	}
+	ulid, err := ulid.New(ulid.Now(), entropy)
+	if err != nil {
 		return nil, err
 	}
 
 	if err := writeMetaFile(dir, &BlockMeta{
+		ULID:     ulid,
 		Sequence: seq,
 		MinTime:  mint,
 		MaxTime:  maxt,
@@ -133,10 +139,19 @@ func (h *headBlock) inBounds(t int64) bool {
 
 // Close syncs all data and closes underlying resources of the head block.
 func (h *headBlock) Close() error {
-	if err := writeMetaFile(h.dir, &h.meta); err != nil {
+	if err := h.wal.Close(); err != nil {
 		return err
 	}
-	return h.wal.Close()
+	// Check whether the head block still exists in the underlying dir
+	// or has already been replaced with a compacted version
+	meta, err := readMetaFile(h.dir)
+	if err != nil {
+		return err
+	}
+	if meta.ULID == h.meta.ULID {
+		return writeMetaFile(h.dir, &h.meta)
+	}
+	return nil
 }
 
 func (h *headBlock) Meta() BlockMeta {
@@ -146,10 +161,10 @@
 	return h.meta
 }
 
-func (h *headBlock) Dir() string          { return h.dir }
-func (h *headBlock) Persisted() bool      { return false }
-func (h *headBlock) Index() IndexReader   { return &headIndexReader{h} }
-func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
+func (h *headBlock) Dir() string         { return h.dir }
+func (h *headBlock) Persisted() bool     { return false }
+func (h *headBlock) Index() IndexReader  { return &headIndexReader{h} }
+func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }
 
 func (h *headBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)
@@ -359,12 +374,12 @@ func (a *headAppender) Rollback() error {
 	return nil
 }
 
-type headSeriesReader struct {
+type headChunkReader struct {
 	*headBlock
 }
 
 // Chunk returns the chunk for the reference number.
-func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go
index c09bab885..5f02c77f7 100644
--- a/vendor/github.com/fabxc/tsdb/querier.go
+++ b/vendor/github.com/fabxc/tsdb/querier.go
@@ -59,7 +59,7 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 			mint:   mint,
 			maxt:   maxt,
 			index:  b.Index(),
-			series: b.Series(),
+			chunks: b.Chunks(),
 		}
 
 		// TODO(fabxc): find nicer solution.
@@ -123,19 +123,19 @@ func (q *querier) Close() error {
 
 // blockQuerier provides querying access to a single block database.
 type blockQuerier struct {
 	index  IndexReader
-	series SeriesReader
+	chunks ChunkReader
 
 	postingsMapper func(Postings) Postings
 
 	mint, maxt int64
 }
 
-func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
+func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier {
 	return &blockQuerier{
 		mint:   mint,
 		maxt:   maxt,
 		index:  ix,
-		series: s,
+		chunks: c,
 	}
 }
@@ -162,7 +162,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 
 	return &blockSeriesSet{
 		index:  q.index,
-		chunks: q.series,
+		chunks: q.chunks,
 		it:     p,
 		absent: absent,
 		mint:   q.mint,
@@ -425,7 +425,7 @@ func (s *partitionSeriesSet) Next() bool {
 
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
 	index  IndexReader
-	chunks SeriesReader
+	chunks ChunkReader
 	it     Postings // postings list referencing series
 	absent []string // labels that must not be set for result series
 
 	mint, maxt int64 // considered time range
diff --git a/vendor/github.com/fabxc/tsdb/reader.go b/vendor/github.com/fabxc/tsdb/reader.go
index 630c11a99..d4d816c1f 100644
--- a/vendor/github.com/fabxc/tsdb/reader.go
+++ b/vendor/github.com/fabxc/tsdb/reader.go
@@ -3,6 +3,8 @@ package tsdb
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
+	"path/filepath"
 	"strings"
 
 	"github.com/fabxc/tsdb/chunks"
@@ -10,23 +12,43 @@
 	"github.com/pkg/errors"
 )
 
-// SeriesReader provides reading access of serialized time series data.
-type SeriesReader interface {
+// ChunkReader provides reading access of serialized time series data.
+type ChunkReader interface {
 	// Chunk returns the series data chunk with the given reference.
 	Chunk(ref uint64) (chunks.Chunk, error)
+
+	// Close releases all underlying resources of the reader.
+	Close() error
 }
 
-// seriesReader implements a SeriesReader for a serialized byte stream
+// chunkReader implements a SeriesReader for a serialized byte stream
 // of series data.
-type seriesReader struct {
+type chunkReader struct {
 	// The underlying bytes holding the encoded series data.
 	bs [][]byte
+
+	// Closers for resources behind the byte slices.
+	cs []io.Closer
 }
 
-func newSeriesReader(bs [][]byte) (*seriesReader, error) {
-	s := &seriesReader{bs: bs}
+// newChunkReader returns a new chunkReader based on mmapped files found in dir.
+func newChunkReader(dir string) (*chunkReader, error) {
+	files, err := sequenceFiles(dir, "")
+	if err != nil {
+		return nil, err
+	}
+	var cr chunkReader
 
-	for i, b := range bs {
+	for _, fn := range files {
+		f, err := openMmapFile(fn)
+		if err != nil {
+			return nil, errors.Wrapf(err, "mmap files")
+		}
+		cr.cs = append(cr.cs, f)
+		cr.bs = append(cr.bs, f.b)
+	}
+
+	for i, b := range cr.bs {
 		if len(b) < 4 {
 			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
 		}
@@ -35,10 +57,14 @@
 			return nil, fmt.Errorf("invalid magic number %x", m)
 		}
 	}
-	return s, nil
+	return &cr, nil
 }
 
-func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
+func (s *chunkReader) Close() error {
+	return closeAll(s.cs...)
+}
+
+func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 	var (
 		seq = int(ref >> 32)
 		off = int((ref << 32) >> 32)
@@ -80,6 +106,9 @@ type IndexReader interface {
 
 	// LabelIndices returns the label pairs for which indices exist.
 	LabelIndices() ([][]string, error)
+
+	// Close releases the underlying resources of the reader.
+	Close() error
 }
 
 // StringTuples provides access to a sorted list of string tuples.
@@ -94,6 +123,9 @@ type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
 	b []byte
 
+	// Closer that releases the underlying resources of the byte slice.
+	c io.Closer
+
 	// Cached hashmaps of section offsets.
 	labels   map[string]uint32
 	postings map[string]uint32
@@ -104,34 +136,38 @@ var (
 	errInvalidFlag = fmt.Errorf("invalid flag")
 )
 
-func newIndexReader(b []byte) (*indexReader, error) {
-	if len(b) < 4 {
-		return nil, errors.Wrap(errInvalidSize, "index header")
+// newIndexReader returns a new indexReader on the given directory.
+func newIndexReader(dir string) (*indexReader, error) {
+	f, err := openMmapFile(filepath.Join(dir, "index"))
+	if err != nil {
+		return nil, err
 	}
-	r := &indexReader{b: b}
+	r := &indexReader{b: f.b, c: f}
 
 	// Verify magic number.
-	if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex {
-		return nil, fmt.Errorf("invalid magic number %x", m)
+	if len(f.b) < 4 {
+		return nil, errors.Wrap(errInvalidSize, "index header")
+	}
+	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+		return nil, errors.Errorf("invalid magic number %x", m)
 	}
 
-	var err error
 	// The last two 4 bytes hold the pointers to the hashmaps.
-	loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4])
-	poff := binary.BigEndian.Uint32(b[len(b)-4:])
+	loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
+	poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
 
-	f, b, err := r.section(loff)
+	flag, b, err := r.section(loff)
 	if err != nil {
 		return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
 	}
-	if r.labels, err = readHashmap(f, b); err != nil {
+	if r.labels, err = readHashmap(flag, b); err != nil {
 		return nil, errors.Wrap(err, "read label index hashmap")
 	}
-	f, b, err = r.section(poff)
+	flag, b, err = r.section(poff)
 	if err != nil {
 		return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
 	}
-	if r.postings, err = readHashmap(f, b); err != nil {
+	if r.postings, err = readHashmap(flag, b); err != nil {
 		return nil, errors.Wrap(err, "read postings hashmap")
 	}
@@ -169,6 +205,10 @@ func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
 	return h, nil
 }
 
+func (r *indexReader) Close() error {
+	return r.c.Close()
+}
+
 func (r *indexReader) section(o uint32) (byte, []byte, error) {
 	b := r.b[o:]
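Chunk references now pack the segment file into the upper 4 bytes and the in-file offset into the lower 4: chunkReader.Chunk above decodes with seq = ref >> 32 and off = (ref << 32) >> 32, and chunkWriter.WriteChunks below encodes with chk.Ref = seq | uint64(w.n). A small sketch of the packing (packRef and unpackRef are illustrative helpers, not part of the patch):

    package main

    // packRef combines a segment sequence number and a byte offset into the
    // single uint64 reference format shared by chunkReader and chunkWriter.
    func packRef(seq, off uint32) uint64 {
    	return uint64(seq)<<32 | uint64(off)
    }

    // unpackRef reverses packRef: the sequence is the high 32 bits and the
    // offset is the low 32 bits.
    func unpackRef(ref uint64) (seq, off uint32) {
    	return uint32(ref >> 32), uint32(ref)
    }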
diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go
index 6b4e31deb..bafb0e8cd 100644
--- a/vendor/github.com/fabxc/tsdb/writer.go
+++ b/vendor/github.com/fabxc/tsdb/writer.go
@@ -6,10 +6,13 @@ import (
 	"hash"
 	"hash/crc32"
 	"io"
+	"os"
+	"path/filepath"
 	"sort"
 	"strings"
 
 	"github.com/bradfitz/slice"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/fabxc/tsdb/labels"
 	"github.com/pkg/errors"
@@ -33,9 +36,6 @@ type ChunkWriter interface {
 	// is set and can be used to retrieve the chunks from the written data.
 	WriteChunks(chunks ...ChunkMeta) error
 
-	// Size returns the size of the data written so far.
-	Size() int64
-
 	// Close writes any required finalization and closes the resources
 	// associated with the underlying writer.
 	Close() error
@@ -44,20 +44,109 @@
 
 // chunkWriter implements the ChunkWriter interface for the standard
 // serialization format.
 type chunkWriter struct {
-	ow    io.Writer
-	w     *bufio.Writer
-	n     int64
-	c     int
-	crc32 hash.Hash
+	dirFile *os.File
+	files   []*os.File
+	wbuf    *bufio.Writer
+	n       int64
+	crc32   hash.Hash
+
+	segmentSize int64
 }
 
-func newChunkWriter(w io.Writer) *chunkWriter {
-	return &chunkWriter{
-		ow:    w,
-		w:     bufio.NewWriterSize(w, 1*1024*1024),
-		n:     0,
-		crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+const (
+	defaultChunkSegmentSize = 512 * 1024 * 1024
+
+	chunksFormatV1 = 1
+	indexFormatV1  = 1
+)
+
+func newChunkWriter(dir string) (*chunkWriter, error) {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
 	}
+	dirFile, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	cw := &chunkWriter{
+		dirFile:     dirFile,
+		n:           0,
+		crc32:       crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+		segmentSize: defaultChunkSegmentSize,
+	}
+	return cw, nil
+}
+
+func (w *chunkWriter) tail() *os.File {
+	if len(w.files) == 0 {
+		return nil
+	}
+	return w.files[len(w.files)-1]
+}
+
+// finalizeTail writes all pending data to the current tail file,
+// truncates its size, and closes it.
+func (w *chunkWriter) finalizeTail() error {
+	tf := w.tail()
+	if tf == nil {
+		return nil
+	}
+
+	if err := w.wbuf.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(tf); err != nil {
+		return err
+	}
+	// As the file was pre-allocated, we truncate any superfluous zero bytes.
+	off, err := tf.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	if err := tf.Truncate(off); err != nil {
+		return err
+	}
+	return tf.Close()
+}
+
+func (w *chunkWriter) cut() error {
+	// Sync current tail to disk and close.
+	w.finalizeTail()
+
+	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
+	if err != nil {
+		return err
+	}
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
+		return err
+	}
+	if err = w.dirFile.Sync(); err != nil {
+		return err
+	}
+
+	// Write header metadata for new file.
+
+	metab := make([]byte, 8)
+	binary.BigEndian.PutUint32(metab[:4], MagicSeries)
+	metab[4] = chunksFormatV1
+
+	if _, err := f.Write(metab); err != nil {
+		return err
+	}
+
+	w.files = append(w.files, f)
+	if w.wbuf != nil {
+		w.wbuf.Reset(f)
+	} else {
+		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
+	}
+	w.n = 8
+
+	return nil
 }
 
 func (w *chunkWriter) write(wr io.Writer, b []byte) error {
@@ -66,44 +155,40 @@
 	return err
 }
 
-func (w *chunkWriter) writeMeta() error {
-	b := [8]byte{}
-
-	binary.BigEndian.PutUint32(b[:4], MagicSeries)
-	b[4] = flagStd
-
-	return w.write(w.w, b[:])
-}
-
 func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
-	// Initialize with meta data.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
+	// Calculate maximum space we need and cut a new segment in case
+	// we don't fit into the current one.
+	maxLen := int64(binary.MaxVarintLen32)
+	for _, c := range chks {
+		maxLen += binary.MaxVarintLen32 + 1
+		maxLen += int64(len(c.Chunk.Bytes()))
+	}
+	newsz := w.n + maxLen
+
+	if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
+		if err := w.cut(); err != nil {
 			return err
 		}
 	}
 
+	// Write chunks sequentially and set the reference field in the ChunkMeta.
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.w)
+	wr := io.MultiWriter(w.crc32, w.wbuf)
 
-	// For normal reads we don't need the number of the chunk section but
-	// it allows us to verify checksums without reading the index file.
-	// The offsets are also technically enough to calculate chunk size. but
-	// holding the length of each chunk could later allow for adding padding
-	// between chunks.
-	b := [binary.MaxVarintLen32]byte{}
-	n := binary.PutUvarint(b[:], uint64(len(chks)))
+	b := make([]byte, binary.MaxVarintLen32)
+	n := binary.PutUvarint(b, uint64(len(chks)))
 
 	if err := w.write(wr, b[:n]); err != nil {
 		return err
 	}
 
+	seq := uint64(w.seq()) << 32
 	for i := range chks {
 		chk := &chks[i]
 
-		chk.Ref = uint64(w.n)
+		chk.Ref = seq | uint64(w.n)
 
-		n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
+		n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
 
 		if err := w.write(wr, b[:n]); err != nil {
 			return err
@@ -117,24 +202,18 @@
 		chk.Chunk = nil
 	}
 
-	if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
+	if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
 		return err
 	}
 	return nil
 }
 
-func (w *chunkWriter) Size() int64 {
-	return w.n
+func (w *chunkWriter) seq() int {
+	return len(w.files) - 1
 }
 
 func (w *chunkWriter) Close() error {
-	// Initialize block in case no data was written to it.
-	if w.n == 0 {
-		if err := w.writeMeta(); err != nil {
-			return err
-		}
-	}
-	return w.w.Flush()
+	return w.finalizeTail()
 }
 
 // ChunkMeta holds information about a chunk of data.
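The cut condition in WriteChunks is worth unpacking: a new segment is started when no segment is open yet, when the current one has already overrun segmentSize, or when the batch would overflow it while still being small enough to ever fit a fresh segment (a batch larger than a whole segment must not trigger an endless cut loop). Restated as a standalone predicate (needsCut is a hypothetical helper, not part of the patch):

    package main

    // needsCut restates the segment-cut condition from WriteChunks above:
    // open is false before the first segment exists, n is the number of
    // bytes already written to the current segment, and maxLen is the
    // upper bound on the size of the incoming chunk batch.
    func needsCut(open bool, n, maxLen, segmentSize int64) bool {
    	return !open || // no segment open yet
    		n > segmentSize || // current segment overran its target size
    		n+maxLen > segmentSize && maxLen <= segmentSize // batch fits a fresh segment
    }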
@@ -155,7 +234,7 @@ type IndexWriter interface {
 	// of chunks that the index can reference.
 	// The reference number is used to resolve a series against the postings
 	// list iterator. It only has to be available during the write processing.
-	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta)
+	AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
 
 	// WriteLabelIndex serializes an index from label names to values.
 	// The passed in values are chained tuples of strings of the length of names.
@@ -164,9 +243,6 @@
 	// WritePostings writes a postings list for a single label pair.
 	WritePostings(name, value string, it Postings) error
 
-	// Size returns the size of the data written so far.
-	Size() int64
-
 	// Close writes any finalization and closes the resources associated with
 	// the underlying writer.
 	Close() error
@@ -181,13 +257,12 @@ type indexWriterSeries struct {
 
 // indexWriter implements the IndexWriter interface for the standard
 // serialization format.
 type indexWriter struct {
-	ow io.Writer
-	w  *bufio.Writer
+	f    *os.File
+	bufw *bufio.Writer
 
 	n       int64
 	started bool
 
-	series map[uint32]*indexWriterSeries
-
+	series       map[uint32]*indexWriterSeries
 	symbols      map[string]uint32 // symbol offsets
 	labelIndexes []hashEntry       // label index offsets
 	postings     []hashEntry       // postings lists offsets
 
 	crc32 hash.Hash
 }
 
-func newIndexWriter(w io.Writer) *indexWriter {
-	return &indexWriter{
-		w:  bufio.NewWriterSize(w, 1*1024*1024),
-		ow: w,
+func newIndexWriter(dir string) (*indexWriter, error) {
+	df, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		return nil, err
+	}
+	if err := fileutil.Fsync(df); err != nil {
+		return nil, errors.Wrap(err, "sync dir")
+	}
+
+	iw := &indexWriter{
+		f:       f,
+		bufw:    bufio.NewWriterSize(f, 1*1024*1024),
 		n:       0,
 		symbols: make(map[string]uint32, 4096),
 		series:  make(map[uint32]*indexWriterSeries, 4096),
 		crc32:   crc32.New(crc32.MakeTable(crc32.Castagnoli)),
 	}
+	if err := iw.writeMeta(); err != nil {
+		return nil, err
+	}
+	return iw, nil
 }
 
 func (w *indexWriter) write(wr io.Writer, b []byte) error {
@@ -215,7 +306,7 @@
 
 // section writes a CRC32 checksummed section of length l and guarded by flag.
 func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error {
 	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.w)
+	wr := io.MultiWriter(w.crc32, w.bufw)
 
 	b := [5]byte{flag, 0, 0, 0, 0}
 	binary.BigEndian.PutUint32(b[1:], l)
@@ -225,9 +316,9 @@
 	}
 
 	if err := f(wr); err != nil {
-		return errors.Wrap(err, "contents write func")
+		return errors.Wrap(err, "write contents")
 	}
-	if err := w.write(w.w, w.crc32.Sum(nil)); err != nil {
+	if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
 		return errors.Wrap(err, "writing checksum")
 	}
 	return nil
@@ -239,10 +330,13 @@ func (w *indexWriter) writeMeta() error {
 	binary.BigEndian.PutUint32(b[:4], MagicIndex)
 	b[4] = flagStd
 
-	return w.write(w.w, b[:])
+	return w.write(w.bufw, b[:])
 }
 
-func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) {
+func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
+	if _, ok := w.series[ref]; ok {
+		return errors.Errorf("series with reference %d already added", ref)
+	}
 	// Populate the symbol table from all label sets we have to reference.
 	for _, l := range lset {
 		w.symbols[l.Name] = 0
@@ -253,6 +347,7 @@
 		labels: lset,
 		chunks: chunks,
 	}
+	return nil
 }
 
 func (w *indexWriter) writeSymbols() error {
@@ -340,9 +435,6 @@ func (w *indexWriter) writeSeries() error {
 }
 
 func (w *indexWriter) init() error {
-	if err := w.writeMeta(); err != nil {
-		return err
-	}
 	if err := w.writeSymbols(); err != nil {
 		return err
 	}
@@ -439,10 +531,6 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
 	})
 }
 
-func (w *indexWriter) Size() int64 {
-	return w.n
-}
-
 type hashEntry struct {
 	name   string
 	offset uint32
@@ -482,24 +570,22 @@ func (w *indexWriter) finalize() error {
 	// for any index query.
 	// TODO(fabxc): also store offset to series section to allow plain
 	// iteration over all existing series?
-	// TODO(fabxc): store references like these that are not resolved via direct
-	// mmap using explicit endianness?
 	b := [8]byte{}
 	binary.BigEndian.PutUint32(b[:4], lo)
 	binary.BigEndian.PutUint32(b[4:], po)
 
-	return w.write(w.w, b[:])
+	return w.write(w.bufw, b[:])
 }
 
 func (w *indexWriter) Close() error {
-	// Handle blocks without any data.
-	if !w.started {
-		if err := w.init(); err != nil {
-			return err
-		}
-	}
 	if err := w.finalize(); err != nil {
 		return err
 	}
-	return w.w.Flush()
+	if err := w.bufw.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(w.f); err != nil {
+		return err
+	}
+	return w.f.Close()
 }
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 6df803d5e..2586d9857 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -368,10 +368,10 @@
 			"revisionTime": "2016-09-30T00:14:02Z"
 		},
 		{
-			"checksumSHA1": "edB8coiX4s6hf6BZuYE5+MPJYX8=",
+			"checksumSHA1": "UeErAjCWDt1vzaXGQGxnenFUg7w=",
 			"path": "github.com/fabxc/tsdb",
-			"revision": "f734773214e1bcb7962d92155863110d01214db5",
-			"revisionTime": "2017-02-19T12:01:19Z"
+			"revision": "db5c88ea9ac9400b0e58ef18b9b272c34b98639b",
+			"revisionTime": "2017-02-28T07:40:51Z"
 		},
 		{
 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",