Modify IndexReader API to accommodate compaction

This changes the IndexReader API to expose plain labels
and chunk meta information instead of a Series interface.
Dropping of irrelevant chunks is moved into the querier.

A LabelIndices method is added to query for existing label
value indices.
This commit is contained in:
Fabian Reinartz 2016-12-31 15:35:08 +01:00
parent 1e1a37b15b
commit c00d17e691
4 changed files with 125 additions and 95 deletions

16
db.go
View file

@ -284,6 +284,9 @@ func (s *Shard) Close() error {
} }
func (s *Shard) appendBatch(samples []hashedSample) error { func (s *Shard) appendBatch(samples []hashedSample) error {
if len(samples) == 0 {
return nil
}
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
@ -291,7 +294,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
// different time blocks. Those may occur during transition to still // different time blocks. Those may occur during transition to still
// allow late samples to arrive for a previous block. // allow late samples to arrive for a previous block.
err := s.head.appendBatch(samples) err := s.head.appendBatch(samples)
if err != nil { if err == nil {
s.metrics.samplesAppended.Add(float64(len(samples))) s.metrics.samplesAppended.Add(float64(len(samples)))
} }
@ -451,9 +454,14 @@ func (es MultiError) Error() string {
} }
// Add adds the error to the error list if it is not nil. // Add adds the error to the error list if it is not nil.
func (es MultiError) Add(err error) { func (es *MultiError) Add(err error) {
if err != nil { if err == nil {
es = append(es, err) return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
} }
} }

59
head.go
View file

@ -1,6 +1,7 @@
package tsdb package tsdb
import ( import (
"errors"
"os" "os"
"sort" "sort"
"sync" "sync"
@ -112,25 +113,22 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) {
} }
// Series returns the series for the given reference. // Series returns the series for the given reference.
func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
if int(ref) >= len(h.descs) { if int(ref) >= len(h.descs) {
return nil, errNotFound return nil, nil, errNotFound
} }
cd := h.descs[ref] cd := h.descs[ref]
if !intervalOverlap(cd.firsTimestamp, cd.lastTimestamp, mint, maxt) { return cd.lset, []ChunkMeta{{MinTime: h.stats.MinTime, Ref: ref}}, nil
return nil, nil }
func (h *HeadBlock) LabelIndices() ([][]string, error) {
res := [][]string{}
for s := range h.values {
res = append(res, []string{s})
} }
s := &chunkSeries{ return res, nil
labels: cd.lset,
chunks: []ChunkMeta{
{MinTime: h.stats.MinTime, Ref: 0},
},
chunk: func(ref uint32) (chunks.Chunk, error) {
return cd.chunk, nil
},
}
return s, nil
} }
// get retrieves the chunk with the hash and label set and creates // get retrieves the chunk with the hash and label set and creates
@ -186,6 +184,11 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
return cd return cd
} }
var (
ErrOutOfOrderSample = errors.New("out of order sample")
ErrAmendSample = errors.New("amending sample")
)
func (h *HeadBlock) appendBatch(samples []hashedSample) error { func (h *HeadBlock) appendBatch(samples []hashedSample) error {
// Find head chunks for all samples and allocate new IDs/refs for // Find head chunks for all samples and allocate new IDs/refs for
// ones we haven't seen before. // ones we haven't seen before.
@ -200,6 +203,13 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
cd := h.get(s.hash, s.labels) cd := h.get(s.hash, s.labels)
if cd != nil { if cd != nil {
// Samples must only occur in order.
if s.t < cd.lastTimestamp {
return ErrOutOfOrderSample
}
if cd.lastTimestamp == s.t && cd.lastValue != s.v {
return ErrAmendSample
}
// TODO(fabxc): sample refs are only scoped within a block for // TODO(fabxc): sample refs are only scoped within a block for
// now and we ignore any previously set value // now and we ignore any previously set value
s.ref = cd.ref s.ref = cd.ref
@ -232,14 +242,17 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
} }
for _, s := range samples { for _, s := range samples {
h.descs[s.ref].append(s.t, s.v) cd := h.descs[s.ref]
// Skip duplicate samples.
appended.Inc() if cd.lastTimestamp == s.t && cd.lastValue != s.v {
h.stats.SampleCount++ continue
}
cd.append(s.t, s.v)
if s.t > h.stats.MaxTime { if s.t > h.stats.MaxTime {
h.stats.MaxTime = s.t h.stats.MaxTime = s.t
} }
h.stats.SampleCount++
} }
return nil return nil
@ -287,5 +300,13 @@ func (h *HeadBlock) persist(p string) (int64, error) {
} }
} }
return iw.Size() + sw.Size(), nil // Everything written successfully, we can remove the WAL.
if err := h.wal.Close(); err != nil {
return 0, err
}
if err := os.Remove(h.wal.f.Name()); err != nil {
return 0, err
}
return iw.Size() + sw.Size(), err
} }

