From c00d17e691d6d9d24b2850a176a2886007123b6e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 31 Dec 2016 15:35:08 +0100 Subject: [PATCH] Modify IndexReader API to accomodate compaction This changes the IndexReader API to expose plain labels and chunk meta information instead of a Series interface. Dropping of irrelevant chunks is moved into the querier. A LabelIndices method is added to query for existing label value indices. --- db.go | 16 ++++++-- head.go | 59 +++++++++++++++++++--------- querier.go | 34 ++++++++++++---- reader.go | 111 +++++++++++++++++++++++------------------------------ 4 files changed, 125 insertions(+), 95 deletions(-) diff --git a/db.go b/db.go index 7ca513ea3..1521b9760 100644 --- a/db.go +++ b/db.go @@ -284,6 +284,9 @@ func (s *Shard) Close() error { } func (s *Shard) appendBatch(samples []hashedSample) error { + if len(samples) == 0 { + return nil + } s.mtx.Lock() defer s.mtx.Unlock() @@ -291,7 +294,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error { // different time blocks. Those may occurr during transition to still // allow late samples to arrive for a previous block. err := s.head.appendBatch(samples) - if err != nil { + if err == nil { s.metrics.samplesAppended.Add(float64(len(samples))) } @@ -451,9 +454,14 @@ func (es MultiError) Error() string { } // Add adds the error to the error list if it is not nil. -func (es MultiError) Add(err error) { - if err != nil { - es = append(es, err) +func (es *MultiError) Add(err error) { + if err == nil { + return + } + if merr, ok := err.(MultiError); ok { + *es = append(*es, merr...) + } else { + *es = append(*es, err) } } diff --git a/head.go b/head.go index b5e4f063a..931998004 100644 --- a/head.go +++ b/head.go @@ -1,6 +1,7 @@ package tsdb import ( + "errors" "os" "sort" "sync" @@ -112,25 +113,22 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) { } // Series returns the series for the given reference. -func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { +func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { if int(ref) >= len(h.descs) { - return nil, errNotFound + return nil, nil, errNotFound } cd := h.descs[ref] - if !intervalOverlap(cd.firsTimestamp, cd.lastTimestamp, mint, maxt) { - return nil, nil + return cd.lset, []ChunkMeta{{MinTime: h.stats.MinTime, Ref: ref}}, nil +} + +func (h *HeadBlock) LabelIndices() ([][]string, error) { + res := [][]string{} + + for s := range h.values { + res = append(res, []string{s}) } - s := &chunkSeries{ - labels: cd.lset, - chunks: []ChunkMeta{ - {MinTime: h.stats.MinTime, Ref: 0}, - }, - chunk: func(ref uint32) (chunks.Chunk, error) { - return cd.chunk, nil - }, - } - return s, nil + return res, nil } // get retrieves the chunk with the hash and label set and creates @@ -186,6 +184,11 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { return cd } +var ( + ErrOutOfOrderSample = errors.New("out of order sample") + ErrAmendSample = errors.New("amending sample") +) + func (h *HeadBlock) appendBatch(samples []hashedSample) error { // Find head chunks for all samples and allocate new IDs/refs for // ones we haven't seen before. @@ -200,6 +203,13 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { cd := h.get(s.hash, s.labels) if cd != nil { + // Samples must only occur in order. + if s.t < cd.lastTimestamp { + return ErrOutOfOrderSample + } + if cd.lastTimestamp == s.t && cd.lastValue != s.v { + return ErrAmendSample + } // TODO(fabxc): sample refs are only scoped within a block for // now and we ignore any previously set value s.ref = cd.ref @@ -232,14 +242,17 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { } for _, s := range samples { - h.descs[s.ref].append(s.t, s.v) - - appended.Inc() - h.stats.SampleCount++ + cd := h.descs[s.ref] + // Skip duplicate samples. + if cd.lastTimestamp == s.t && cd.lastValue != s.v { + continue + } + cd.append(s.t, s.v) if s.t > h.stats.MaxTime { h.stats.MaxTime = s.t } + h.stats.SampleCount++ } return nil @@ -287,5 +300,13 @@ func (h *HeadBlock) persist(p string) (int64, error) { } } - return iw.Size() + sw.Size(), nil + // Everything written successfully, we can remove the WAL. + if err := h.wal.Close(); err != nil { + return 0, err + } + if err := os.Remove(h.wal.f.Name()); err != nil { + return 0, err + } + + return iw.Size() + sw.Size(), err } diff --git a/querier.go b/querier.go index 5e54020fc..1d53f47f6 100644 --- a/querier.go +++ b/querier.go @@ -236,6 +236,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { return &blockSeriesSet{ index: q.index, + chunks: q.series, it: Intersect(its...), absent: absent, mint: q.mint, @@ -422,6 +423,7 @@ func (s *shardSeriesSet) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { index IndexReader + chunks SeriesReader it Postings // postings list referencing series absent []string // labels that must not be set for result series mint, maxt int64 // considered time range @@ -434,24 +436,40 @@ func (s *blockSeriesSet) Next() bool { // Step through the postings iterator to find potential series. outer: for s.it.Next() { - series, err := s.index.Series(s.it.Value(), s.mint, s.maxt) + lset, chunks, err := s.index.Series(s.it.Value()) if err != nil { s.err = err return false } - // Resolving series may return nil if no applicable data for the - // time range exists and we can skip to the next series. - if series == nil { - continue - } + // If a series contains a label that must be absent, it is skipped as well. for _, abs := range s.absent { - if series.Labels().Get(abs) != "" { + if lset.Get(abs) != "" { continue outer } } - s.cur = series + ser := &chunkSeries{ + labels: lset, + chunks: make([]ChunkMeta, 0, len(chunks)), + chunk: s.chunks.Chunk, + } + // Only use chunks that fit the time range. + for _, c := range chunks { + if c.MaxTime < s.mint { + continue + } + if c.MinTime > s.maxt { + break + } + ser.chunks = append(ser.chunks, c) + } + // If no chunks of the series apply to the time range, skip it. + if len(ser.chunks) == 0 { + continue + } + + s.cur = ser return true } if s.it.Err() != nil { diff --git a/reader.go b/reader.go index 419519e3e..5a429d37b 100644 --- a/reader.go +++ b/reader.go @@ -63,7 +63,10 @@ type IndexReader interface { Postings(name, value string) (Postings, error) // Series returns the series for the given reference. - Series(ref uint32, mint, maxt int64) (Series, error) + Series(ref uint32) (labels.Labels, []ChunkMeta, error) + + // LabelIndices returns the label pairs for which indices exist. + LabelIndices() ([][]string, error) } // StringTuples provides access to a sorted list of string tuples. @@ -178,18 +181,18 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { return flag, b[:l], nil } -func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) { +func (r *indexReader) lookupSymbol(o uint32) (string, error) { l, n := binary.Uvarint(r.b[o:]) if n < 0 { - return nil, fmt.Errorf("reading symbol length failed") + return "", fmt.Errorf("reading symbol length failed") } end := int(o) + n + int(l) if end > len(r.b) { - return nil, fmt.Errorf("invalid length") + return "", fmt.Errorf("invalid length") } - return r.b[int(o)+n : end], nil + return yoloString(r.b[int(o)+n : end]), nil } func (r *indexReader) Stats() (BlockStats, error) { @@ -241,56 +244,55 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return st, nil } -func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { +func (r *indexReader) LabelIndices() ([][]string, error) { + res := [][]string{} + + for s := range r.labels { + res = append(res, strings.Split(s, string(sep))) + } + return res, nil +} + +func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { k, n := binary.Uvarint(r.b[ref:]) if n < 1 { - return nil, errors.Wrap(errInvalidSize, "number of labels") + return nil, nil, errors.Wrap(errInvalidSize, "number of labels") } b := r.b[int(ref)+n:] - offsets := make([]uint32, 0, 2*k) - - for i := 0; i < 2*int(k); i++ { - o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "symbol offset") - } - offsets = append(offsets, uint32(o)) - - b = b[n:] - } - // Symbol offests must occur in pairs representing name and value. - if len(offsets)&1 != 0 { - return nil, errors.New("odd number of symbol references") - } - - // TODO(fabxc): Fully materialize series symbols for now. Figure out later if it - // makes sense to decode those lazily. - // If we use unsafe strings the there'll be no copy overhead. - // - // The references are expected to be sorted and match the order of - // the underlying strings. lbls := make(labels.Labels, 0, k) - for i := 0; i < len(offsets); i += 2 { - n, err := r.lookupSymbol(offsets[i]) - if err != nil { - return nil, errors.Wrap(err, "symbol lookup") + for i := 0; i < 2*int(k); i += 2 { + o, m := binary.Uvarint(b) + if m < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") } - v, err := r.lookupSymbol(offsets[i+1]) + n, err := r.lookupSymbol(uint32(o)) if err != nil { - return nil, errors.Wrap(err, "symbol lookup") + return nil, nil, errors.Wrap(err, "symbol lookup") } + b = b[m:] + + o, m = binary.Uvarint(b) + if m < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + } + v, err := r.lookupSymbol(uint32(o)) + if err != nil { + return nil, nil, errors.Wrap(err, "symbol lookup") + } + b = b[m:] + lbls = append(lbls, labels.Label{ - Name: string(n), - Value: string(v), + Name: n, + Value: v, }) } // Read the chunks meta data. k, n = binary.Uvarint(b) if n < 1 { - return nil, errors.Wrap(errInvalidSize, "number of chunks") + return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") } b = b[n:] @@ -299,49 +301,30 @@ func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { for i := 0; i < int(k); i++ { firstTime, n := binary.Varint(b) if n < 1 { - return nil, errors.Wrap(errInvalidSize, "first time") + return nil, nil, errors.Wrap(errInvalidSize, "first time") } b = b[n:] - // Terminate early if we exceeded the queried time range. - if firstTime > maxt { - break - } - lastTime, n := binary.Varint(b) if n < 1 { - return nil, errors.Wrap(errInvalidSize, "last time") + return nil, nil, errors.Wrap(errInvalidSize, "last time") } b = b[n:] o, n := binary.Uvarint(b) if n < 1 { - return nil, errors.Wrap(errInvalidSize, "chunk offset") + return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") } b = b[n:] - // Skip the chunk if it is before the queried time range. - if lastTime < mint { - continue - } - chunks = append(chunks, ChunkMeta{ Ref: uint32(o), MinTime: firstTime, MaxTime: lastTime, }) } - // If no chunks applicable to the time range were found, the series - // can be skipped. - if len(chunks) == 0 { - return nil, nil - } - return &chunkSeries{ - labels: lbls, - chunks: chunks, - chunk: r.series.Chunk, - }, nil + return lbls, chunks, nil } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -419,7 +402,7 @@ func (t *stringTuples) Less(i, j int) bool { type serializedStringTuples struct { l int b []byte - lookup func(uint32) ([]byte, error) + lookup func(uint32) (string, error) } func (t *serializedStringTuples) Len() int { @@ -436,11 +419,11 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { for k := 0; k < t.l; k++ { offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) - b, err := t.lookup(offset) + s, err := t.lookup(offset) if err != nil { return nil, errors.Wrap(err, "symbol lookup") } - res = append(res, string(b)) + res = append(res, s) } return res, nil