mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Allocate and cache strings for persisted blocks
This change loads the full symbol table when we open a persisted block and allocates a string for each. This ensures that strings retrieved through the index can be used after the block was closed. Before we backed the strings by the mmap'd byte regions which would segfault in this case. Also remove an inconsistency in the disk format and move both offset tables to the end (breaking change).
This commit is contained in:
parent
c521ac495f
commit
78df406dac
|
@ -18,14 +18,14 @@ It is terminated by a table of contents which serves as an entry point into the
|
|||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Label Index N │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Label Index Table │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Postings 1 │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ ... │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Postings N │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Label Index Table │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ Postings Table │ │
|
||||
│ ├──────────────────────────────────────────────┤ │
|
||||
│ │ TOC │ │
|
||||
|
|
7
block.go
7
block.go
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
// DiskBlock represents a data block backed by on-disk data.
|
||||
type DiskBlock interface {
|
||||
BlockReader
|
||||
|
||||
|
@ -42,6 +43,7 @@ type DiskBlock interface {
|
|||
Close() error
|
||||
}
|
||||
|
||||
// BlockReader provides reading access to a data block.
|
||||
type BlockReader interface {
|
||||
// Index returns an IndexReader over the block's data.
|
||||
Index() IndexReader
|
||||
|
@ -53,11 +55,6 @@ type BlockReader interface {
|
|||
Tombstones() TombstoneReader
|
||||
}
|
||||
|
||||
// Snapshottable defines an entity that can be backedup online.
|
||||
type Snapshottable interface {
|
||||
Snapshot(dir string) error
|
||||
}
|
||||
|
||||
// Appendable defines an entity to which data can be appended.
|
||||
type Appendable interface {
|
||||
// Appender returns a new Appender against an underlying store.
|
||||
|
|
|
@ -77,22 +77,6 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
|||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||
|
||||
// uvarintTempStr decodes like uvarintStr but the returned string is
|
||||
// not safe to use if the underyling buffer changes.
|
||||
func (d *decbuf) uvarintTempStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.b) < int(l) {
|
||||
d.e = errInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := yoloString(d.b[:l])
|
||||
d.b = d.b[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
|
|
83
index.go
83
index.go
|
@ -232,13 +232,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
|||
w.toc.labelIndices = w.pos
|
||||
|
||||
case idxStagePostings:
|
||||
w.toc.postings = w.pos
|
||||
|
||||
case idxStageDone:
|
||||
w.toc.labelIndicesTable = w.pos
|
||||
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
||||
return err
|
||||
}
|
||||
w.toc.postings = w.pos
|
||||
|
||||
case idxStageDone:
|
||||
w.toc.postingsTable = w.pos
|
||||
if err := w.writeOffsetTable(w.postings); err != nil {
|
||||
return err
|
||||
|
@ -404,10 +404,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
|||
|
||||
// writeOffsetTable writes a sequence of readable hash entries.
|
||||
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(len(entries))
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(entries))
|
||||
|
||||
for _, e := range entries {
|
||||
w.buf2.putUvarint(len(e.keys))
|
||||
|
@ -417,6 +415,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
|||
w.buf2.putUvarint64(e.offset)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
|
@ -563,6 +562,12 @@ type indexReader struct {
|
|||
// Cached hashmaps of section offsets.
|
||||
labels map[string]uint32
|
||||
postings map[string]uint32
|
||||
// Cache of read symbols. Strings that are returned when reading from the
|
||||
// block are always backed by true strings held in here rather than
|
||||
// strings that are backed by byte slices from the mmap'd index file. This
|
||||
// prevents memory faults when applications work with read symbols after
|
||||
// the block has been unmapped.
|
||||
symbols map[uint32]string
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -579,7 +584,11 @@ func newIndexReader(dir string) (*indexReader, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := &indexReader{b: f.b, c: f}
|
||||
r := &indexReader{
|
||||
b: f.b,
|
||||
c: f,
|
||||
symbols: map[uint32]string{},
|
||||
}
|
||||
|
||||
// Verify magic number.
|
||||
if len(f.b) < 4 {
|
||||
|
@ -592,6 +601,9 @@ func newIndexReader(dir string) (*indexReader, error) {
|
|||
if err := r.readTOC(); err != nil {
|
||||
return nil, errors.Wrap(err, "read TOC")
|
||||
}
|
||||
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
|
||||
return nil, errors.Wrap(err, "read symbols")
|
||||
}
|
||||
|
||||
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
||||
if err != nil {
|
||||
|
@ -623,21 +635,40 @@ func (r *indexReader) decbufAt(off int) decbuf {
|
|||
return decbuf{b: r.b[off:]}
|
||||
}
|
||||
|
||||
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
||||
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
|
||||
// after the reader is closed.
|
||||
func (r *indexReader) readSymbols(off int) error {
|
||||
if off == 0 {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
d1 = r.decbufAt(int(off))
|
||||
d2 = d1.decbuf(d1.be32int())
|
||||
origLen = d2.len()
|
||||
cnt = d2.be32int()
|
||||
basePos = uint32(off) + 4
|
||||
nextPos = basePos + uint32(origLen-d2.len())
|
||||
)
|
||||
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
||||
s := d2.uvarintStr()
|
||||
r.symbols[uint32(nextPos)] = s
|
||||
|
||||
nextPos = basePos + uint32(origLen-d2.len())
|
||||
cnt--
|
||||
}
|
||||
return d2.err()
|
||||
}
|
||||
|
||||
// readOffsetTable reads an offset table at the given position and returns a map
|
||||
// with the key strings concatenated by the 0xff unicode non-character.
|
||||
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||
// A table might not have been written at all, in which case the position
|
||||
// is zeroed out.
|
||||
if off == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
const sep = "\xff"
|
||||
|
||||
var (
|
||||
d1 = r.decbufAt(int(off))
|
||||
cnt = d1.be32()
|
||||
d2 = d1.decbuf(d1.be32int())
|
||||
cnt = d2.be32()
|
||||
)
|
||||
|
||||
res := make(map[string]uint32, 512)
|
||||
|
@ -647,7 +678,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
|||
keys := make([]string, 0, keyCount)
|
||||
|
||||
for i := 0; i < keyCount; i++ {
|
||||
keys = append(keys, d2.uvarintTempStr())
|
||||
keys = append(keys, d2.uvarintStr())
|
||||
}
|
||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
||||
|
||||
|
@ -682,28 +713,20 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|||
}
|
||||
|
||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||
d := r.decbufAt(int(o))
|
||||
|
||||
s := d.uvarintTempStr()
|
||||
if d.err() != nil {
|
||||
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
|
||||
s, ok := r.symbols[o]
|
||||
if !ok {
|
||||
return "", errors.Errorf("unknown symbol offset %d", o)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
||||
d1 := r.decbufAt(int(r.toc.symbols))
|
||||
d2 := d1.decbuf(d1.be32int())
|
||||
res := make(map[string]struct{}, len(r.symbols))
|
||||
|
||||
count := d2.be32int()
|
||||
sym := make(map[string]struct{}, count)
|
||||
|
||||
for ; count > 0; count-- {
|
||||
s := d2.uvarintTempStr()
|
||||
sym[s] = struct{}{}
|
||||
for _, s := range r.symbols {
|
||||
res[s] = struct{}{}
|
||||
}
|
||||
|
||||
return sym, d2.err()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||
|
|
|
@ -221,7 +221,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(c))
|
||||
require.Equal(t, l, series[i])
|
||||
require.Equal(t, series[i], l)
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
|
||||
|
|
Loading…
Reference in a new issue