View file

@ -236,6 +236,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
return &blockSeriesSet{ return &blockSeriesSet{
index: q.index, index: q.index,
chunks: q.series,
it: Intersect(its...), it: Intersect(its...),
absent: absent, absent: absent,
mint: q.mint, mint: q.mint,
@ -422,6 +423,7 @@ func (s *shardSeriesSet) Next() bool {
// blockSeriesSet is a set of series from an inverted index query. // blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct { type blockSeriesSet struct {
index IndexReader index IndexReader
chunks SeriesReader
it Postings // postings list referencing series it Postings // postings list referencing series
absent []string // labels that must not be set for result series absent []string // labels that must not be set for result series
mint, maxt int64 // considered time range mint, maxt int64 // considered time range
@ -434,24 +436,40 @@ func (s *blockSeriesSet) Next() bool {
// Step through the postings iterator to find potential series. // Step through the postings iterator to find potential series.
outer: outer:
for s.it.Next() { for s.it.Next() {
series, err := s.index.Series(s.it.Value(), s.mint, s.maxt) lset, chunks, err := s.index.Series(s.it.Value())
if err != nil { if err != nil {
s.err = err s.err = err
return false return false
} }
// Resolving series may return nil if no applicable data for the
// time range exists and we can skip to the next series.
if series == nil {
continue
}
// If a series contains a label that must be absent, it is skipped as well. // If a series contains a label that must be absent, it is skipped as well.
for _, abs := range s.absent { for _, abs := range s.absent {
if series.Labels().Get(abs) != "" { if lset.Get(abs) != "" {
continue outer continue outer
} }
} }
s.cur = series ser := &chunkSeries{
labels: lset,
chunks: make([]ChunkMeta, 0, len(chunks)),
chunk: s.chunks.Chunk,
}
// Only use chunks that fit the time range.
for _, c := range chunks {
if c.MaxTime < s.mint {
continue
}
if c.MinTime > s.maxt {
break
}
ser.chunks = append(ser.chunks, c)
}
// If no chunks of the series apply to the time range, skip it.
if len(ser.chunks) == 0 {
continue
}
s.cur = ser
return true return true
} }
if s.it.Err() != nil { if s.it.Err() != nil {

111
reader.go
View file

@ -63,7 +63,10 @@ type IndexReader interface {
Postings(name, value string) (Postings, error) Postings(name, value string) (Postings, error)
// Series returns the series for the given reference. // Series returns the series for the given reference.
Series(ref uint32, mint, maxt int64) (Series, error) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
} }
// StringTuples provides access to a sorted list of string tuples. // StringTuples provides access to a sorted list of string tuples.
@ -178,18 +181,18 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
return flag, b[:l], nil return flag, b[:l], nil
} }
func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) {
l, n := binary.Uvarint(r.b[o:]) l, n := binary.Uvarint(r.b[o:])
if n < 0 { if n < 0 {
return nil, fmt.Errorf("reading symbol length failed") return "", fmt.Errorf("reading symbol length failed")
} }
end := int(o) + n + int(l) end := int(o) + n + int(l)
if end > len(r.b) { if end > len(r.b) {
return nil, fmt.Errorf("invalid length") return "", fmt.Errorf("invalid length")
} }
return r.b[int(o)+n : end], nil return yoloString(r.b[int(o)+n : end]), nil
} }
func (r *indexReader) Stats() (BlockStats, error) { func (r *indexReader) Stats() (BlockStats, error) {
@ -241,56 +244,55 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
return st, nil return st, nil
} }
func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { func (r *indexReader) LabelIndices() ([][]string, error) {
res := [][]string{}
for s := range r.labels {
res = append(res, strings.Split(s, string(sep)))
}
return res, nil
}
func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
k, n := binary.Uvarint(r.b[ref:]) k, n := binary.Uvarint(r.b[ref:])
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "number of labels") return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
} }
b := r.b[int(ref)+n:] b := r.b[int(ref)+n:]
offsets := make([]uint32, 0, 2*k)
for i := 0; i < 2*int(k); i++ {
o, n := binary.Uvarint(b)
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "symbol offset")
}
offsets = append(offsets, uint32(o))
b = b[n:]
}
// Symbol offests must occur in pairs representing name and value.
if len(offsets)&1 != 0 {
return nil, errors.New("odd number of symbol references")
}
// TODO(fabxc): Fully materialize series symbols for now. Figure out later if it
// makes sense to decode those lazily.
// If we use unsafe strings the there'll be no copy overhead.
//
// The references are expected to be sorted and match the order of
// the underlying strings.
lbls := make(labels.Labels, 0, k) lbls := make(labels.Labels, 0, k)
for i := 0; i < len(offsets); i += 2 { for i := 0; i < 2*int(k); i += 2 {
n, err := r.lookupSymbol(offsets[i]) o, m := binary.Uvarint(b)
if err != nil { if m < 1 {
return nil, errors.Wrap(err, "symbol lookup") return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
} }
v, err := r.lookupSymbol(offsets[i+1]) n, err := r.lookupSymbol(uint32(o))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "symbol lookup") return nil, nil, errors.Wrap(err, "symbol lookup")
} }
b = b[m:]
o, m = binary.Uvarint(b)
if m < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
}
v, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
}
b = b[m:]
lbls = append(lbls, labels.Label{ lbls = append(lbls, labels.Label{
Name: string(n), Name: n,
Value: string(v), Value: v,
}) })
} }
// Read the chunks meta data. // Read the chunks meta data.
k, n = binary.Uvarint(b) k, n = binary.Uvarint(b)
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "number of chunks") return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
} }
b = b[n:] b = b[n:]
@ -299,49 +301,30 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) {
for i := 0; i < int(k); i++ { for i := 0; i < int(k); i++ {
firstTime, n := binary.Varint(b) firstTime, n := binary.Varint(b)
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "first time") return nil, nil, errors.Wrap(errInvalidSize, "first time")
} }
b = b[n:] b = b[n:]
// Terminate early if we exceeded the queried time range.
if firstTime > maxt {
break
}
lastTime, n := binary.Varint(b) lastTime, n := binary.Varint(b)
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "last time") return nil, nil, errors.Wrap(errInvalidSize, "last time")
} }
b = b[n:] b = b[n:]
o, n := binary.Uvarint(b) o, n := binary.Uvarint(b)
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "chunk offset") return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
} }
b = b[n:] b = b[n:]
// Skip the chunk if it is before the queried time range.
if lastTime < mint {
continue
}
chunks = append(chunks, ChunkMeta{ chunks = append(chunks, ChunkMeta{
Ref: uint32(o), Ref: uint32(o),
MinTime: firstTime, MinTime: firstTime,
MaxTime: lastTime, MaxTime: lastTime,
}) })
} }
// If no chunks applicable to the time range were found, the series
// can be skipped.
if len(chunks) == 0 {
return nil, nil
}
return &chunkSeries{ return lbls, chunks, nil
labels: lbls,
chunks: chunks,
chunk: r.series.Chunk,
}, nil
} }
func (r *indexReader) Postings(name, value string) (Postings, error) { func (r *indexReader) Postings(name, value string) (Postings, error) {
@ -419,7 +402,7 @@ func (t *stringTuples) Less(i, j int) bool {
type serializedStringTuples struct { type serializedStringTuples struct {
l int l int
b []byte b []byte
lookup func(uint32) ([]byte, error) lookup func(uint32) (string, error)
} }
func (t *serializedStringTuples) Len() int { func (t *serializedStringTuples) Len() int {
@ -436,11 +419,11 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
for k := 0; k < t.l; k++ { for k := 0; k < t.l; k++ {
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
b, err := t.lookup(offset) s, err := t.lookup(offset)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "symbol lookup") return nil, errors.Wrap(err, "symbol lookup")
} }
res = append(res, string(b)) res = append(res, s)
} }
return res, nil return res, nil