mirror of
synced 2025-03-05 20:59:13 -08:00
vendor: integrate various tsdb fixes
This commit is contained in:
@ -163,6 +163,8 @@ func (c *compactor) Compact(dirs ...string) (err error) {
if err != nil {
return err
defer b.Close()
blocks = append(blocks, b)
@ -6,10 +6,8 @@ import (
@ -334,6 +332,9 @@ func (db *DB) reloadBlocks() error {
defer db.mtx.Unlock()
defer db.headmtx.Unlock()
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
@ -355,17 +356,20 @@ func (db *DB) reloadBlocks() error {
for i, meta := range metas {
b, ok := db.seqBlocks[meta.Sequence]
if !ok {
return errors.Errorf("missing block for sequence %d", meta.Sequence)
if meta.Compaction.Generation == 0 {
if !ok {
b, err = openHeadBlock(dirs[i], db.logger)
if err != nil {
return errors.Wrapf(err, "load head at %s", dirs[i])
if meta.ULID != b.Meta().ULID {
return errors.Errorf("head block ULID changed unexpectedly")
heads = append(heads, b.(*headBlock))
} else {
if meta.ULID != b.Meta().ULID {
if ok && meta.ULID != b.Meta().ULID {
if err := b.Close(); err != nil {
return err
@ -404,15 +408,18 @@ func (db *DB) Close() error {
// the block to be used afterwards.
var merr MultiError
var g errgroup.Group
for _, pb := range db.persisted {
for _, hb := range db.heads {
var merr MultiError
return merr.Err()
@ -453,19 +460,6 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error)
return ref | (uint64(h.generation) << 40), nil
func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t)
if err != nil {
return 0, err
ref, err := h.hashedAdd(hash, lset, t, v)
if err != nil {
return 0, err
return ref | (uint64(h.generation) << 40), nil
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
// We store the head generation in the 4th byte and use it to reject
// stale references.
@ -523,10 +517,9 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
return nil, ErrNotFound
// ensureHead makes sure that there is a head block for the timestamp t if
// it is within or after the currently appendable window.
func (db *DB) ensureHead(t int64) error {
// db.mtx.Lock()
// defer db.mtx.Unlock()
// Initial case for a new database: we must create the first
// AppendableBlocks-1 front padding heads.
if len(db.heads) == 0 {
@ -717,123 +710,6 @@ func nextSequenceFile(dir, prefix string) (string, int, error) {
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
// PartitionedDB is a time series storage.
type PartitionedDB struct {
logger log.Logger
dir string
partitionPow uint
Partitions []*DB
func isPowTwo(x int) bool {
return x > 0 && (x&(x-1)) == 0
// OpenPartitioned or create a new DB.
func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) {
if !isPowTwo(n) {
return nil, errors.Errorf("%d is not a power of two", n)
if opts == nil {
opts = DefaultOptions
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
c := &PartitionedDB{
logger: l,
dir: dir,
partitionPow: uint(math.Log2(float64(n))),
// Initialize vertical partitiondb.
// TODO(fabxc): validate partition number to be power of 2, which is required
// for the bitshift-modulo when finding the right partition.
for i := 0; i < n; i++ {
l := log.NewContext(l).With("partition", i)
d := partitionDir(dir, i)
s, err := Open(d, l, r, opts)
if err != nil {
return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
c.Partitions = append(c.Partitions, s)
return c, nil
func partitionDir(base string, i int) string {
return filepath.Join(base, fmt.Sprintf("p-%0.4d", i))
// Close the database.
func (db *PartitionedDB) Close() error {
var g errgroup.Group
for _, partition := range db.Partitions {
return g.Wait()
// Appender returns a new appender against the database.
func (db *PartitionedDB) Appender() Appender {
app := &partitionedAppender{db: db}
for _, p := range db.Partitions {
app.partitions = append(app.partitions, p.Appender().(*dbAppender))
return app
type partitionedAppender struct {
db *PartitionedDB
partitions []*dbAppender
func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h := lset.Hash()
p := h >> (64 - a.db.partitionPow)
ref, err := a.partitions[p].hashedAdd(h, lset, t, v)
if err != nil {
return 0, err
return ref | (p << 48), nil
func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error {
p := uint8((ref << 8) >> 56)
return a.partitions[p].AddFast(ref, t, v)
func (a *partitionedAppender) Commit() error {
var merr MultiError
for _, p := range a.partitions {
return merr.Err()
func (a *partitionedAppender) Rollback() error {
var merr MultiError
for _, p := range a.partitions {
return merr.Err()
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
@ -877,13 +753,7 @@ func (es MultiError) Err() error {
func yoloString(b []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
h := reflect.StringHeader{
Data: sh.Data,
Len: sh.Len,
return *((*string)(unsafe.Pointer(&h)))
return *((*string)(unsafe.Pointer(&b)))
func closeAll(cs ...io.Closer) error {
@ -1,10 +0,0 @@
package tsdb
// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false
@ -44,7 +44,6 @@ type headBlock struct {
activeWriters uint64
symbols map[string]struct{}
// descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID.
series []*memSeries
@ -150,7 +149,7 @@ func (h *headBlock) Close() error {
if err := h.wal.Close(); err != nil {
return err
return errors.Wrapf(err, "close WAL for head %s", h.dir)
// Check whether the head block still exists in the underlying dir
// or has already been replaced with a compacted version or removed.
@ -223,10 +222,8 @@ type refdSample struct {
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
return a.hashedAdd(lset.Hash(), lset, t, v)
hash := lset.Hash()
func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
if ms := a.get(hash, lset); ms != nil {
return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v)
@ -530,13 +527,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
return s
func (h *headBlock) fullness() float64 {
defer h.metamtx.RUnlock()
return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250
func (h *headBlock) updateMapping() {
@ -586,7 +576,7 @@ type memSeries struct {
lastValue float64
sampleBuf [4]sample
app chunks.Appender // Current appender for the chunkdb.
app chunks.Appender // Current appender for the chunk.
func (s *memSeries) cut() *memChunk {
@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier {
func (q *querier) LabelValues(n string) ([]string, error) {
if len(q.blocks) == 0 {
return nil, nil
res, err := q.blocks[0].LabelValues(n)
if err != nil {
return nil, err
@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
return &blockSeriesSet{
index: q.index,
chunks: q.chunks,
it: p,
absent: absent,
mint: q.mint,
maxt: q.maxt,
set: &populatedChunkSeries{
set: &baseChunkSeries{
p: p,
index: q.index,
absent: absent,
chunks: q.chunks,
mint: q.mint,
maxt: q.maxt,
@ -233,69 +240,6 @@ func (q *blockQuerier) Close() error {
return nil
// partitionedQuerier merges query results from a set of partition querieres.
type partitionedQuerier struct {
mint, maxt int64
partitions []Querier
// Querier returns a new querier over the database for the given
// time range.
func (db *PartitionedDB) Querier(mint, maxt int64) Querier {
q := &partitionedQuerier{
mint: mint,
maxt: maxt,
for _, s := range db.Partitions {
q.partitions = append(q.partitions, s.Querier(mint, maxt))
return q
func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet {
// We gather the non-overlapping series from every partition and simply
// return their union.
r := &mergedSeriesSet{}
for _, s := range q.partitions {
r.sets = append(r.sets, s.Select(ms...))
if len(r.sets) == 0 {
return nopSeriesSet{}
return r
func (q *partitionedQuerier) LabelValues(n string) ([]string, error) {
res, err := q.partitions[0].LabelValues(n)
if err != nil {
return nil, err
for _, sq := range q.partitions[1:] {
pr, err := sq.LabelValues(n)
if err != nil {
return nil, err
// Merge new values into deduplicated result.
res = mergeStrings(res, pr)
return res, nil
func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
func (q *partitionedQuerier) Close() error {
var merr MultiError
for _, sq := range q.partitions {
return merr.Err()
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
@ -424,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool {
return true
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
index IndexReader
chunks ChunkReader
it Postings // postings list referencing series
absent []string // labels that must not be set for result series
mint, maxt int64 // considered time range
err error
cur Series
type chunkSeriesSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta)
Err() error
func (s *blockSeriesSet) Next() bool {
// Step through the postings iterator to find potential series.
for s.it.Next() {
lset, chunks, err := s.index.Series(s.it.At())
// baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct {
p Postings
index IndexReader
absent []string // labels that must be unset in results.
lset labels.Labels
chks []ChunkMeta
err error
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool {
for s.p.Next() {
lset, chunks, err := s.index.Series(s.p.At())
if err != nil {
s.err = err
return false
@ -449,35 +401,87 @@ outer:
// If a series contains a label that must be absent, it is skipped as well.
for _, abs := range s.absent {
if lset.Get(abs) != "" {
continue outer
continue Outer
ser := &chunkSeries{
labels: lset,
chunks: make([]ChunkMeta, 0, len(chunks)),
chunk: s.chunks.Chunk,
// Only use chunks that fit the time range.
for _, c := range chunks {
s.lset = lset
s.chks = chunks
return true
if err := s.p.Err(); err != nil {
s.err = err
return false
// populatedChunkSeries loads chunk data from a store for a set of series
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
set chunkSeriesSet
chunks ChunkReader
mint, maxt int64
err error
chks []ChunkMeta
lset labels.Labels
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks := s.set.At()
for i := range chks {
c := &chks[i]
if c.MaxTime < s.mint {
chks = chks[1:]
if c.MinTime > s.maxt {
chks = chks[:i]
ser.chunks = append(ser.chunks, c)
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
if s.err != nil {
return false
// If no chunks of the series apply to the time range, skip it.
if len(ser.chunks) == 0 {
if len(chks) == 0 {
s.cur = ser
s.lset = lset
s.chks = chks
return true
if s.it.Err() != nil {
s.err = s.it.Err()
if err := s.set.Err(); err != nil {
s.err = err
return false
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
set chunkSeriesSet
err error
cur Series
func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks := s.set.At()
s.cur = &chunkSeries{labels: lset, chunks: chunks}
return true
if s.set.Err() != nil {
s.err = s.set.Err()
return false
@ -490,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err }
type chunkSeries struct {
labels labels.Labels
chunks []ChunkMeta // in-order chunk refs
// chunk is a function that retrieves chunks based on a reference
// number contained in the chunk meta information.
chunk func(ref uint64) (chunks.Chunk, error)
func (s *chunkSeries) Labels() labels.Labels {
@ -501,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels {
func (s *chunkSeries) Iterator() SeriesIterator {
var cs []chunks.Chunk
var mints []int64
for _, co := range s.chunks {
c, err := s.chunk(co.Ref)
if err != nil {
panic(err) // TODO(fabxc): add error series iterator.
cs = append(cs, c)
mints = append(mints, co.MinTime)
// TODO(fabxc): consider pushing chunk retrieval further down. In practice, we
// probably have to touch all chunks anyway and it doesn't matter.
return newChunkSeriesIterator(mints, cs)
return newChunkSeriesIterator(s.chunks)
// SeriesIterator iterates over the data of a time series.
@ -601,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error {
// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
mints []int64 // minimum timestamps for each iterator
chunks []chunks.Chunk
chunks []ChunkMeta
i int
cur chunks.Iterator
func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator {
if len(mints) != len(cs) {
panic("chunk references and chunks length don't match")
func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator {
return &chunkSeriesIterator{
mints: mints,
chunks: cs,
i: 0,
cur: cs[0].Iterator(),
cur: cs[0].Chunk.Iterator(),
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
// Only do binary search forward to stay in line with other iterators
// that can only move forward.
x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t })
x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
x += it.i
// If the timestamp was not found, it might be in the last chunk.
if x == len(it.mints) {
if x == len(it.chunks) {
// Go to previous chunk if the chunk doesn't exactly start with t.
// If we are already at the first chunk, we use it as it's the best we have.
if x > 0 && it.mints[x] > t {
if x > 0 && it.chunks[x].MinTime > t {
it.i = x
it.cur = it.chunks[x].Iterator()
it.cur = it.chunks[x].Chunk.Iterator()
for it.cur.Next() {
t0, _ := it.cur.At()
@ -664,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool {
it.cur = it.chunks[it.i].Iterator()
it.cur = it.chunks[it.i].Chunk.Iterator()
return it.Next()
@ -1,459 +0,0 @@
package tsdb
import (
// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
// Chunk returns the series data chunk with the given reference.
Chunk(ref uint64) (chunks.Chunk, error)
// Close releases all underlying resources of the reader.
Close() error
// chunkReader implements a SeriesReader for a serialized byte stream
// of series data.
type chunkReader struct {
// The underlying bytes holding the encoded series data.
bs [][]byte
// Closers for resources behind the byte slices.
cs []io.Closer
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
func newChunkReader(dir string) (*chunkReader, error) {
files, err := sequenceFiles(dir, "")
if err != nil {
return nil, err
var cr chunkReader
for _, fn := range files {
f, err := openMmapFile(fn)
if err != nil {
return nil, errors.Wrapf(err, "mmap files")
cr.cs = append(cr.cs, f)
cr.bs = append(cr.bs, f.b)
for i, b := range cr.bs {
if len(b) < 4 {
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
// Verify magic number.
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
return nil, fmt.Errorf("invalid magic number %x", m)
return &cr, nil
func (s *chunkReader) Close() error {
return closeAll(s.cs...)
func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
var (
seq = int(ref >> 32)
off = int((ref << 32) >> 32)
if seq >= len(s.bs) {
return nil, errors.Errorf("reference sequence %d out of range", seq)
b := s.bs[seq]
if int(off) >= len(b) {
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
b = b[off:]
l, n := binary.Uvarint(b)
if n < 0 {
return nil, fmt.Errorf("reading chunk length failed")
b = b[n:]
enc := chunks.Encoding(b[0])
c, err := chunks.FromData(enc, b[1:1+l])
if err != nil {
return nil, err
return c, nil
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
// Postings returns the postings list iterator for the label pair.
Postings(name, value string) (Postings, error)
// Series returns the series for the given reference.
Series(ref uint32) (labels.Labels, []ChunkMeta, error)
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
// Close released the underlying resources of the reader.
Close() error
// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
// Total number of tuples in the list.
Len() int
// At returns the tuple at position i.
At(i int) ([]string, error)
type indexReader struct {
// The underlying byte slice holding the encoded series data.
b []byte
// Close that releases the underlying resources of the byte slice.
c io.Closer
// Cached hashmaps of section offsets.
labels map[string]uint32
postings map[string]uint32
var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
// newIndexReader returns a new indexReader on the given directory.
func newIndexReader(dir string) (*indexReader, error) {
f, err := openMmapFile(filepath.Join(dir, "index"))
if err != nil {
return nil, err
r := &indexReader{b: f.b, c: f}
// Verify magic number.
if len(f.b) < 4 {
return nil, errors.Wrap(errInvalidSize, "index header")
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m)
// The last two 4 bytes hold the pointers to the hashmaps.
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
flag, b, err := r.section(loff)
if err != nil {
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
if r.labels, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read label index hashmap")
flag, b, err = r.section(poff)
if err != nil {
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
if r.postings, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read postings hashmap")
return r, nil
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
if flag != flagStd {
return nil, errInvalidFlag
h := make(map[string]uint32, 512)
for len(b) > 0 {
l, n := binary.Uvarint(b)
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read key length")
b = b[n:]
if len(b) < int(l) {
return nil, errors.Wrap(errInvalidSize, "read key")
s := string(b[:l])
b = b[l:]
o, n := binary.Uvarint(b)
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read offset value")
b = b[n:]
h[s] = uint32(o)
return h, nil
func (r *indexReader) Close() error {
return r.c.Close()
func (r *indexReader) section(o uint32) (byte, []byte, error) {
b := r.b[o:]
if len(b) < 5 {
return 0, nil, errors.Wrap(errInvalidSize, "read header")
flag := b[0]
l := binary.BigEndian.Uint32(b[1:5])
b = b[5:]
// b must have the given length plus 4 bytes for the CRC32 checksum.
if len(b) < int(l)+4 {
return 0, nil, errors.Wrap(errInvalidSize, "section content")
return flag, b[:l], nil
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
if int(o) > len(r.b) {
return "", errors.Errorf("invalid symbol offset %d", o)
l, n := binary.Uvarint(r.b[o:])
if n < 0 {
return "", errors.New("reading symbol length failed")
end := int(o) + n + int(l)
if end > len(r.b) {
return "", errors.New("invalid length")
b := r.b[int(o)+n : end]
return yoloString(b), nil
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
key := strings.Join(names, string(sep))
off, ok := r.labels[key]
if !ok {
return nil, fmt.Errorf("label index doesn't exist")
flag, b, err := r.section(off)
if err != nil {
return nil, errors.Wrapf(err, "section at %d", off)
if flag != flagStd {
return nil, errInvalidFlag
l, n := binary.Uvarint(b)
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read label index size")
st := &serializedStringTuples{
l: int(l),
b: b[n:],
lookup: r.lookupSymbol,
return st, nil
func (r *indexReader) LabelIndices() ([][]string, error) {
res := [][]string{}
for s := range r.labels {
res = append(res, strings.Split(s, string(sep)))
return res, nil
func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
k, n := binary.Uvarint(r.b[ref:])
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
b := r.b[int(ref)+n:]
lbls := make(labels.Labels, 0, k)
for i := 0; i < 2*int(k); i += 2 {
o, m := binary.Uvarint(b)
if m < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
n, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
b = b[m:]
o, m = binary.Uvarint(b)
if m < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
v, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
b = b[m:]
lbls = append(lbls, labels.Label{
Name: n,
Value: v,
// Read the chunks meta data.
k, n = binary.Uvarint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
b = b[n:]
chunks := make([]ChunkMeta, 0, k)
for i := 0; i < int(k); i++ {
firstTime, n := binary.Varint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "first time")
b = b[n:]
lastTime, n := binary.Varint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "last time")
b = b[n:]
o, n := binary.Uvarint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
b = b[n:]
chunks = append(chunks, ChunkMeta{
Ref: o,
MinTime: firstTime,
MaxTime: lastTime,
return lbls, chunks, nil
func (r *indexReader) Postings(name, value string) (Postings, error) {
key := name + string(sep) + value
off, ok := r.postings[key]
if !ok {
return nil, ErrNotFound
flag, b, err := r.section(off)
if err != nil {
return nil, errors.Wrapf(err, "section at %d", off)
if flag != flagStd {
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
// TODO(fabxc): just read into memory as an intermediate solution.
// Add iterator over serialized data.
var l []uint32
for len(b) > 0 {
if len(b) < 4 {
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
l = append(l, binary.BigEndian.Uint32(b[:4]))
b = b[4:]
return &listPostings{list: l, idx: -1}, nil
type stringTuples struct {
l int // tuple length
s []string // flattened tuple entries
func newStringTuples(s []string, l int) (*stringTuples, error) {
if len(s)%l != 0 {
return nil, errors.Wrap(errInvalidSize, "string tuple list")
return &stringTuples{s: s, l: l}, nil
func (t *stringTuples) Len() int { return len(t.s) / t.l }
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
func (t *stringTuples) Swap(i, j int) {
c := make([]string, t.l)
copy(c, t.s[i:i+t.l])
for k := 0; k < t.l; k++ {
t.s[i+k] = t.s[j+k]
t.s[j+k] = c[k]
func (t *stringTuples) Less(i, j int) bool {
for k := 0; k < t.l; k++ {
d := strings.Compare(t.s[i+k], t.s[j+k])
if d < 0 {
return true
if d > 0 {
return false
return false
type serializedStringTuples struct {
l int
b []byte
lookup func(uint32) (string, error)
func (t *serializedStringTuples) Len() int {
// TODO(fabxc): Cache this?
return len(t.b) / (4 * t.l)
func (t *serializedStringTuples) At(i int) ([]string, error) {
if len(t.b) < (i+t.l)*4 {
return nil, errInvalidSize
res := make([]string, 0, t.l)
for k := 0; k < t.l; k++ {
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
s, err := t.lookup(offset)
if err != nil {
return nil, errors.Wrap(err, "symbol lookup")
res = append(res, s)
return res, nil
@ -448,7 +448,11 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
cr := r.rs[r.cur]
et, flag, b, err := r.entry(cr)
if err == io.EOF {
// If we reached the end of the reader, advance to the next one
// and close.
// Do not close on the last one as it will still be appended to.
// XXX(fabxc): leaky abstraction.
if err == io.EOF && r.cur < len(r.rs)-1 {
// Current reader completed, close and move to the next one.
if err := cr.Close(); err != nil {
return 0, 0, nil, err
@ -1,611 +0,0 @@
package tsdb
import (
const (
// MagicSeries 4 bytes at the head of series file.
MagicSeries = 0x85BD40DD
// MagicIndex 4 bytes at the head of an index file.
MagicIndex = 0xBAAAD700
const compactionPageBytes = minSectorSize * 64
// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
// WriteChunks writes several chunks. The data field of the ChunkMetas
// must be populated.
// After returning successfully, the Ref fields in the ChunkMetas
// is set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...ChunkMeta) error
// Close writes any required finalization and closes the resources
// associated with the underlying writer.
Close() error
// chunkWriter implements the ChunkWriter interface for the standard
// serialization format.
type chunkWriter struct {
dirFile *os.File
files []*os.File
wbuf *bufio.Writer
n int64
crc32 hash.Hash
segmentSize int64
const (
defaultChunkSegmentSize = 512 * 1024 * 1024
chunksFormatV1 = 1
indexFormatV1 = 1
func newChunkWriter(dir string) (*chunkWriter, error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
dirFile, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
cw := &chunkWriter{
dirFile: dirFile,
n: 0,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
segmentSize: defaultChunkSegmentSize,
return cw, nil
func (w *chunkWriter) tail() *os.File {
if len(w.files) == 0 {
return nil
return w.files[len(w.files)-1]
// finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it.
func (w *chunkWriter) finalizeTail() error {
tf := w.tail()
if tf == nil {
return nil
if err := w.wbuf.Flush(); err != nil {
return err
if err := fileutil.Fsync(tf); err != nil {
return err
// As the file was pre-allocated, we truncate any superfluous zero bytes.
off, err := tf.Seek(0, os.SEEK_CUR)
if err != nil {
return err
if err := tf.Truncate(off); err != nil {
return err
return tf.Close()
func (w *chunkWriter) cut() error {
// Sync current tail to disk and close.
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
if err != nil {
return err
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
return err
if err = w.dirFile.Sync(); err != nil {
return err
// Write header metadata for new file.
metab := make([]byte, 8)
binary.BigEndian.PutUint32(metab[:4], MagicSeries)
metab[4] = chunksFormatV1
if _, err := f.Write(metab); err != nil {
return err
w.files = append(w.files, f)
if w.wbuf != nil {
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
w.n = 8
return nil
func (w *chunkWriter) write(wr io.Writer, b []byte) error {
n, err := wr.Write(b)
w.n += int64(n)
return err
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32)
for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1
maxLen += int64(len(c.Chunk.Bytes()))
newsz := w.n + maxLen
if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
if err := w.cut(); err != nil {
return err
// Write chunks sequentially and set the reference field in the ChunkMeta.
wr := io.MultiWriter(w.crc32, w.wbuf)
b := make([]byte, binary.MaxVarintLen32)
n := binary.PutUvarint(b, uint64(len(chks)))
if err := w.write(wr, b[:n]); err != nil {
return err
seq := uint64(w.seq()) << 32
for i := range chks {
chk := &chks[i]
chk.Ref = seq | uint64(w.n)
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
if err := w.write(wr, b[:n]); err != nil {
return err
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
return err
if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
return err
chk.Chunk = nil
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
return err
return nil
func (w *chunkWriter) seq() int {
return len(w.files) - 1
func (w *chunkWriter) Close() error {
return w.finalizeTail()
// ChunkMeta holds information about a chunk of data.
type ChunkMeta struct {
// Ref and Chunk hold either a reference that can be used to retrieve
// chunk data or the data itself.
// Generally, only one of them is set.
Ref uint64
Chunk chunks.Chunk
MinTime, MaxTime int64 // time range the data covers
// IndexWriter serialized the index for a block of series data.
// The methods must generally be called in order they are specified.
type IndexWriter interface {
// AddSeries populates the index writer witha series and its offsets
// of chunks that the index can reference.
// The reference number is used to resolve a series against the postings
// list iterator. It only has to be available during the write processing.
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
// WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names.
WriteLabelIndex(names []string, values []string) error
// WritePostings writes a postings list for a single label pair.
WritePostings(name, value string, it Postings) error
// Close writes any finalization and closes theresources associated with
// the underlying writer.
Close() error
type indexWriterSeries struct {
labels labels.Labels
chunks []ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
// indexWriter implements the IndexWriter interface for the standard
// serialization format.
type indexWriter struct {
f *os.File
bufw *bufio.Writer
n int64
started bool
// Reusable memory.
b []byte
uint32s []uint32
series map[uint32]*indexWriterSeries
symbols map[string]uint32 // symbol offsets
labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets
crc32 hash.Hash
func newIndexWriter(dir string) (*indexWriter, error) {
df, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, err
if err := fileutil.Fsync(df); err != nil {
return nil, errors.Wrap(err, "sync dir")
iw := &indexWriter{
f: f,
bufw: bufio.NewWriterSize(f, 1<<22),
n: 0,
// Reusable memory.
b: make([]byte, 0, 1<<23),
uint32s: make([]uint32, 0, 1<<15),
// Caches.
symbols: make(map[string]uint32, 1<<13),
series: make(map[uint32]*indexWriterSeries, 1<<16),
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
if err := iw.writeMeta(); err != nil {
return nil, err
return iw, nil
func (w *indexWriter) write(wr io.Writer, b []byte) error {
n, err := wr.Write(b)
w.n += int64(n)
return err
// section writes a CRC32 checksummed section of length l and guarded by flag.
func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
wr := io.MultiWriter(w.crc32, w.bufw)
b := [5]byte{flag, 0, 0, 0, 0}
binary.BigEndian.PutUint32(b[1:], uint32(l))
if err := w.write(wr, b[:]); err != nil {
return errors.Wrap(err, "writing header")
if err := f(wr); err != nil {
return errors.Wrap(err, "write contents")
if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
return errors.Wrap(err, "writing checksum")
return nil
func (w *indexWriter) writeMeta() error {
b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], MagicIndex)
b[4] = flagStd
return w.write(w.bufw, b[:])
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
if _, ok := w.series[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
// Populate the symbol table from all label sets we have to reference.
for _, l := range lset {
w.symbols[l.Name] = 0
w.symbols[l.Value] = 0
w.series[ref] = &indexWriterSeries{
labels: lset,
chunks: chunks,
return nil
func (w *indexWriter) writeSymbols() error {
// Generate sorted list of strings we will store as reference table.
symbols := make([]string, 0, len(w.symbols))
for s := range w.symbols {
symbols = append(symbols, s)
// The start of the section plus a 5 byte section header are our base.
// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
base := uint32(w.n) + 5
buf := [binary.MaxVarintLen32]byte{}
w.b = append(w.b[:0], flagStd)
for _, s := range symbols {
w.symbols[s] = base + uint32(len(w.b))
n := binary.PutUvarint(buf[:], uint64(len(s)))
w.b = append(w.b, buf[:n]...)
w.b = append(w.b, s...)
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
type indexWriterSeriesSlice []*indexWriterSeries
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
func (w *indexWriter) writeSeries() error {
// Series must be stored sorted along their labels.
series := make(indexWriterSeriesSlice, 0, len(w.series))
for _, s := range w.series {
series = append(series, s)
// Current end of file plus 5 bytes for section header.
// TODO(fabxc): switch to relative offsets.
base := uint32(w.n) + 5
w.b = w.b[:0]
buf := make([]byte, binary.MaxVarintLen64)
for _, s := range series {
// Write label set symbol references.
s.offset = base + uint32(len(w.b))
n := binary.PutUvarint(buf, uint64(len(s.labels)))
w.b = append(w.b, buf[:n]...)
for _, l := range s.labels {
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
w.b = append(w.b, buf[:n]...)
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
w.b = append(w.b, buf[:n]...)
// Write chunks meta data including reference into chunk file.
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
w.b = append(w.b, buf[:n]...)
for _, c := range s.chunks {
n = binary.PutVarint(buf, c.MinTime)
w.b = append(w.b, buf[:n]...)
n = binary.PutVarint(buf, c.MaxTime)
w.b = append(w.b, buf[:n]...)
n = binary.PutUvarint(buf, uint64(c.Ref))
w.b = append(w.b, buf[:n]...)
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
func (w *indexWriter) init() error {
if err := w.writeSymbols(); err != nil {
return err
if err := w.writeSeries(); err != nil {
return err
w.started = true
return nil
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if !w.started {
if err := w.init(); err != nil {
return err
valt, err := newStringTuples(values, len(names))
if err != nil {
return err
w.labelIndexes = append(w.labelIndexes, hashEntry{
name: strings.Join(names, string(sep)),
offset: uint32(w.n),
buf := make([]byte, binary.MaxVarintLen32)
n := binary.PutUvarint(buf, uint64(len(names)))
l := n + len(values)*4
return w.section(l, flagStd, func(wr io.Writer) error {
// First byte indicates tuple size for index.
if err := w.write(wr, buf[:n]); err != nil {
return err
for _, v := range valt.s {
binary.BigEndian.PutUint32(buf, w.symbols[v])
if err := w.write(wr, buf[:4]); err != nil {
return err
return nil
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if !w.started {
if err := w.init(); err != nil {
return err
key := name + string(sep) + value
w.postings = append(w.postings, hashEntry{
name: key,
offset: uint32(w.n),
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
refs := w.uint32s[:0]
for it.Next() {
s, ok := w.series[it.At()]
if !ok {
return errors.Errorf("series for reference %d not found", it.At())
refs = append(refs, s.offset)
if err := it.Err(); err != nil {
return err
w.b = w.b[:0]
buf := make([]byte, 4)
for _, r := range refs {
binary.BigEndian.PutUint32(buf, r)
w.b = append(w.b, buf...)
w.uint32s = refs[:0]
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
type uint32slice []uint32
func (s uint32slice) Len() int { return len(s) }
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
type hashEntry struct {
name string
offset uint32
func (w *indexWriter) writeHashmap(h []hashEntry) error {
w.b = w.b[:0]
buf := [binary.MaxVarintLen32]byte{}
for _, e := range h {
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
w.b = append(w.b, buf[:n]...)
w.b = append(w.b, e.name...)
n = binary.PutUvarint(buf[:], uint64(e.offset))
w.b = append(w.b, buf[:n]...)
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
func (w *indexWriter) finalize() error {
// Write out hash maps to jump to correct label index and postings sections.
lo := uint32(w.n)
if err := w.writeHashmap(w.labelIndexes); err != nil {
return err
po := uint32(w.n)
if err := w.writeHashmap(w.postings); err != nil {
return err
// Terminate index file with offsets to hashmaps. This is the entry Pointer
// for any index query.
// TODO(fabxc): also store offset to series section to allow plain
// iteration over all existing series?
b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po)
return w.write(w.bufw, b[:])
func (w *indexWriter) Close() error {
if err := w.finalize(); err != nil {
return err
if err := w.bufw.Flush(); err != nil {
return err
if err := fileutil.Fsync(w.f); err != nil {
return err
return w.f.Close()
@ -368,10 +368,10 @@
"revisionTime": "2016-09-30T00:14:02Z"
"checksumSHA1": "IOnF9CNVjOBoVwdfzfUEv/+JotI=",
"checksumSHA1": "Aj4Cn1RClamxluIri/LQMnK/yB4=",
"path": "github.com/fabxc/tsdb",
"revision": "55a9b5428aceb644b3b297d7a9fd63d0354ce953",
"revisionTime": "2017-03-04T15:50:48Z"
"revision": "ca1bc920b795cfc670002e7643471b0277e79a9b",
"revisionTime": "2017-03-08T15:54:13Z"
"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",
Reference in a new issue