// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package index import ( "bufio" "context" "encoding/binary" "hash" "hash/crc32" "io" "io/ioutil" "math" "os" "path/filepath" "sort" "strings" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" ) const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 // HeaderLen represents number of bytes reserved of index for header. HeaderLen = 5 // FormatV1 represents 1 version of index. FormatV1 = 1 // FormatV2 represents 2 version of index. FormatV2 = 2 labelNameSeparator = "\xff" indexFilename = "index" ) type indexWriterSeries struct { labels labels.Labels chunks []chunks.Meta // series file offset of chunks } type indexWriterSeriesSlice []*indexWriterSeries func (s indexWriterSeriesSlice) Len() int { return len(s) } func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s indexWriterSeriesSlice) Less(i, j int) bool { return labels.Compare(s[i].labels, s[j].labels) < 0 } type indexWriterStage uint8 const ( idxStageNone indexWriterStage = iota idxStageSymbols idxStageSeries idxStageLabelIndex idxStagePostings idxStageDone ) func (s indexWriterStage) String() string { switch s { case idxStageNone: return "none" case idxStageSymbols: return "symbols" case idxStageSeries: return "series" case idxStageLabelIndex: return "label index" case idxStagePostings: return "postings" case idxStageDone: return "done" } return "" } // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. var castagnoliTable *crc32.Table func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } // newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the // polynomial may be easily changed in one location at a later time, if necessary. func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { ctx context.Context f *os.File fbuf *bufio.Writer pos uint64 toc TOC stage indexWriterStage // Reusable memory. buf1 encoding.Encbuf buf2 encoding.Encbuf symbols map[string]uint32 // symbol offsets reverseSymbols map[uint32]string labelIndexes []labelIndexHashEntry // label index offsets postings []postingsHashEntry // postings lists offsets labelNames map[string]uint64 // label names, and their usage // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels lastRef uint64 crc32 hash.Hash Version int } // TOC represents index Table Of Content that states where each section of index starts. type TOC struct { Symbols uint64 Series uint64 LabelIndices uint64 LabelIndicesTable uint64 Postings uint64 PostingsTable uint64 } // NewTOCFromByteSlice return parsed TOC from given index byte slice. func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { if bs.Len() < indexTOCLen { return nil, encoding.ErrInvalidSize } b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) d := encoding.Decbuf{B: b[:len(b)-4]} if d.Crc32(castagnoliTable) != expCRC { return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC") } if err := d.Err(); err != nil { return nil, err } return &TOC{ Symbols: d.Be64(), Series: d.Be64(), LabelIndices: d.Be64(), LabelIndicesTable: d.Be64(), Postings: d.Be64(), PostingsTable: d.Be64(), }, nil } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. func NewWriter(ctx context.Context, fn string) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) if err != nil { return nil, err } defer df.Close() // Close for platform windows. if err := os.RemoveAll(fn); err != nil { return nil, errors.Wrap(err, "remove any existing index at path") } f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err } if err := df.Sync(); err != nil { return nil, errors.Wrap(err, "sync dir") } iw := &Writer{ ctx: ctx, f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, // Caches. labelNames: make(map[string]uint64, 1<<8), crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err } return iw, nil } func (w *Writer) write(bufs ...[]byte) error { for _, b := range bufs { n, err := w.fbuf.Write(b) w.pos += uint64(n) if err != nil { return err } // For now the index file must not grow beyond 64GiB. Some of the fixed-sized // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. if w.pos > 16*math.MaxUint32 { return errors.Errorf("exceeding max size of 64GiB") } } return nil } func (w *Writer) writeAt(buf []byte, pos uint64) error { if err := w.fbuf.Flush(); err != nil { return err } _, err := w.f.WriteAt(buf, int64(pos)) return err } // addPadding adds zero byte padding until the file size is a multiple size. func (w *Writer) addPadding(size int) error { p := w.pos % uint64(size) if p == 0 { return nil } p = uint64(size) - p return errors.Wrap(w.write(make([]byte, p)), "add padding") } // ensureStage handles transitions between write stages and ensures that IndexWriter // methods are called in an order valid for the implementation. func (w *Writer) ensureStage(s indexWriterStage) error { select { case <-w.ctx.Done(): return w.ctx.Err() default: } if w.stage == s { return nil } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } // Mark start of sections in table of contents. switch s { case idxStageSymbols: w.toc.Symbols = w.pos case idxStageSeries: w.toc.Series = w.pos case idxStageLabelIndex: w.toc.LabelIndices = w.pos case idxStageDone: w.toc.Postings = w.pos if err := w.writePostings(); err != nil { return err } w.toc.LabelIndicesTable = w.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } w.toc.PostingsTable = w.pos if err := w.writePostingsOffsetTable(); err != nil { return err } if err := w.writeTOC(); err != nil { return err } } w.stage = s return nil } func (w *Writer) writeMeta() error { w.buf1.Reset() w.buf1.PutBE32(MagicIndex) w.buf1.PutByte(FormatV2) return w.write(w.buf1.Get()) } // AddSeries adds the series one at a time along with its chunks. func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err } if labels.Compare(lset, w.lastSeries) <= 0 { return errors.Errorf("out-of-order series added with label set %q", lset) } if ref < w.lastRef && len(w.lastSeries) != 0 { return errors.Errorf("series with reference greater than %d already added", ref) } // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. if err := w.addPadding(16); err != nil { return errors.Errorf("failed to write padding bytes: %v", err) } if w.pos%16 != 0 { return errors.Errorf("series write not 16-byte aligned at %d", w.pos) } w.buf2.Reset() w.buf2.PutUvarint(len(lset)) for _, l := range lset { // here we have an index for the symbol file if v2, otherwise it's an offset index, ok := w.symbols[l.Name] if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } w.labelNames[l.Name]++ w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Value) } w.buf2.PutUvarint32(index) } w.buf2.PutUvarint(len(chunks)) if len(chunks) > 0 { c := chunks[0] w.buf2.PutVarint64(c.MinTime) w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) w.buf2.PutUvarint64(c.Ref) t0 := c.MaxTime ref0 := int64(c.Ref) for _, c := range chunks[1:] { w.buf2.PutUvarint64(uint64(c.MinTime - t0)) w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) t0 = c.MaxTime w.buf2.PutVarint64(int64(c.Ref) - ref0) ref0 = int64(c.Ref) } } w.buf1.Reset() w.buf1.PutUvarint(w.buf2.Len()) w.buf2.PutHash(w.crc32) if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil { return errors.Wrap(err, "write series data") } w.lastSeries = append(w.lastSeries[:0], lset...) w.lastRef = ref return nil } func (w *Writer) AddSymbols(sym map[string]struct{}) error { if err := w.ensureStage(idxStageSymbols); err != nil { return err } // Generate sorted list of strings we will store as reference table. symbols := make([]string, 0, len(sym)) for s := range sym { symbols = append(symbols, s) } sort.Strings(symbols) startPos := w.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err } w.crc32.Reset() w.buf1.Reset() w.buf1.PutBE32int(len(symbols)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } w.symbols = make(map[string]uint32, len(symbols)) w.reverseSymbols = make(map[uint32]string, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) w.reverseSymbols[uint32(index)] = s w.buf1.Reset() w.buf1.PutUvarintStr(s) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } } // Write out the length. w.buf1.Reset() w.buf1.PutBE32int(int(w.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) } func (w *Writer) WriteLabelIndex(names []string, values []string) error { if len(values)%len(names) != 0 { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) } if err := w.ensureStage(idxStageLabelIndex); err != nil { return errors.Wrap(err, "ensure stage") } valt, err := NewStringTuples(values, len(names)) if err != nil { return err } sort.Sort(valt) // Align beginning to 4 bytes for more efficient index list scans. if err := w.addPadding(4); err != nil { return err } w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ keys: names, offset: w.pos, }) startPos := w.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err } w.crc32.Reset() w.buf1.Reset() w.buf1.PutBE32int(len(names)) w.buf1.PutBE32int(valt.Len()) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } // here we have an index for the symbol file if v2, otherwise it's an offset for _, v := range valt.entries { index, ok := w.symbols[v] if !ok { return errors.Errorf("symbol entry for %q does not exist", v) } w.buf1.Reset() w.buf1.PutBE32(index) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } } // Write out the length. w.buf1.Reset() w.buf1.PutBE32int(int(w.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) } // writeLabelIndexesOffsetTable writes the label indices offset table. func (w *Writer) writeLabelIndexesOffsetTable() error { startPos := w.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err } w.crc32.Reset() w.buf1.Reset() w.buf1.PutBE32int(len(w.labelIndexes)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } for _, e := range w.labelIndexes { w.buf1.Reset() w.buf1.PutUvarint(len(e.keys)) for _, k := range e.keys { w.buf1.PutUvarintStr(k) } w.buf1.PutUvarint64(e.offset) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } } // Write out the length. w.buf1.Reset() w.buf1.PutBE32int(int(w.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) } // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { startPos := w.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err } w.crc32.Reset() w.buf1.Reset() w.buf1.PutBE32int(len(w.postings)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } for _, e := range w.postings { w.buf1.Reset() w.buf1.PutUvarint(2) w.buf1.PutUvarintStr(e.name) w.buf1.PutUvarintStr(e.value) w.buf1.PutUvarint64(e.offset) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } } // Write out the length. w.buf1.Reset() w.buf1.PutBE32int(int(w.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) } const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { w.buf1.Reset() w.buf1.PutBE64(w.toc.Symbols) w.buf1.PutBE64(w.toc.Series) w.buf1.PutBE64(w.toc.LabelIndices) w.buf1.PutBE64(w.toc.LabelIndicesTable) w.buf1.PutBE64(w.toc.Postings) w.buf1.PutBE64(w.toc.PostingsTable) w.buf1.PutHash(w.crc32) return w.write(w.buf1.Get()) } func (w *Writer) writePostings() error { names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) } sort.Strings(names) if err := w.fbuf.Flush(); err != nil { return err } f, err := fileutil.OpenMmapFile(w.f.Name()) if err != nil { return err } defer f.Close() // Write out the special all posting. offsets := []uint32{} d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) d.Skip(int(w.toc.Series)) for d.Len() > 0 { d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) if startPos%16 != 0 { return errors.Errorf("series not 16-byte aligned at %d", startPos) } offsets = append(offsets, uint32(startPos/16)) // Skip to next series. The 4 is for the CRC32. d.Skip(d.Uvarint() + 4) if err := d.Err(); err != nil { return nil } } if err := w.writePosting("", "", offsets); err != nil { return err } maxPostings := uint64(len(offsets)) // No label name can have more postings than this. for len(names) > 0 { batchNames := []string{} var c uint64 // Try to bunch up label names into one loop, but avoid // using more memory than a single label name can. for len(names) > 0 { if w.labelNames[names[0]]+c > maxPostings { break } batchNames = append(batchNames, names[0]) c += w.labelNames[names[0]] names = names[1:] } nameSymbols := map[uint32]struct{}{} for _, name := range batchNames { nameSymbols[w.symbols[name]] = struct{}{} } // Label name -> label value -> positions. postings := map[uint32]map[uint32][]uint32{} d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) d.Skip(int(w.toc.Series)) for d.Len() > 0 { d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) l := d.Uvarint() // Length of this series in bytes. startLen := d.Len() // See if label names we want are in the series. numLabels := d.Uvarint() for i := 0; i < numLabels; i++ { lno := uint32(d.Uvarint()) lvo := uint32(d.Uvarint()) if _, ok := nameSymbols[lno]; ok { if _, ok := postings[lno]; !ok { postings[lno] = map[uint32][]uint32{} } if _, ok := postings[lno][lvo]; !ok { postings[lno][lvo] = []uint32{} } postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) } } // Skip to next series. The 4 is for the CRC32. d.Skip(l - (startLen - d.Len()) + 4) if err := d.Err(); err != nil { return nil } } for _, name := range batchNames { // Write out postings for this label name. values := make([]uint32, 0, len(postings[w.symbols[name]])) for v := range postings[w.symbols[name]] { values = append(values, v) } // Symbol numbers are in order, so the strings will also be in order. sort.Sort(uint32slice(values)) for _, v := range values { if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil { return err } } } select { case <-w.ctx.Done(): return w.ctx.Err() default: } } return nil } func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. if err := w.addPadding(4); err != nil { return err } w.postings = append(w.postings, postingsHashEntry{ name: name, value: value, offset: w.pos, }) w.buf1.Reset() w.buf1.PutBE32int(len(offs)) for _, off := range offs { if off > (1<<32)-1 { return errors.Errorf("series offset %d exceeds 4 bytes", off) } w.buf1.PutBE32(off) } w.buf2.Reset() w.buf2.PutBE32int(w.buf1.Len()) w.buf1.PutHash(w.crc32) return w.write(w.buf2.Get(), w.buf1.Get()) } type uint32slice []uint32 func (s uint32slice) Len() int { return len(s) } func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } type labelIndexHashEntry struct { keys []string offset uint64 } type postingsHashEntry struct { name, value string offset uint64 } func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err } if err := w.fbuf.Flush(); err != nil { return err } if err := w.f.Sync(); err != nil { return err } return w.f.Close() } // StringTuples provides access to a sorted list of string tuples. type StringTuples interface { // Total number of tuples in the list. Len() int // At returns the tuple at position i. At(i int) ([]string, error) } type Reader struct { b ByteSlice toc *TOC // Close that releases the underlying resources of the byte slice. c io.Closer // Cached hashmaps of section offsets. labels map[string]uint64 // Map of LabelName to a list of some LabelValues's position in the offset table. // The first and last values for each name are always present. postings map[string][]postingOffset // Cache of read symbols. Strings that are returned when reading from the // block are always backed by true strings held in here rather than // strings that are backed by byte slices from the mmap'd index file. This // prevents memory faults when applications work with read symbols after // the block has been unmapped. The older format has sparse indexes so a map // must be used, but the new format is not so we can use a slice. symbolsV1 map[uint32]string symbolsV2 []string symbolsTableSize uint64 dec *Decoder version int } type postingOffset struct { value string off int } // ByteSlice abstracts a byte slice. type ByteSlice interface { Len() int Range(start, end int) []byte } type realByteSlice []byte func (b realByteSlice) Len() int { return len(b) } func (b realByteSlice) Range(start, end int) []byte { return b[start:end] } func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } // NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. func NewReader(b ByteSlice) (*Reader, error) { return newReader(b, ioutil.NopCloser(nil)) } // NewFileReader returns a new index reader against the given index file. func NewFileReader(path string) (*Reader, error) { f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } r, err := newReader(realByteSlice(f.Bytes()), f) if err != nil { var merr tsdb_errors.MultiError merr.Add(err) merr.Add(f.Close()) return nil, merr } return r, nil } func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r := &Reader{ b: b, c: c, labels: map[string]uint64{}, postings: map[string][]postingOffset{}, } // Verify header. if r.b.Len() < HeaderLen { return nil, errors.Wrap(encoding.ErrInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) } r.version = int(r.b.Range(4, 5)[0]) if r.version != FormatV1 && r.version != FormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } var err error r.toc, err = NewTOCFromByteSlice(b) if err != nil { return nil, errors.Wrap(err, "read TOC") } r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(r.toc.Symbols)) if err != nil { return nil, errors.Wrap(err, "read symbols") } // Use the strings already allocated by symbols, rather than // re-allocating them again below. // Additionally, calculate symbolsTableSize. allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2)) for _, s := range r.symbolsV1 { r.symbolsTableSize += uint64(len(s) + 8) allocatedSymbols[s] = s } for _, s := range r.symbolsV2 { r.symbolsTableSize += uint64(len(s) + 8) allocatedSymbols[s] = s } if err := ReadOffsetTable(r.b, r.toc.LabelIndicesTable, func(key []string, off uint64, _ int) error { if len(key) != 1 { return errors.Errorf("unexpected key length for label indices table %d", len(key)) } r.labels[allocatedSymbols[key[0]]] = off return nil }); err != nil { return nil, errors.Wrap(err, "read label index table") } var lastKey []string lastOff := 0 valueCount := 0 // For the postings offset table we keep every label name but only every nth // label value (plus the first and last one), to save memory. if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error { if len(key) != 2 { return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { // Next label name. r.postings[allocatedSymbols[key[0]]] = []postingOffset{} if lastKey != nil { // Always include last value for each label name. r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff}) } lastKey = nil valueCount = 0 } if valueCount%32 == 0 { r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: allocatedSymbols[key[1]], off: off}) lastKey = nil } else { lastKey = key lastOff = off } valueCount++ return nil }); err != nil { return nil, errors.Wrap(err, "read postings table") } if lastKey != nil { r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff}) } // Trim any extra space in the slices. for k, v := range r.postings { l := make([]postingOffset, len(v)) copy(l, v) r.postings[k] = l } r.dec = &Decoder{LookupSymbol: r.lookupSymbol} return r, nil } // Version returns the file format version of the underlying index. func (r *Reader) Version() int { return r.version } // Range marks a byte range. type Range struct { Start, End int64 } // PostingsRanges returns a new map of byte range in the underlying index file // for all postings lists. func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { m := map[labels.Label]Range{} if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error { if len(key) != 2 { return errors.Errorf("unexpected key length for posting table %d", len(key)) } d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) if d.Err() != nil { return d.Err() } m[labels.Label{Name: key[0], Value: key[1]}] = Range{ Start: int64(off) + 4, End: int64(off) + 4 + int64(d.Len()), } return nil }); err != nil { return nil, errors.Wrap(err, "read postings table") } return m, nil } // ReadSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) { if off == 0 { return nil, nil, nil } d := encoding.NewDecbufAt(bs, off, castagnoliTable) var ( origLen = d.Len() cnt = d.Be32int() basePos = uint32(off) + 4 nextPos = basePos + uint32(origLen-d.Len()) symbolSlice []string symbols = map[uint32]string{} ) if version == FormatV2 { symbolSlice = make([]string, 0, cnt) } for d.Err() == nil && d.Len() > 0 && cnt > 0 { s := d.UvarintStr() if version == FormatV2 { symbolSlice = append(symbolSlice, s) } else { symbols[nextPos] = s nextPos = basePos + uint32(origLen-d.Len()) } cnt-- } return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") } // ReadOffsetTable reads an offset table and at the given position calls f for each // found entry. If f returns an error it stops decoding and returns the received error. func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error { d := encoding.NewDecbufAt(bs, int(off), castagnoliTable) startLen := d.Len() cnt := d.Be32() for d.Err() == nil && d.Len() > 0 && cnt > 0 { offsetPos := startLen - d.Len() keyCount := d.Uvarint() // The Postings offset table takes only 2 keys per entry (name and value of label), // and the LabelIndices offset table takes only 1 key per entry (a label name). // Hence setting the size to max of both, i.e. 2. keys := make([]string, 0, 2) for i := 0; i < keyCount; i++ { keys = append(keys, d.UvarintStr()) } o := d.Uvarint64() if d.Err() != nil { break } if err := f(keys, o, offsetPos); err != nil { return err } cnt-- } return d.Err() } // Close the reader and its underlying resources. func (r *Reader) Close() error { return r.c.Close() } func (r *Reader) lookupSymbol(o uint32) (string, error) { if int(o) < len(r.symbolsV2) { return r.symbolsV2[o], nil } s, ok := r.symbolsV1[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) } return s, nil } // Symbols returns a set of symbols that exist within the index. func (r *Reader) Symbols() (map[string]struct{}, error) { res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2)) for _, s := range r.symbolsV1 { res[s] = struct{}{} } for _, s := range r.symbolsV2 { res[s] = struct{}{} } return res, nil } // SymbolTableSize returns the symbol table size in bytes. func (r *Reader) SymbolTableSize() uint64 { return r.symbolsTableSize } // LabelValues returns value tuples that exist for the given label name tuples. func (r *Reader) LabelValues(names ...string) (StringTuples, error) { key := strings.Join(names, labelNameSeparator) off, ok := r.labels[key] if !ok { // XXX(fabxc): hot fix. Should return a partial data error and handle cases // where the entire block has no data gracefully. return emptyStringTuples{}, nil //return nil, fmt.Errorf("label index doesn't exist") } d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) nc := d.Be32int() d.Be32() // consume unused value entry count. if d.Err() != nil { return nil, errors.Wrap(d.Err(), "read label value index") } st := &serializedStringTuples{ idsCount: nc, idsBytes: d.Get(), lookup: r.lookupSymbol, } return st, nil } type emptyStringTuples struct{} func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } // LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { var res [][]string for s := range r.labels { res = append(res, strings.Split(s, labelNameSeparator)) } return res, nil } // Series reads the series with the given ID and writes its labels and chunks into lbls and chks. func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. if r.version == FormatV2 { offset = id * 16 } d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) if d.Err() != nil { return d.Err() } return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series") } func (r *Reader) Postings(name string, values ...string) (Postings, error) { e, ok := r.postings[name] if !ok { return EmptyPostings(), nil } if len(values) == 0 { return EmptyPostings(), nil } res := make([]Postings, 0, len(values)) skip := 0 valueIndex := 0 for valueIndex < len(values) && values[valueIndex] < e[0].value { // Discard values before the start. valueIndex++ } for valueIndex < len(values) { value := values[valueIndex] i := sort.Search(len(e), func(i int) bool { return e[i].value >= value }) if i == len(e) { // We're past the end. break } if i > 0 && e[i].value != value { // Need to look from previous entry. i-- } // Don't Crc32 the entire postings offset table, this is very slow // so hope any issues were caught at startup. d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) d.Skip(e[i].off) // Iterate on the offset table. var postingsOff uint64 // The offset into the postings table. for d.Err() == nil { if skip == 0 { // These are always the same number of bytes, // and it's faster to skip than parse. skip = d.Len() d.Uvarint() // Keycount. d.UvarintBytes() // Label name. skip -= d.Len() } else { d.Skip(skip) } v := d.UvarintBytes() // Label value. postingsOff = d.Uvarint64() // Offset. for string(v) >= value { if string(v) == value { // Read from the postings table. d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.Postings(d2.Get()) if err != nil { return nil, errors.Wrap(err, "decode postings") } res = append(res, p) } valueIndex++ if valueIndex == len(values) { break } value = values[valueIndex] } if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) { // Need to go to a later postings offset entry, if there is one. break } } if d.Err() != nil { return nil, errors.Wrap(d.Err(), "get postings offset entry") } } return Merge(res...), nil } // SortedPostings returns the given postings list reordered so that the backing series // are sorted. func (r *Reader) SortedPostings(p Postings) Postings { return p } // Size returns the size of an index file. func (r *Reader) Size() int64 { return int64(r.b.Len()) } // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) for key := range r.labels { // 'key' contains the label names concatenated with the // delimiter 'labelNameSeparator'. names := strings.Split(key, labelNameSeparator) for _, name := range names { if name == allPostingsKey.Name { // This is not from any metric. // It is basically an empty label name. continue } labelNamesMap[name] = struct{}{} } } labelNames := make([]string, 0, len(labelNamesMap)) for name := range labelNamesMap { labelNames = append(labelNames, name) } sort.Strings(labelNames) return labelNames, nil } type stringTuples struct { length int // tuple length entries []string // flattened tuple entries swapBuf []string } func NewStringTuples(entries []string, length int) (*stringTuples, error) { if len(entries)%length != 0 { return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list") } return &stringTuples{ entries: entries, length: length, }, nil } func (t *stringTuples) Len() int { return len(t.entries) / t.length } func (t *stringTuples) At(i int) ([]string, error) { return t.entries[i : i+t.length], nil } func (t *stringTuples) Swap(i, j int) { if t.swapBuf == nil { t.swapBuf = make([]string, t.length) } copy(t.swapBuf, t.entries[i:i+t.length]) for k := 0; k < t.length; k++ { t.entries[i+k] = t.entries[j+k] t.entries[j+k] = t.swapBuf[k] } } func (t *stringTuples) Less(i, j int) bool { for k := 0; k < t.length; k++ { d := strings.Compare(t.entries[i+k], t.entries[j+k]) if d < 0 { return true } if d > 0 { return false } } return false } type serializedStringTuples struct { idsCount int idsBytes []byte // bytes containing the ids pointing to the string in the lookup table. lookup func(uint32) (string, error) } func (t *serializedStringTuples) Len() int { return len(t.idsBytes) / (4 * t.idsCount) } func (t *serializedStringTuples) At(i int) ([]string, error) { if len(t.idsBytes) < (i+t.idsCount)*4 { return nil, encoding.ErrInvalidSize } res := make([]string, 0, t.idsCount) for k := 0; k < t.idsCount; k++ { offset := binary.BigEndian.Uint32(t.idsBytes[(i+k)*4:]) s, err := t.lookup(offset) if err != nil { return nil, errors.Wrap(err, "symbol lookup") } res = append(res, s) } return res, nil } // Decoder provides decoding methods for the v1 and v2 index file format. // // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { LookupSymbol func(uint32) (string, error) } // Postings returns a postings list for b and its number of elements. func (dec *Decoder) Postings(b []byte) (int, Postings, error) { d := encoding.Decbuf{B: b} n := d.Be32int() l := d.Get() return n, newBigEndianPostings(l), d.Err() } // Series decodes a series entry from the given byte slice into lset and chks. func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { *lbls = (*lbls)[:0] *chks = (*chks)[:0] d := encoding.Decbuf{B: b} k := d.Uvarint() for i := 0; i < k; i++ { lno := uint32(d.Uvarint()) lvo := uint32(d.Uvarint()) if d.Err() != nil { return errors.Wrap(d.Err(), "read series label offsets") } ln, err := dec.LookupSymbol(lno) if err != nil { return errors.Wrap(err, "lookup label name") } lv, err := dec.LookupSymbol(lvo) if err != nil { return errors.Wrap(err, "lookup label value") } *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. k = d.Uvarint() if k == 0 { return nil } t0 := d.Varint64() maxt := int64(d.Uvarint64()) + t0 ref0 := int64(d.Uvarint64()) *chks = append(*chks, chunks.Meta{ Ref: uint64(ref0), MinTime: t0, MaxTime: maxt, }) t0 = maxt for i := 1; i < k; i++ { mint := int64(d.Uvarint64()) + t0 maxt := int64(d.Uvarint64()) + mint ref0 += d.Varint64() t0 = maxt if d.Err() != nil { return errors.Wrapf(d.Err(), "read meta for chunk %d", i) } *chks = append(*chks, chunks.Meta{ Ref: uint64(ref0), MinTime: mint, MaxTime: maxt, }) } return d.Err() }