mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-23 01:07:32 -07:00
767fa704b6
Rather than keeping the entire symbol table in memory, keep every nth offset and walk from there to the entry we need. This ends up slightly slower, ~360ms per 1M series returned from PostingsForMatchers which is not much considering the rest of the CPU such a query would go on to use. Make LabelValues use the postings tables, rather than having to do symbol lookups. Use yoloString, as PostingsForMatchers doesn't need the strings to stick around and adjust the API call to keep the Querier open until it's all marshalled. Remove allocatedSymbols memory optimisation, we no longer keep all the symbol strings in heap memory. Remove LabelValuesFor and LabelIndices, they're dead code. Ensure we've still tests for label indices, and add missing test that we can work with old V1 Format index files. PostingForMatchers performance is slightly better, with a big drop in allocation counts due to using yoloString for LabelValues: benchmark old ns/op new ns/op delta BenchmarkPostingsForMatchers/Block/n="1"-4 36698 36681 -0.05% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 522786 560887 +7.29% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 511652 537680 +5.09% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 522102 564239 +8.07% BenchmarkPostingsForMatchers/Block/i=~".*"-4 113689911 111795919 -1.67% BenchmarkPostingsForMatchers/Block/i=~".+"-4 135825572 132871085 -2.18% BenchmarkPostingsForMatchers/Block/i=~""-4 40782628 38038181 -6.73% BenchmarkPostingsForMatchers/Block/i!=""-4 31267869 29194327 -6.63% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 112733329 111568823 -1.03% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 112868153 111232029 -1.45% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 31338257 29349446 -6.35% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 32054482 29972436 -6.50% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 136504654 133968442 -1.86% 
BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 27960350 27264997 -2.49% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 136765564 133860724 -2.12% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 163714583 159453668 -2.60% benchmark old allocs new allocs delta BenchmarkPostingsForMatchers/Block/n="1"-4 6 6 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 11 11 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 11 11 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 17 15 -11.76% BenchmarkPostingsForMatchers/Block/i=~".*"-4 100012 12 -99.99% BenchmarkPostingsForMatchers/Block/i=~".+"-4 200040 100040 -49.99% BenchmarkPostingsForMatchers/Block/i=~""-4 200045 100045 -49.99% BenchmarkPostingsForMatchers/Block/i!=""-4 200041 100041 -49.99% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 100017 17 -99.98% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 100023 23 -99.98% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 200046 100046 -49.99% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 200050 100050 -49.99% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 200049 100049 -49.99% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 111150 11150 -89.97% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 200055 100055 -49.99% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 311238 111238 -64.26% benchmark old bytes new bytes delta BenchmarkPostingsForMatchers/Block/n="1"-4 296 296 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j="foo"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/j="foo",n="1"-4 424 424 +0.00% BenchmarkPostingsForMatchers/Block/n="1",j!="foo"-4 552 1544 +179.71% BenchmarkPostingsForMatchers/Block/i=~".*"-4 1600482 1606125 +0.35% BenchmarkPostingsForMatchers/Block/i=~".+"-4 17259065 17264709 +0.03% BenchmarkPostingsForMatchers/Block/i=~""-4 17259150 17264780 +0.03% 
BenchmarkPostingsForMatchers/Block/i!=""-4 17259048 17264680 +0.03% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",j="foo"-4 1600610 1606242 +0.35% BenchmarkPostingsForMatchers/Block/n="1",i=~".*",i!="2",j="foo"-4 1600813 1606434 +0.35% BenchmarkPostingsForMatchers/Block/n="1",i!=""-4 17259176 17264808 +0.03% BenchmarkPostingsForMatchers/Block/n="1",i!="",j="foo"-4 17259304 17264936 +0.03% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",j="foo"-4 17259333 17264965 +0.03% BenchmarkPostingsForMatchers/Block/n="1",i=~"1.+",j="foo"-4 3142628 3148262 +0.18% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!="2",j="foo"-4 17259509 17265141 +0.03% BenchmarkPostingsForMatchers/Block/n="1",i=~".+",i!~"2.*",j="foo"-4 20405680 20416944 +0.06% However overall Select performance is down and involves more allocs, due to having to do more than a simple map lookup to resolve a symbol and that all the strings returned are allocated: benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Block/1of1000000-4 506092636 862678244 +70.46% BenchmarkQuerierSelect/Block/10of1000000-4 505638968 860917636 +70.26% BenchmarkQuerierSelect/Block/100of1000000-4 505229450 882150048 +74.60% BenchmarkQuerierSelect/Block/1000of1000000-4 515905414 862241115 +67.13% BenchmarkQuerierSelect/Block/10000of1000000-4 516785354 874841110 +69.29% BenchmarkQuerierSelect/Block/100000of1000000-4 540742808 907030187 +67.74% BenchmarkQuerierSelect/Block/1000000of1000000-4 815224288 1181236903 +44.90% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Block/1of1000000-4 4000020 6000020 +50.00% BenchmarkQuerierSelect/Block/10of1000000-4 4000038 6000038 +50.00% BenchmarkQuerierSelect/Block/100of1000000-4 4000218 6000218 +50.00% BenchmarkQuerierSelect/Block/1000of1000000-4 4002018 6002018 +49.97% BenchmarkQuerierSelect/Block/10000of1000000-4 4020018 6020018 +49.75% BenchmarkQuerierSelect/Block/100000of1000000-4 4200018 6200018 +47.62% BenchmarkQuerierSelect/Block/1000000of1000000-4 6000018 8000019 
+33.33% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Block/1of1000000-4 176001468 227201476 +29.09% BenchmarkQuerierSelect/Block/10of1000000-4 176002620 227202628 +29.09% BenchmarkQuerierSelect/Block/100of1000000-4 176014140 227214148 +29.09% BenchmarkQuerierSelect/Block/1000of1000000-4 176129340 227329348 +29.07% BenchmarkQuerierSelect/Block/10000of1000000-4 177281340 228481348 +28.88% BenchmarkQuerierSelect/Block/100000of1000000-4 188801340 240001348 +27.12% BenchmarkQuerierSelect/Block/1000000of1000000-4 304001340 355201616 +16.84% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
1448 lines
35 KiB
Go
1448 lines
35 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package index
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/binary"
|
|
"hash"
|
|
"hash/crc32"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"unsafe"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/encoding"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
|
)
|
|
|
|
// Index file header layout.
const (
	// MagicIndex is the 4-byte magic number at the head of every index file.
	MagicIndex = 0xBAAAD700
	// HeaderLen is the number of bytes reserved at the start of the index for
	// the header: the 4-byte magic number plus a 1-byte format version.
	HeaderLen = 5
)

// Index format versions understood by this package.
const (
	// FormatV1 represents version 1 of the index format.
	FormatV1 = 1
	// FormatV2 represents version 2 of the index format.
	FormatV2 = 2
)

// indexFilename is the name of the index file.
const indexFilename = "index"
|
|
|
|
type indexWriterSeries struct {
|
|
labels labels.Labels
|
|
chunks []chunks.Meta // series file offset of chunks
|
|
}
|
|
|
|
type indexWriterSeriesSlice []*indexWriterSeries
|
|
|
|
func (s indexWriterSeriesSlice) Len() int { return len(s) }
|
|
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
|
return labels.Compare(s[i].labels, s[j].labels) < 0
|
|
}
|
|
|
|
// indexWriterStage identifies which section of the index the Writer is
// currently producing. Stages may only advance in declaration order.
type indexWriterStage uint8

const (
	idxStageNone indexWriterStage = iota
	idxStageSymbols
	idxStageSeries
	idxStageLabelIndex
	idxStagePostings
	idxStageDone
)

// String returns a human-readable name for the stage, or "<unknown>" for any
// value outside the declared stages.
func (s indexWriterStage) String() string {
	names := [...]string{
		idxStageNone:       "none",
		idxStageSymbols:    "symbols",
		idxStageSeries:     "series",
		idxStageLabelIndex: "label index",
		idxStagePostings:   "postings",
		idxStageDone:       "done",
	}
	if int(s) < len(names) {
		return names[s]
	}
	return "<unknown>"
}
|
|
|
|
// castagnoliTable is the CRC32 table for the Castagnoli polynomial.
// crc32.MakeTable may initialise shared state lazily behind a sync.Once, which
// can race with other users of the crc32 package. The table is therefore built
// eagerly as part of package-level variable initialisation, which completes
// before any init function of the package runs.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// newCRC32 returns a fresh CRC32 hash using the Castagnoli polynomial. Routing
// every checksum through this helper keeps the polynomial choice in one place.
func newCRC32() hash.Hash32 {
	h := crc32.New(castagnoliTable)
	return h
}
|
|
|
|
// Writer implements the IndexWriter interface for the standard
// serialization format.
//
// Sections must be written in stage order (symbols, series, label indices),
// which ensureStage enforces; Close writes the postings, the offset tables and
// the TOC.
type Writer struct {
	// ctx is checked for cancellation between stages and postings batches.
	ctx  context.Context
	f    *os.File
	fbuf *bufio.Writer
	// pos is the number of bytes written to the file so far.
	pos uint64

	// toc collects the start offset of each section as stages are entered.
	toc   TOC
	stage indexWriterStage

	// Reusable memory.
	buf1 encoding.Encbuf
	buf2 encoding.Encbuf

	// symbols maps each symbol to its position in the sorted symbol table
	// written by AddSymbols; series and label indices store these positions.
	symbols map[string]uint32
	// reverseSymbols is the inverse of symbols.
	reverseSymbols map[uint32]string
	labelIndexes   []labelIndexHashEntry // label index offsets
	postings       []postingsHashEntry   // postings lists offsets
	// labelNames counts, per label name, how many added series carry it.
	labelNames map[string]uint64

	// Hold last series to validate that clients insert new series in order.
	lastSeries labels.Labels
	lastRef    uint64

	crc32 hash.Hash

	// NOTE(review): Version is not read or written anywhere in this file;
	// confirm whether external callers rely on it.
	Version int
}
|
|
|
|
// TOC represents index Table Of Content that states where each section of index starts.
//
// Each field is a byte offset from the start of the index file. The TOC itself
// occupies the final indexTOCLen bytes of the file: the six offsets as
// big-endian uint64 values followed by a 4-byte checksum.
type TOC struct {
	Symbols           uint64
	Series            uint64
	LabelIndices      uint64
	LabelIndicesTable uint64
	Postings          uint64
	PostingsTable     uint64
}
|
|
|
|
// NewTOCFromByteSlice return parsed TOC from given index byte slice.
|
|
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
|
if bs.Len() < indexTOCLen {
|
|
return nil, encoding.ErrInvalidSize
|
|
}
|
|
b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
|
|
|
|
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
|
|
d := encoding.Decbuf{B: b[:len(b)-4]}
|
|
|
|
if d.Crc32(castagnoliTable) != expCRC {
|
|
return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC")
|
|
}
|
|
|
|
if err := d.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &TOC{
|
|
Symbols: d.Be64(),
|
|
Series: d.Be64(),
|
|
LabelIndices: d.Be64(),
|
|
LabelIndicesTable: d.Be64(),
|
|
Postings: d.Be64(),
|
|
PostingsTable: d.Be64(),
|
|
}, nil
|
|
}
|
|
|
|
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
|
|
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|
dir := filepath.Dir(fn)
|
|
|
|
df, err := fileutil.OpenDir(dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer df.Close() // Close for platform windows.
|
|
|
|
if err := os.RemoveAll(fn); err != nil {
|
|
return nil, errors.Wrap(err, "remove any existing index at path")
|
|
}
|
|
|
|
f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := df.Sync(); err != nil {
|
|
return nil, errors.Wrap(err, "sync dir")
|
|
}
|
|
|
|
iw := &Writer{
|
|
ctx: ctx,
|
|
f: f,
|
|
fbuf: bufio.NewWriterSize(f, 1<<22),
|
|
pos: 0,
|
|
stage: idxStageNone,
|
|
|
|
// Reusable memory.
|
|
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
|
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
|
|
|
// Caches.
|
|
labelNames: make(map[string]uint64, 1<<8),
|
|
crc32: newCRC32(),
|
|
}
|
|
if err := iw.writeMeta(); err != nil {
|
|
return nil, err
|
|
}
|
|
return iw, nil
|
|
}
|
|
|
|
func (w *Writer) write(bufs ...[]byte) error {
|
|
for _, b := range bufs {
|
|
n, err := w.fbuf.Write(b)
|
|
w.pos += uint64(n)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
|
|
// offset references in v1 are only 4 bytes large.
|
|
// Once we move to compressed/varint representations in those areas, this limitation
|
|
// can be lifted.
|
|
if w.pos > 16*math.MaxUint32 {
|
|
return errors.Errorf("exceeding max size of 64GiB")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) writeAt(buf []byte, pos uint64) error {
|
|
if err := w.fbuf.Flush(); err != nil {
|
|
return err
|
|
}
|
|
_, err := w.f.WriteAt(buf, int64(pos))
|
|
return err
|
|
}
|
|
|
|
// addPadding adds zero byte padding until the file size is a multiple size.
|
|
func (w *Writer) addPadding(size int) error {
|
|
p := w.pos % uint64(size)
|
|
if p == 0 {
|
|
return nil
|
|
}
|
|
p = uint64(size) - p
|
|
return errors.Wrap(w.write(make([]byte, p)), "add padding")
|
|
}
|
|
|
|
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
|
// methods are called in an order valid for the implementation.
|
|
func (w *Writer) ensureStage(s indexWriterStage) error {
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return w.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if w.stage == s {
|
|
return nil
|
|
}
|
|
if w.stage > s {
|
|
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
|
}
|
|
|
|
// Mark start of sections in table of contents.
|
|
switch s {
|
|
case idxStageSymbols:
|
|
w.toc.Symbols = w.pos
|
|
case idxStageSeries:
|
|
w.toc.Series = w.pos
|
|
|
|
case idxStageLabelIndex:
|
|
w.toc.LabelIndices = w.pos
|
|
|
|
case idxStageDone:
|
|
w.toc.Postings = w.pos
|
|
if err := w.writePostings(); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.toc.LabelIndicesTable = w.pos
|
|
if err := w.writeLabelIndexesOffsetTable(); err != nil {
|
|
return err
|
|
}
|
|
w.toc.PostingsTable = w.pos
|
|
if err := w.writePostingsOffsetTable(); err != nil {
|
|
return err
|
|
}
|
|
if err := w.writeTOC(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
w.stage = s
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) writeMeta() error {
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32(MagicIndex)
|
|
w.buf1.PutByte(FormatV2)
|
|
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
// AddSeries adds the series one at a time along with its chunks.
|
|
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
|
|
if err := w.ensureStage(idxStageSeries); err != nil {
|
|
return err
|
|
}
|
|
if labels.Compare(lset, w.lastSeries) <= 0 {
|
|
return errors.Errorf("out-of-order series added with label set %q", lset)
|
|
}
|
|
|
|
if ref < w.lastRef && len(w.lastSeries) != 0 {
|
|
return errors.Errorf("series with reference greater than %d already added", ref)
|
|
}
|
|
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
|
|
// series references.
|
|
if err := w.addPadding(16); err != nil {
|
|
return errors.Errorf("failed to write padding bytes: %v", err)
|
|
}
|
|
|
|
if w.pos%16 != 0 {
|
|
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
|
|
}
|
|
|
|
w.buf2.Reset()
|
|
w.buf2.PutUvarint(len(lset))
|
|
|
|
for _, l := range lset {
|
|
// here we have an index for the symbol file if v2, otherwise it's an offset
|
|
index, ok := w.symbols[l.Name]
|
|
if !ok {
|
|
return errors.Errorf("symbol entry for %q does not exist", l.Name)
|
|
}
|
|
w.labelNames[l.Name]++
|
|
w.buf2.PutUvarint32(index)
|
|
|
|
index, ok = w.symbols[l.Value]
|
|
if !ok {
|
|
return errors.Errorf("symbol entry for %q does not exist", l.Value)
|
|
}
|
|
w.buf2.PutUvarint32(index)
|
|
}
|
|
|
|
w.buf2.PutUvarint(len(chunks))
|
|
|
|
if len(chunks) > 0 {
|
|
c := chunks[0]
|
|
w.buf2.PutVarint64(c.MinTime)
|
|
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
|
|
w.buf2.PutUvarint64(c.Ref)
|
|
t0 := c.MaxTime
|
|
ref0 := int64(c.Ref)
|
|
|
|
for _, c := range chunks[1:] {
|
|
w.buf2.PutUvarint64(uint64(c.MinTime - t0))
|
|
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
|
|
t0 = c.MaxTime
|
|
|
|
w.buf2.PutVarint64(int64(c.Ref) - ref0)
|
|
ref0 = int64(c.Ref)
|
|
}
|
|
}
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutUvarint(w.buf2.Len())
|
|
|
|
w.buf2.PutHash(w.crc32)
|
|
|
|
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
|
|
return errors.Wrap(err, "write series data")
|
|
}
|
|
|
|
w.lastSeries = append(w.lastSeries[:0], lset...)
|
|
w.lastRef = ref
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) AddSymbols(sym map[string]struct{}) error {
|
|
if err := w.ensureStage(idxStageSymbols); err != nil {
|
|
return err
|
|
}
|
|
// Generate sorted list of strings we will store as reference table.
|
|
symbols := make([]string, 0, len(sym))
|
|
|
|
for s := range sym {
|
|
symbols = append(symbols, s)
|
|
}
|
|
sort.Strings(symbols)
|
|
|
|
startPos := w.pos
|
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
|
if err := w.write([]byte("alen")); err != nil {
|
|
return err
|
|
}
|
|
w.crc32.Reset()
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(len(symbols))
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.symbols = make(map[string]uint32, len(symbols))
|
|
w.reverseSymbols = make(map[uint32]string, len(symbols))
|
|
|
|
for index, s := range symbols {
|
|
w.symbols[s] = uint32(index)
|
|
w.reverseSymbols[uint32(index)] = s
|
|
w.buf1.Reset()
|
|
w.buf1.PutUvarintStr(s)
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Write out the length.
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutHashSum(w.crc32)
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|
if len(values)%len(names) != 0 {
|
|
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
|
}
|
|
if err := w.ensureStage(idxStageLabelIndex); err != nil {
|
|
return errors.Wrap(err, "ensure stage")
|
|
}
|
|
|
|
valt, err := NewStringTuples(values, len(names))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sort.Sort(valt)
|
|
|
|
// Align beginning to 4 bytes for more efficient index list scans.
|
|
if err := w.addPadding(4); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
|
|
keys: names,
|
|
offset: w.pos,
|
|
})
|
|
|
|
startPos := w.pos
|
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
|
if err := w.write([]byte("alen")); err != nil {
|
|
return err
|
|
}
|
|
w.crc32.Reset()
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(len(names))
|
|
w.buf1.PutBE32int(valt.Len())
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
|
|
// here we have an index for the symbol file if v2, otherwise it's an offset
|
|
for _, v := range valt.entries {
|
|
index, ok := w.symbols[v]
|
|
if !ok {
|
|
return errors.Errorf("symbol entry for %q does not exist", v)
|
|
}
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32(index)
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Write out the length.
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutHashSum(w.crc32)
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
// writeLabelIndexesOffsetTable writes the label indices offset table.
|
|
func (w *Writer) writeLabelIndexesOffsetTable() error {
|
|
startPos := w.pos
|
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
|
if err := w.write([]byte("alen")); err != nil {
|
|
return err
|
|
}
|
|
w.crc32.Reset()
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(len(w.labelIndexes))
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, e := range w.labelIndexes {
|
|
w.buf1.Reset()
|
|
w.buf1.PutUvarint(len(e.keys))
|
|
for _, k := range e.keys {
|
|
w.buf1.PutUvarintStr(k)
|
|
}
|
|
w.buf1.PutUvarint64(e.offset)
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// Write out the length.
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutHashSum(w.crc32)
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
// writePostingsOffsetTable writes the postings offset table.
|
|
func (w *Writer) writePostingsOffsetTable() error {
|
|
startPos := w.pos
|
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
|
if err := w.write([]byte("alen")); err != nil {
|
|
return err
|
|
}
|
|
w.crc32.Reset()
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(len(w.postings))
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, e := range w.postings {
|
|
w.buf1.Reset()
|
|
w.buf1.PutUvarint(2)
|
|
w.buf1.PutUvarintStr(e.name)
|
|
w.buf1.PutUvarintStr(e.value)
|
|
w.buf1.PutUvarint64(e.offset)
|
|
w.buf1.WriteToHash(w.crc32)
|
|
if err := w.write(w.buf1.Get()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Write out the length.
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutHashSum(w.crc32)
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
const indexTOCLen = 6*8 + 4
|
|
|
|
func (w *Writer) writeTOC() error {
|
|
w.buf1.Reset()
|
|
|
|
w.buf1.PutBE64(w.toc.Symbols)
|
|
w.buf1.PutBE64(w.toc.Series)
|
|
w.buf1.PutBE64(w.toc.LabelIndices)
|
|
w.buf1.PutBE64(w.toc.LabelIndicesTable)
|
|
w.buf1.PutBE64(w.toc.Postings)
|
|
w.buf1.PutBE64(w.toc.PostingsTable)
|
|
|
|
w.buf1.PutHash(w.crc32)
|
|
|
|
return w.write(w.buf1.Get())
|
|
}
|
|
|
|
func (w *Writer) writePostings() error {
|
|
names := make([]string, 0, len(w.labelNames))
|
|
for n := range w.labelNames {
|
|
names = append(names, n)
|
|
}
|
|
sort.Strings(names)
|
|
|
|
if err := w.fbuf.Flush(); err != nil {
|
|
return err
|
|
}
|
|
f, err := fileutil.OpenMmapFile(w.f.Name())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Write out the special all posting.
|
|
offsets := []uint32{}
|
|
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
|
|
d.Skip(int(w.toc.Series))
|
|
for d.Len() > 0 {
|
|
d.ConsumePadding()
|
|
startPos := w.toc.LabelIndices - uint64(d.Len())
|
|
if startPos%16 != 0 {
|
|
return errors.Errorf("series not 16-byte aligned at %d", startPos)
|
|
}
|
|
offsets = append(offsets, uint32(startPos/16))
|
|
// Skip to next series. The 4 is for the CRC32.
|
|
d.Skip(d.Uvarint() + 4)
|
|
if err := d.Err(); err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
if err := w.writePosting("", "", offsets); err != nil {
|
|
return err
|
|
}
|
|
maxPostings := uint64(len(offsets)) // No label name can have more postings than this.
|
|
|
|
for len(names) > 0 {
|
|
batchNames := []string{}
|
|
var c uint64
|
|
// Try to bunch up label names into one loop, but avoid
|
|
// using more memory than a single label name can.
|
|
for len(names) > 0 {
|
|
if w.labelNames[names[0]]+c > maxPostings {
|
|
break
|
|
}
|
|
batchNames = append(batchNames, names[0])
|
|
c += w.labelNames[names[0]]
|
|
names = names[1:]
|
|
}
|
|
|
|
nameSymbols := map[uint32]struct{}{}
|
|
for _, name := range batchNames {
|
|
nameSymbols[w.symbols[name]] = struct{}{}
|
|
}
|
|
// Label name -> label value -> positions.
|
|
postings := map[uint32]map[uint32][]uint32{}
|
|
|
|
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
|
|
d.Skip(int(w.toc.Series))
|
|
for d.Len() > 0 {
|
|
d.ConsumePadding()
|
|
startPos := w.toc.LabelIndices - uint64(d.Len())
|
|
l := d.Uvarint() // Length of this series in bytes.
|
|
startLen := d.Len()
|
|
|
|
// See if label names we want are in the series.
|
|
numLabels := d.Uvarint()
|
|
for i := 0; i < numLabels; i++ {
|
|
lno := uint32(d.Uvarint())
|
|
lvo := uint32(d.Uvarint())
|
|
|
|
if _, ok := nameSymbols[lno]; ok {
|
|
if _, ok := postings[lno]; !ok {
|
|
postings[lno] = map[uint32][]uint32{}
|
|
}
|
|
if _, ok := postings[lno][lvo]; !ok {
|
|
postings[lno][lvo] = []uint32{}
|
|
}
|
|
postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16))
|
|
}
|
|
}
|
|
// Skip to next series. The 4 is for the CRC32.
|
|
d.Skip(l - (startLen - d.Len()) + 4)
|
|
if err := d.Err(); err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
for _, name := range batchNames {
|
|
// Write out postings for this label name.
|
|
values := make([]uint32, 0, len(postings[w.symbols[name]]))
|
|
for v := range postings[w.symbols[name]] {
|
|
values = append(values, v)
|
|
|
|
}
|
|
// Symbol numbers are in order, so the strings will also be in order.
|
|
sort.Sort(uint32slice(values))
|
|
for _, v := range values {
|
|
if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
select {
|
|
case <-w.ctx.Done():
|
|
return w.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
|
// Align beginning to 4 bytes for more efficient postings list scans.
|
|
if err := w.addPadding(4); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.postings = append(w.postings, postingsHashEntry{
|
|
name: name,
|
|
value: value,
|
|
offset: w.pos,
|
|
})
|
|
|
|
w.buf1.Reset()
|
|
w.buf1.PutBE32int(len(offs))
|
|
|
|
for _, off := range offs {
|
|
if off > (1<<32)-1 {
|
|
return errors.Errorf("series offset %d exceeds 4 bytes", off)
|
|
}
|
|
w.buf1.PutBE32(off)
|
|
}
|
|
|
|
w.buf2.Reset()
|
|
w.buf2.PutBE32int(w.buf1.Len())
|
|
w.buf1.PutHash(w.crc32)
|
|
return w.write(w.buf2.Get(), w.buf1.Get())
|
|
}
|
|
|
|
// uint32slice attaches sort.Interface to []uint32, ordering values ascending.
type uint32slice []uint32

func (s uint32slice) Len() int           { return len(s) }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
func (s uint32slice) Swap(i, j int)      { s[j], s[i] = s[i], s[j] }
|
|
|
|
// labelIndexHashEntry records where the label index for a set of label names
// (keys) starts in the file. Entries are appended by WriteLabelIndex and
// written out by writeLabelIndexesOffsetTable.
type labelIndexHashEntry struct {
	keys   []string
	offset uint64
}

// postingsHashEntry records where the postings list for a (name, value) label
// pair starts in the file. Entries are appended by writePosting and written out
// by writePostingsOffsetTable.
type postingsHashEntry struct {
	name, value string
	offset      uint64
}
|
|
|
|
func (w *Writer) Close() error {
|
|
if err := w.ensureStage(idxStageDone); err != nil {
|
|
return err
|
|
}
|
|
if err := w.fbuf.Flush(); err != nil {
|
|
return err
|
|
}
|
|
if err := w.f.Sync(); err != nil {
|
|
return err
|
|
}
|
|
return w.f.Close()
|
|
}
|
|
|
|
// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int
	// At returns the tuple at position i.
	// NOTE(review): the error return presumably signals a tuple that cannot
	// be read/decoded; confirm against implementations.
	At(i int) ([]string, error)
}
|
|
|
|
// Reader reads an index from a ByteSlice. newReader accepts both FormatV1 and
// FormatV2 files.
type Reader struct {
	b   ByteSlice
	toc *TOC

	// Close that releases the underlying resources of the byte slice.
	c io.Closer

	// Map of LabelName to a list of some LabelValues's position in the offset table.
	// The first and last values for each name are always present; in between,
	// newReader keeps every 32nd value.
	postings map[string][]postingOffset

	symbols *Symbols
	// nameSymbols caches symbol reference -> label name for every label name
	// in the postings table, as there are not many and they are half of all
	// symbol lookups.
	nameSymbols map[uint32]string

	dec *Decoder

	// version is the format version byte read from the index header.
	version int
}

// postingOffset is one sampled entry of the postings offset table: a label
// value and the position of its entry, as reported by ReadOffsetTable.
type postingOffset struct {
	value string
	off   int
}
|
|
|
|
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	// Len returns the number of bytes available.
	Len() int
	// Range returns the bytes in [start, end).
	Range(start, end int) []byte
}

// realByteSlice is a ByteSlice backed directly by an in-memory []byte.
type realByteSlice []byte

func (b realByteSlice) Len() int                    { return len(b) }
func (b realByteSlice) Range(start, end int) []byte { return b[start:end] }

// Sub returns the bytes in [start, end) as a ByteSlice sharing b's memory.
func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] }
|
|
|
|
// NewReader returns a new index reader on the given byte slice. It automatically
|
|
// handles different format versions.
|
|
func NewReader(b ByteSlice) (*Reader, error) {
|
|
return newReader(b, ioutil.NopCloser(nil))
|
|
}
|
|
|
|
// NewFileReader returns a new index reader against the given index file.
|
|
func NewFileReader(path string) (*Reader, error) {
|
|
f, err := fileutil.OpenMmapFile(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r, err := newReader(realByteSlice(f.Bytes()), f)
|
|
if err != nil {
|
|
var merr tsdb_errors.MultiError
|
|
merr.Add(err)
|
|
merr.Add(f.Close())
|
|
return nil, merr
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|
r := &Reader{
|
|
b: b,
|
|
c: c,
|
|
postings: map[string][]postingOffset{},
|
|
}
|
|
|
|
// Verify header.
|
|
if r.b.Len() < HeaderLen {
|
|
return nil, errors.Wrap(encoding.ErrInvalidSize, "index header")
|
|
}
|
|
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
|
|
return nil, errors.Errorf("invalid magic number %x", m)
|
|
}
|
|
r.version = int(r.b.Range(4, 5)[0])
|
|
|
|
if r.version != FormatV1 && r.version != FormatV2 {
|
|
return nil, errors.Errorf("unknown index file version %d", r.version)
|
|
}
|
|
|
|
var err error
|
|
r.toc, err = NewTOCFromByteSlice(b)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "read TOC")
|
|
}
|
|
|
|
r.symbols, err = NewSymbols(r.b, r.version, int(r.toc.Symbols))
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "read symbols")
|
|
}
|
|
|
|
var lastKey []string
|
|
lastOff := 0
|
|
valueCount := 0
|
|
// For the postings offset table we keep every label name but only every nth
|
|
// label value (plus the first and last one), to save memory.
|
|
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
|
|
if len(key) != 2 {
|
|
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
|
}
|
|
if _, ok := r.postings[key[0]]; !ok {
|
|
// Next label name.
|
|
r.postings[key[0]] = []postingOffset{}
|
|
if lastKey != nil {
|
|
// Always include last value for each label name.
|
|
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
|
}
|
|
lastKey = nil
|
|
valueCount = 0
|
|
}
|
|
if valueCount%32 == 0 {
|
|
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
|
|
lastKey = nil
|
|
} else {
|
|
lastKey = key
|
|
lastOff = off
|
|
}
|
|
valueCount++
|
|
return nil
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "read postings table")
|
|
}
|
|
if lastKey != nil {
|
|
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
|
}
|
|
// Trim any extra space in the slices.
|
|
for k, v := range r.postings {
|
|
l := make([]postingOffset, len(v))
|
|
copy(l, v)
|
|
r.postings[k] = l
|
|
}
|
|
|
|
r.nameSymbols = make(map[uint32]string, len(r.postings))
|
|
for k := range r.postings {
|
|
if k == "" {
|
|
continue
|
|
}
|
|
off, err := r.symbols.ReverseLookup(k)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "reverse symbol lookup")
|
|
}
|
|
r.nameSymbols[off] = k
|
|
}
|
|
|
|
r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// Version returns the file format version of the underlying index.
// Several decoding paths (e.g. Series and symbol lookups) branch on this
// value to distinguish FormatV2 from older layouts.
func (r *Reader) Version() int {
	return r.version
}
|
|
|
|
// Range describes a span of bytes within the index file, identified by its
// first (Start) and end (End) byte positions.
type Range struct {
	Start int64
	End   int64
}
|
|
|
|
// PostingsRanges returns a new map of byte range in the underlying index file
|
|
// for all postings lists.
|
|
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
|
m := map[labels.Label]Range{}
|
|
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
|
|
if len(key) != 2 {
|
|
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
|
}
|
|
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
|
if d.Err() != nil {
|
|
return d.Err()
|
|
}
|
|
m[labels.Label{Name: key[0], Value: key[1]}] = Range{
|
|
Start: int64(off) + 4,
|
|
End: int64(off) + 4 + int64(d.Len()),
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "read postings table")
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
type Symbols struct {
|
|
bs ByteSlice
|
|
version int
|
|
off int
|
|
|
|
offsets []int
|
|
seen int
|
|
}
|
|
|
|
// symbolFactor is the sampling rate for in-memory symbol offsets: NewSymbols
// records the position of every symbolFactor-th symbol, so a lookup walks
// forward over at most symbolFactor-1 symbols from the nearest sample.
const symbolFactor = 32
|
|
|
|
// NewSymbols returns a Symbols object for symbol lookups.
|
|
func NewSymbols(bs ByteSlice, version int, off int) (*Symbols, error) {
|
|
s := &Symbols{
|
|
bs: bs,
|
|
version: version,
|
|
off: off,
|
|
}
|
|
if off == 0 {
|
|
// Only happens in some tests.
|
|
return nil, nil
|
|
}
|
|
d := encoding.NewDecbufAt(bs, off, castagnoliTable)
|
|
var (
|
|
origLen = d.Len()
|
|
cnt = d.Be32int()
|
|
basePos = off + 4
|
|
)
|
|
s.offsets = make([]int, 0, cnt/symbolFactor)
|
|
for d.Err() == nil && s.seen < cnt {
|
|
if s.seen%symbolFactor == 0 {
|
|
s.offsets = append(s.offsets, basePos+origLen-d.Len())
|
|
}
|
|
d.UvarintBytes() // The symbol.
|
|
s.seen++
|
|
}
|
|
if d.Err() != nil {
|
|
return nil, d.Err()
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (s Symbols) Lookup(o uint32) (string, error) {
|
|
d := encoding.Decbuf{
|
|
B: s.bs.Range(0, s.bs.Len()),
|
|
}
|
|
if s.version == FormatV2 {
|
|
if int(o) > s.seen {
|
|
return "", errors.Errorf("unknown symbol offset %d", o)
|
|
}
|
|
d.Skip(s.offsets[int(o/symbolFactor)])
|
|
// Walk until we find the one we want.
|
|
for i := o - (o / symbolFactor * symbolFactor); i > 0; i-- {
|
|
d.UvarintBytes()
|
|
}
|
|
} else {
|
|
d.Skip(int(o))
|
|
}
|
|
sym := d.UvarintStr()
|
|
if d.Err() != nil {
|
|
return "", d.Err()
|
|
}
|
|
return sym, nil
|
|
}
|
|
|
|
// ReverseLookup returns the reference of sym in the form Lookup accepts: the
// symbol's position in the table for FormatV2, and its byte position in bs for
// older formats.
//
// It binary-searches the sampled offsets and then walks forward, so it relies
// on the symbol table being sorted. An error is returned if sym is not present.
func (s Symbols) ReverseLookup(sym string) (uint32, error) {
	if len(s.offsets) == 0 {
		// Empty table: nothing to find, and s.offsets[0] below would panic.
		return 0, errors.Errorf("unknown symbol %q", sym)
	}
	// First sampled symbol strictly greater than sym.
	i := sort.Search(len(s.offsets), func(i int) bool {
		// Any decoding errors here will be lost, however
		// we already read through all of this at startup.
		d := encoding.Decbuf{
			B: s.bs.Range(0, s.bs.Len()),
		}
		d.Skip(s.offsets[i])
		return yoloString(d.UvarintBytes()) > sym
	})
	// Step back to the last sample <= sym; if sym exists it lies within the
	// next symbolFactor entries.
	if i > 0 {
		i--
	}
	d := encoding.Decbuf{
		B: s.bs.Range(0, s.bs.Len()),
	}
	d.Skip(s.offsets[i])
	res := i * symbolFactor
	var (
		lastLen    int
		lastSymbol string
	)
	// Stop after the last symbol of the table: bytes beyond it are not symbols.
	for d.Err() == nil && res < s.seen {
		lastLen = d.Len()
		lastSymbol = yoloString(d.UvarintBytes())
		if lastSymbol >= sym {
			break
		}
		res++
	}
	if d.Err() != nil {
		return 0, d.Err()
	}
	if lastSymbol != sym {
		return 0, errors.Errorf("unknown symbol %q", sym)
	}
	if s.version == FormatV2 {
		return uint32(res), nil
	}
	// d spans bs from position 0, so the remaining length before decoding the
	// symbol gives its absolute byte position.
	return uint32(s.bs.Len() - lastLen), nil
}
|
|
|
|
func (s Symbols) All() (map[string]struct{}, error) {
|
|
d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable)
|
|
cnt := d.Be32int()
|
|
res := make(map[string]struct{}, cnt)
|
|
for d.Err() == nil && cnt > 0 {
|
|
res[d.UvarintStr()] = struct{}{}
|
|
cnt--
|
|
}
|
|
if d.Err() != nil {
|
|
return nil, d.Err()
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// Size returns the number of bytes used by the sampled symbol offsets kept in
// memory. The symbol strings themselves are not held, so they do not count.
// The element size is taken from the platform's int rather than assumed to
// be 8 bytes.
func (s Symbols) Size() int {
	return len(s.offsets) * int(unsafe.Sizeof(int(0)))
}
|
|
|
|
// ReadOffsetTable reads an offset table and at the given position calls f for each
|
|
// found entry. If f returns an error it stops decoding and returns the received error.
|
|
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error {
|
|
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
|
|
startLen := d.Len()
|
|
cnt := d.Be32()
|
|
|
|
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
|
offsetPos := startLen - d.Len()
|
|
keyCount := d.Uvarint()
|
|
// The Postings offset table takes only 2 keys per entry (name and value of label),
|
|
// and the LabelIndices offset table takes only 1 key per entry (a label name).
|
|
// Hence setting the size to max of both, i.e. 2.
|
|
keys := make([]string, 0, 2)
|
|
|
|
for i := 0; i < keyCount; i++ {
|
|
keys = append(keys, d.UvarintStr())
|
|
}
|
|
o := d.Uvarint64()
|
|
if d.Err() != nil {
|
|
break
|
|
}
|
|
if err := f(keys, o, offsetPos); err != nil {
|
|
return err
|
|
}
|
|
cnt--
|
|
}
|
|
return d.Err()
|
|
}
|
|
|
|
// Close the reader and its underlying resources (r.c).
//
// NOTE(review): r.c presumably owns the memory behind r.b, so strings handed
// out without copying (e.g. by LabelValues) should not be used after Close —
// confirm against the constructor.
func (r *Reader) Close() error {
	return r.c.Close()
}
|
|
|
|
func (r *Reader) lookupSymbol(o uint32) (string, error) {
|
|
if s, ok := r.nameSymbols[o]; ok {
|
|
return s, nil
|
|
}
|
|
return r.symbols.Lookup(o)
|
|
}
|
|
|
|
// Symbols returns a set of symbols that exist within the index.
// It decodes the whole symbol table into a new map on every call, so it is
// comparatively expensive.
func (r *Reader) Symbols() (map[string]struct{}, error) {
	return r.symbols.All()
}
|
|
|
|
// SymbolTableSize returns the in-memory size, in bytes, of the sampled symbol
// offsets held by the reader (see Symbols.Size). This is not the on-disk size
// of the symbol table.
//
// NOTE(review): NewSymbols returns a nil *Symbols when the symbol table offset
// is 0, in which case this call would panic.
func (r *Reader) SymbolTableSize() uint64 {
	return uint64(r.symbols.Size())
}
|
|
|
|
// LabelValues returns value tuples that exist for the given label name tuples.
|
|
// It is not safe to use the return value beyond the lifetime of the byte slice
|
|
// passed into the Reader.
|
|
func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
|
|
if len(names) != 1 {
|
|
return nil, errors.Errorf("only one label name supported")
|
|
}
|
|
e, ok := r.postings[names[0]]
|
|
if !ok {
|
|
return emptyStringTuples{}, nil
|
|
}
|
|
if len(e) == 0 {
|
|
return emptyStringTuples{}, nil
|
|
}
|
|
values := make([]string, 0, len(e)*symbolFactor)
|
|
|
|
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
|
|
d.Skip(e[0].off)
|
|
lastVal := e[len(e)-1].value
|
|
|
|
skip := 0
|
|
for d.Err() == nil {
|
|
if skip == 0 {
|
|
// These are always the same number of bytes,
|
|
// and it's faster to skip than parse.
|
|
skip = d.Len()
|
|
d.Uvarint() // Keycount.
|
|
d.UvarintBytes() // Label name.
|
|
skip -= d.Len()
|
|
} else {
|
|
d.Skip(skip)
|
|
}
|
|
s := yoloString(d.UvarintBytes()) //Label value.
|
|
values = append(values, s)
|
|
if s == lastVal {
|
|
break
|
|
}
|
|
d.Uvarint64() // Offset.
|
|
}
|
|
if d.Err() != nil {
|
|
return nil, errors.Wrap(d.Err(), "get postings offset entry")
|
|
}
|
|
return NewStringTuples(values, 1)
|
|
}
|
|
|
|
// emptyStringTuples is a StringTuples value that holds no tuples.
type emptyStringTuples struct{}

// At yields no tuple and no error, whatever the index.
func (emptyStringTuples) At(i int) ([]string, error) {
	return nil, nil
}

// Len always reports zero tuples.
func (emptyStringTuples) Len() int {
	return 0
}
|
|
|
|
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
|
|
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
|
offset := id
|
|
// In version 2 series IDs are no longer exact references but series are 16-byte padded
|
|
// and the ID is the multiple of 16 of the actual position.
|
|
if r.version == FormatV2 {
|
|
offset = id * 16
|
|
}
|
|
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
|
|
if d.Err() != nil {
|
|
return d.Err()
|
|
}
|
|
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
|
|
}
|
|
|
|
// Postings returns the postings lists for the label name paired with each of
// values, combined with Merge. Values that do not exist are ignored.
//
// values must be sorted in ascending order: the lookup walks the postings
// offset table forward in step with values, so out-of-order values can be
// skipped.
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
	e, ok := r.postings[name]
	if !ok || len(e) == 0 || len(values) == 0 {
		return EmptyPostings(), nil
	}

	res := make([]Postings, 0, len(values))
	// Byte length of the key-count+name prefix shared by every entry of this
	// name; measured once, then skipped.
	skip := 0
	valueIndex := 0
	for valueIndex < len(values) && values[valueIndex] < e[0].value {
		// Discard values before the start.
		valueIndex++
	}
	for valueIndex < len(values) {
		value := values[valueIndex]

		i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
		if i == len(e) {
			// We're past the end.
			break
		}
		if i > 0 && e[i].value != value {
			// Need to look from previous entry.
			i--
		}
		// Don't Crc32 the entire postings offset table, this is very slow
		// so hope any issues were caught at startup.
		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
		d.Skip(e[i].off)

		// Iterate on the offset table.
		var postingsOff uint64 // The offset into the postings table.
		for d.Err() == nil {
			if skip == 0 {
				// These are always the same number of bytes,
				// and it's faster to skip than parse.
				skip = d.Len()
				d.Uvarint()      // Keycount.
				d.UvarintBytes() // Label name.
				skip -= d.Len()
			} else {
				d.Skip(skip)
			}
			v := d.UvarintBytes()       // Label value.
			postingsOff = d.Uvarint64() // Offset.
			if d.Err() != nil {
				// Never act on a partially decoded entry; reported below.
				break
			}
			for string(v) >= value {
				if string(v) == value {
					// Read from the postings table.
					d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
					if d2.Err() != nil {
						// Surface the real cause (e.g. a checksum mismatch)
						// instead of a size error from decoding nothing.
						return nil, errors.Wrap(d2.Err(), "decode postings")
					}
					_, p, err := r.dec.Postings(d2.Get())
					if err != nil {
						return nil, errors.Wrap(err, "decode postings")
					}
					res = append(res, p)
				}
				valueIndex++
				if valueIndex == len(values) {
					break
				}
				value = values[valueIndex]
			}
			if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
				// Need to go to a later postings offset entry, if there is one.
				break
			}
		}
		if d.Err() != nil {
			return nil, errors.Wrap(d.Err(), "get postings offset entry")
		}
	}

	return Merge(res...), nil
}
|
|
|
|
// SortedPostings returns the given postings list reordered so that the backing series
// are sorted.
//
// This implementation returns p unchanged. NOTE(review): presumably because
// series IDs are file positions and series are written in sorted order —
// confirm against the index writer.
func (r *Reader) SortedPostings(p Postings) Postings {
	return p
}
|
|
|
|
// Size returns the size of an index file, i.e. the length in bytes of the
// byte slice backing this reader.
func (r *Reader) Size() int64 {
	return int64(r.b.Len())
}
|
|
|
|
// LabelNames returns all the unique label names present in the index, sorted
// ascending. The internal all-postings key (allPostingsKey.Name) is left out
// because it does not come from any metric.
func (r *Reader) LabelNames() ([]string, error) {
	names := make([]string, 0, len(r.postings))
	for n := range r.postings {
		if n != allPostingsKey.Name {
			names = append(names, n)
		}
	}
	sort.Strings(names)
	return names, nil
}
|
|
|
|
// stringTuples stores fixed-length string tuples flattened into one slice:
// tuple k occupies `length` consecutive strings in entries.
type stringTuples struct {
	length  int      // tuple length
	entries []string // flattened tuple entries
	swapBuf []string // scratch space reused by Swap, allocated lazily
}
|
|
|
|
func NewStringTuples(entries []string, length int) (*stringTuples, error) {
|
|
if len(entries)%length != 0 {
|
|
return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list")
|
|
}
|
|
return &stringTuples{
|
|
entries: entries,
|
|
length: length,
|
|
}, nil
|
|
}
|
|
|
|
func (t *stringTuples) Len() int { return len(t.entries) / t.length }
|
|
func (t *stringTuples) At(i int) ([]string, error) { return t.entries[i : i+t.length], nil }
|
|
|
|
func (t *stringTuples) Swap(i, j int) {
|
|
if t.swapBuf == nil {
|
|
t.swapBuf = make([]string, t.length)
|
|
}
|
|
copy(t.swapBuf, t.entries[i:i+t.length])
|
|
for k := 0; k < t.length; k++ {
|
|
t.entries[i+k] = t.entries[j+k]
|
|
t.entries[j+k] = t.swapBuf[k]
|
|
}
|
|
}
|
|
|
|
// Less reports whether tuple i sorts before tuple j (tuple indices, as used by
// sort.Interface), comparing element by element.
func (t *stringTuples) Less(i, j int) bool {
	a, b := i*t.length, j*t.length
	for k := 0; k < t.length; k++ {
		d := strings.Compare(t.entries[a+k], t.entries[b+k])

		if d < 0 {
			return true
		}
		if d > 0 {
			return false
		}
	}
	return false
}
|
|
|
|
// Decoder provides decoding methods for the v1 and v2 index file format.
//
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
	// LookupSymbol resolves a symbol reference stored in an entry (Series uses
	// it for label name and value references) to its string.
	LookupSymbol func(uint32) (string, error)
}
|
|
|
|
// Postings returns a postings list for b and its number of elements.
|
|
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
|
|
d := encoding.Decbuf{B: b}
|
|
n := d.Be32int()
|
|
l := d.Get()
|
|
return n, newBigEndianPostings(l), d.Err()
|
|
}
|
|
|
|
// Series decodes a series entry from the given byte slice into lset and chks.
|
|
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
|
*lbls = (*lbls)[:0]
|
|
*chks = (*chks)[:0]
|
|
|
|
d := encoding.Decbuf{B: b}
|
|
|
|
k := d.Uvarint()
|
|
|
|
for i := 0; i < k; i++ {
|
|
lno := uint32(d.Uvarint())
|
|
lvo := uint32(d.Uvarint())
|
|
|
|
if d.Err() != nil {
|
|
return errors.Wrap(d.Err(), "read series label offsets")
|
|
}
|
|
|
|
ln, err := dec.LookupSymbol(lno)
|
|
if err != nil {
|
|
return errors.Wrap(err, "lookup label name")
|
|
}
|
|
lv, err := dec.LookupSymbol(lvo)
|
|
if err != nil {
|
|
return errors.Wrap(err, "lookup label value")
|
|
}
|
|
|
|
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
|
|
}
|
|
|
|
// Read the chunks meta data.
|
|
k = d.Uvarint()
|
|
|
|
if k == 0 {
|
|
return nil
|
|
}
|
|
|
|
t0 := d.Varint64()
|
|
maxt := int64(d.Uvarint64()) + t0
|
|
ref0 := int64(d.Uvarint64())
|
|
|
|
*chks = append(*chks, chunks.Meta{
|
|
Ref: uint64(ref0),
|
|
MinTime: t0,
|
|
MaxTime: maxt,
|
|
})
|
|
t0 = maxt
|
|
|
|
for i := 1; i < k; i++ {
|
|
mint := int64(d.Uvarint64()) + t0
|
|
maxt := int64(d.Uvarint64()) + mint
|
|
|
|
ref0 += d.Varint64()
|
|
t0 = maxt
|
|
|
|
if d.Err() != nil {
|
|
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
|
|
}
|
|
|
|
*chks = append(*chks, chunks.Meta{
|
|
Ref: uint64(ref0),
|
|
MinTime: mint,
|
|
MaxTime: maxt,
|
|
})
|
|
}
|
|
return d.Err()
|
|
}
|
|
|
|
// yoloString reinterprets b as a string without copying. The string shares
// b's memory, so b must not be modified while the string is in use, and the
// string must not outlive the memory backing b.
func yoloString(b []byte) string {
	hdr := unsafe.Pointer(&b)
	return *(*string)(hdr)
}
|