Index-persistence switched from gob to a hand-coded solution.

Change-Id: Ib4ec42535bd08df16d34d4774bb638e35c5a1841
This commit is contained in:
Bjoern Rabenstein 2014-08-12 17:46:46 +02:00
parent e7ed39c9a6
commit ecdf5ab14f
6 changed files with 419 additions and 63 deletions

View file

@ -362,11 +362,11 @@ func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) met
case it.chunk.len():
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
default:
if v := it.chunk.valueAtIndex(i); v.Timestamp.Equal(t) {
v := it.chunk.valueAtIndex(i)
if v.Timestamp.Equal(t) {
return metric.Values{*v}
} else {
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
}
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
}
}

View file

@ -47,7 +47,9 @@ type SeriesIterator interface {
type Persistence interface {
// PersistChunk persists a single chunk of a series.
PersistChunk(clientmodel.Fingerprint, chunk) error
// PersistIndexes persists a Prometheus server's timeseries indexes.
// PersistIndexes persists a Prometheus server's timeseries indexes. It
// is the caller's responsibility to not modify indexes while persisting
// is underway, and to not call this method multiple times concurrently.
PersistIndexes(i *Indexes) error
// PersistHeads persists all open (non-full) head chunks.
PersistHeads(map[clientmodel.Fingerprint]*memorySeries) error
@ -64,7 +66,9 @@ type Persistence interface {
LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error)
// LoadHeads loads all open (non-full) head chunks.
LoadHeads(map[clientmodel.Fingerprint]*memorySeries) error
// LoadIndexes loads and returns all timeseries indexes.
// LoadIndexes loads and returns all timeseries indexes. It is the
// caller's responsibility to not modify indexes while loading is
// underway, and to not call this method multiple times concurrently.
LoadIndexes() (*Indexes, error)
}

View file

@ -24,8 +24,12 @@ import (
const (
seriesFileName = "series.db"
seriesTempFileName = "series.db.tmp"
indexFileName = "index.db"
headsFileName = "heads.db"
indexFileName = "index.db"
indexFormatVersion = 1
indexMagicString = "PrometheusIndexes"
indexBufSize = 1 << 15 // 32kiB. TODO: Tweak.
chunkHeaderLen = 17
chunkHeaderTypeOffset = 0
@ -40,6 +44,7 @@ const (
type diskPersistence struct {
basePath string
chunkLen int
buf []byte // Staging space for persisting indexes.
}
func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
@ -53,6 +58,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
return &diskPersistence{
basePath: basePath,
chunkLen: chunkLen,
buf: make([]byte, binary.MaxVarintLen64), // Also sufficient for uint64.
}, nil
}
@ -232,6 +238,38 @@ func (p *diskPersistence) indexPath() string {
return path.Join(p.basePath, indexFileName)
}
// PersistIndexes persists the indexes to disk. Do not call it concurrently with
// LoadIndexes as they share a buffer for staging. This method depends on the
// following type conversions being possible:
// clientmodel.LabelName -> string
// clientmodel.LabelValue -> string
// clientmodel.Fingerprint -> uint64
//
// Description of the on-disk format:
//
// Label names and label values are encoded as their varint-encoded length
// followed by their byte sequence.
//
// Fingerprints are encoded as big-endian uint64.
//
// The file starts with the 'magic' byte sequence "PrometheusIndexes", followed
// by a varint-encoded version number (currently 1).
//
// The indexes follow one after another in the order FingerprintToSeries,
// LabelPairToFingerprints, LabelNameToLabelValues. Each index starts with the
// varint-encoded number of entries in that index, followed by the corresponding
// number of entries.
//
// An entry in FingerprintToSeries consists of a fingerprint, followed by the
// varint-encoded number of label pairs, followed by those label pairs, each in
// order label name and then label value.
//
// An entry in LabelPairToFingerprints consists of a label name, then a label
// value, then a varint-encoded number of fingerprints, followed by those
// fingerprints.
//
// An entry in LabelNameToLabelValues consists of a label name, followed by the
// varint-encoded number of label values, followed by those label values.
func (p *diskPersistence) PersistIndexes(i *Indexes) error {
f, err := os.OpenFile(p.indexPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
@ -239,21 +277,132 @@ func (p *diskPersistence) PersistIndexes(i *Indexes) error {
}
defer f.Close()
enc := gob.NewEncoder(f)
if err := enc.Encode(i); err != nil {
p.setBufLen(binary.MaxVarintLen64)
w := bufio.NewWriterSize(f, indexBufSize)
if _, err := w.WriteString(indexMagicString); err != nil {
return err
}
if err := p.persistVarint(w, indexFormatVersion); err != nil {
return err
}
if err := p.persistFingerprintToSeries(w, i.FingerprintToSeries); err != nil {
return err
}
if err := p.persistLabelPairToFingerprints(w, i.LabelPairToFingerprints); err != nil {
return err
}
if err := p.persistLabelNameToLabelValues(w, i.LabelNameToLabelValues); err != nil {
return err
}
return w.Flush()
}
// persistVarint writes i to w as a varint-encoded int64. The encoding is staged
// in p.buf, which must be at least binary.MaxVarintLen64 bytes long when this
// method is called (binary.PutVarint panics on a buffer that is too small).
func (p *diskPersistence) persistVarint(w io.Writer, i int) error {
	n := binary.PutVarint(p.buf, int64(i))
	if _, err := w.Write(p.buf[:n]); err != nil {
		return err
	}
	return nil
}
// persistFingerprint writes fp to w as a big-endian uint64, i.e. exactly eight
// bytes. The bytes are staged in p.buf, which therefore has to be at least
// eight bytes long. The method relies on clientmodel.Fingerprint being
// convertible to uint64.
func (p *diskPersistence) persistFingerprint(w io.Writer, fp clientmodel.Fingerprint) error {
	const fingerprintLen = 8
	binary.BigEndian.PutUint64(p.buf, uint64(fp))
	if _, err := w.Write(p.buf[:fingerprintLen]); err != nil {
		return err
	}
	return nil
}
// persistString writes s to w as its varint-encoded length in bytes, followed
// by the bytes of s. The first error encountered is returned.
func (p *diskPersistence) persistString(w *bufio.Writer, s string) error {
	err := p.persistVarint(w, len(s))
	if err == nil {
		_, err = w.WriteString(s)
	}
	return err
}
// persistFingerprintToSeries writes the FingerprintToSeries index to w: first
// the varint-encoded number of entries, then for every entry the fingerprint,
// the varint-encoded number of label pairs, and each label pair as label name
// followed by label value. Entries and label pairs are written in map
// iteration order. The method relies on clientmodel.LabelName and
// clientmodel.LabelValue being convertible to string.
func (p *diskPersistence) persistFingerprintToSeries(w *bufio.Writer, index map[clientmodel.Fingerprint]*memorySeries) error {
	err := p.persistVarint(w, len(index))
	if err != nil {
		return err
	}
	for fingerprint, series := range index {
		if err = p.persistFingerprint(w, fingerprint); err != nil {
			return err
		}
		labels := series.metric
		if err = p.persistVarint(w, len(labels)); err != nil {
			return err
		}
		for name, value := range labels {
			if err = p.persistString(w, string(name)); err != nil {
				return err
			}
			if err = p.persistString(w, string(value)); err != nil {
				return err
			}
		}
	}
	return nil
}
// persistLabelPairToFingerprints writes the LabelPairToFingerprints index to
// w: first the varint-encoded number of entries, then for every entry the
// label name, the label value, the varint-encoded number of fingerprints, and
// the fingerprints themselves. The method relies on clientmodel.LabelName and
// clientmodel.LabelValue being convertible to string. Every member of a set
// must be a clientmodel.Fingerprint; any other member type makes the type
// assertion panic.
func (p *diskPersistence) persistLabelPairToFingerprints(w *bufio.Writer, index map[metric.LabelPair]utility.Set) error {
	err := p.persistVarint(w, len(index))
	if err != nil {
		return err
	}
	for pair, fingerprints := range index {
		if err = p.persistString(w, string(pair.Name)); err != nil {
			return err
		}
		if err = p.persistString(w, string(pair.Value)); err != nil {
			return err
		}
		if err = p.persistVarint(w, len(fingerprints)); err != nil {
			return err
		}
		for member := range fingerprints {
			fp := member.(clientmodel.Fingerprint)
			if err = p.persistFingerprint(w, fp); err != nil {
				return err
			}
		}
	}
	return nil
}
// persistLabelNameToLabelValues writes the LabelNameToLabelValues index to w:
// first the varint-encoded number of entries, then for every entry the label
// name, the varint-encoded number of label values, and the label values
// themselves. The method relies on clientmodel.LabelName and
// clientmodel.LabelValue being convertible to string. Every member of a set
// must be a clientmodel.LabelValue; any other member type makes the type
// assertion panic.
func (p *diskPersistence) persistLabelNameToLabelValues(w *bufio.Writer, index map[clientmodel.LabelName]utility.Set) error {
	err := p.persistVarint(w, len(index))
	if err != nil {
		return err
	}
	for name, values := range index {
		if err = p.persistString(w, string(name)); err != nil {
			return err
		}
		if err = p.persistVarint(w, len(values)); err != nil {
			return err
		}
		for member := range values {
			value := member.(clientmodel.LabelValue)
			if err = p.persistString(w, string(value)); err != nil {
				return err
			}
		}
	}
	return nil
}
// LoadIndexes loads the indexes from disk. See PersistIndexes for details about
// the disk format. Do not call LoadIndexes and PersistIndexes concurrently as
// they share a buffer for staging.
func (p *diskPersistence) LoadIndexes() (*Indexes, error) {
f, err := os.Open(p.indexPath())
if os.IsNotExist(err) {
return &Indexes{
FingerprintToSeries: make(map[clientmodel.Fingerprint]*memorySeries),
LabelPairToFingerprints: make(map[metric.LabelPair]utility.Set),
LabelNameToLabelValues: make(map[clientmodel.LabelName]utility.Set),
FingerprintToSeries: map[clientmodel.Fingerprint]*memorySeries{},
LabelPairToFingerprints: map[metric.LabelPair]utility.Set{},
LabelNameToLabelValues: map[clientmodel.LabelName]utility.Set{},
}, nil
}
if err != nil {
@ -261,13 +410,162 @@ func (p *diskPersistence) LoadIndexes() (*Indexes, error) {
}
defer f.Close()
dec := gob.NewDecoder(f)
var i Indexes
if err := dec.Decode(&i); err != nil {
r := bufio.NewReaderSize(f, indexBufSize)
p.setBufLen(len(indexMagicString))
if _, err := io.ReadFull(r, p.buf); err != nil {
return nil, err
}
magic := string(p.buf)
if magic != indexMagicString {
return nil, fmt.Errorf(
"unexpected magic string, want %q, got %q",
indexMagicString, magic,
)
}
if version, err := binary.ReadVarint(r); version != indexFormatVersion || err != nil {
return nil, fmt.Errorf("unknown index format version, want %d", indexFormatVersion)
}
i := &Indexes{}
if err := p.loadFingerprintToSeries(r, i); err != nil {
return nil, err
}
if err := p.loadLabelPairToFingerprints(r, i); err != nil {
return nil, err
}
if err := p.loadLabelNameToLabelValues(r, i); err != nil {
return nil, err
}
return &i, nil
return i, nil
}
// loadFingerprintToSeries reads the FingerprintToSeries index from r and
// stores it in i.FingerprintToSeries, replacing the previous value of that
// field. It expects the varint-encoded number of entries, followed by that
// many entries, each consisting of a fingerprint, the varint-encoded number of
// label pairs, and the label pairs as label name followed by label value.
//
// Counts are written as non-negative numbers, so a negative count can only
// come from a corrupt file. It is reported as an error instead of being passed
// on to make as a size hint.
//
// If an error is returned, i.FingerprintToSeries may be left partially
// populated.
func (p *diskPersistence) loadFingerprintToSeries(r *bufio.Reader, i *Indexes) error {
	length, err := binary.ReadVarint(r)
	if err != nil {
		return err
	}
	if length < 0 {
		return fmt.Errorf("negative number of entries in FingerprintToSeries index: %d", length)
	}
	i.FingerprintToSeries = make(map[clientmodel.Fingerprint]*memorySeries, length)
	for ; length > 0; length-- {
		fp, err := p.loadFingerprint(r)
		if err != nil {
			return err
		}
		numLabelPairs, err := binary.ReadVarint(r)
		if err != nil {
			return err
		}
		if numLabelPairs < 0 {
			return fmt.Errorf("negative number of label pairs in FingerprintToSeries index: %d", numLabelPairs)
		}
		m := make(clientmodel.Metric, numLabelPairs)
		// Register the series before filling in its metric; m is a map, so
		// the label pairs added below are visible through the stored series.
		i.FingerprintToSeries[fp] = &memorySeries{metric: m}
		for ; numLabelPairs > 0; numLabelPairs-- {
			ln, err := p.loadString(r)
			if err != nil {
				return err
			}
			lv, err := p.loadString(r)
			if err != nil {
				return err
			}
			m[clientmodel.LabelName(ln)] = clientmodel.LabelValue(lv)
		}
	}
	return nil
}
// loadLabelPairToFingerprints reads the LabelPairToFingerprints index from r
// and stores it in i.LabelPairToFingerprints, replacing the previous value of
// that field. It expects the varint-encoded number of entries, followed by
// that many entries, each consisting of a label name, a label value, the
// varint-encoded number of fingerprints, and the fingerprints themselves.
//
// Counts are written as non-negative numbers, so a negative count can only
// come from a corrupt file. It is reported as an error instead of being passed
// on to make as a size hint.
//
// If an error is returned, i.LabelPairToFingerprints may be left partially
// populated.
func (p *diskPersistence) loadLabelPairToFingerprints(r *bufio.Reader, i *Indexes) error {
	length, err := binary.ReadVarint(r)
	if err != nil {
		return err
	}
	if length < 0 {
		return fmt.Errorf("negative number of entries in LabelPairToFingerprints index: %d", length)
	}
	i.LabelPairToFingerprints = make(map[metric.LabelPair]utility.Set, length)
	for ; length > 0; length-- {
		ln, err := p.loadString(r)
		if err != nil {
			return err
		}
		lv, err := p.loadString(r)
		if err != nil {
			return err
		}
		numFPs, err := binary.ReadVarint(r)
		if err != nil {
			return err
		}
		if numFPs < 0 {
			return fmt.Errorf("negative number of fingerprints in LabelPairToFingerprints index: %d", numFPs)
		}
		s := make(utility.Set, numFPs)
		i.LabelPairToFingerprints[metric.LabelPair{
			Name:  clientmodel.LabelName(ln),
			Value: clientmodel.LabelValue(lv),
		}] = s
		for ; numFPs > 0; numFPs-- {
			fp, err := p.loadFingerprint(r)
			if err != nil {
				return err
			}
			s.Add(fp)
		}
	}
	return nil
}
// loadLabelNameToLabelValues reads the LabelNameToLabelValues index from r and
// stores it in i.LabelNameToLabelValues, replacing the previous value of that
// field. It expects the varint-encoded number of entries, followed by that
// many entries, each consisting of a label name, the varint-encoded number of
// label values, and the label values themselves.
//
// Counts are written as non-negative numbers, so a negative count can only
// come from a corrupt file. It is reported as an error instead of being passed
// on to make as a size hint.
//
// If an error is returned, i.LabelNameToLabelValues may be left partially
// populated.
func (p *diskPersistence) loadLabelNameToLabelValues(r *bufio.Reader, i *Indexes) error {
	length, err := binary.ReadVarint(r)
	if err != nil {
		return err
	}
	if length < 0 {
		return fmt.Errorf("negative number of entries in LabelNameToLabelValues index: %d", length)
	}
	i.LabelNameToLabelValues = make(map[clientmodel.LabelName]utility.Set, length)
	for ; length > 0; length-- {
		ln, err := p.loadString(r)
		if err != nil {
			return err
		}
		numLVs, err := binary.ReadVarint(r)
		if err != nil {
			return err
		}
		if numLVs < 0 {
			return fmt.Errorf("negative number of label values in LabelNameToLabelValues index: %d", numLVs)
		}
		s := make(utility.Set, numLVs)
		i.LabelNameToLabelValues[clientmodel.LabelName(ln)] = s
		for ; numLVs > 0; numLVs-- {
			lv, err := p.loadString(r)
			if err != nil {
				return err
			}
			s.Add(clientmodel.LabelValue(lv))
		}
	}
	return nil
}
// loadFingerprint reads exactly eight bytes from r into the staging buffer
// p.buf and decodes them as a big-endian uint64 fingerprint. If fewer than
// eight bytes are available, the error from io.ReadFull is returned together
// with a zero fingerprint.
func (p *diskPersistence) loadFingerprint(r io.Reader) (clientmodel.Fingerprint, error) {
	p.setBufLen(8)
	_, err := io.ReadFull(r, p.buf)
	if err != nil {
		return 0, err
	}
	raw := binary.BigEndian.Uint64(p.buf)
	return clientmodel.Fingerprint(raw), nil
}
// loadString reads a string from r that was encoded as its varint-encoded
// length in bytes followed by the bytes themselves. The bytes are staged in
// p.buf, which is resized as needed.
//
// The length comes straight from the file and is validated before use: a
// negative length (or one that does not fit into an int) can only stem from a
// corrupt file and is reported as an error. Without this check, a negative
// length would reach setBufLen and make the reslicing of p.buf panic.
func (p *diskPersistence) loadString(r *bufio.Reader) (string, error) {
	length, err := binary.ReadVarint(r)
	if err != nil {
		return "", err
	}
	n := int(length)
	if length < 0 || int64(n) != length {
		return "", fmt.Errorf("invalid string length in index file: %d", length)
	}
	p.setBufLen(n)
	if _, err := io.ReadFull(r, p.buf); err != nil {
		return "", err
	}
	// The conversion copies the bytes, so the result stays valid when p.buf
	// is reused for the next read.
	return string(p.buf), nil
}
// setBufLen sets the length of the staging buffer p.buf to l. If the current
// capacity suffices, the buffer is merely resliced and keeps its backing
// array; otherwise a new, zeroed buffer of length l is allocated and the old
// contents are dropped.
func (p *diskPersistence) setBufLen(l int) {
	if l > cap(p.buf) {
		p.buf = make([]byte, l)
		return
	}
	p.buf = p.buf[:l]
}
func (p *diskPersistence) headsPath() string {

View file

@ -2,6 +2,7 @@ package storage_ng
import (
"io/ioutil"
"os"
"testing"
clientmodel "github.com/prometheus/client_golang/model"
@ -10,8 +11,8 @@ import (
"github.com/prometheus/prometheus/utility"
)
func TestIndexPersistence(t *testing.T) {
expected := Indexes{
var (
indexes = Indexes{
FingerprintToSeries: map[clientmodel.Fingerprint]*memorySeries{
0: {
metric: clientmodel.Metric{
@ -23,6 +24,13 @@ func TestIndexPersistence(t *testing.T) {
metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "metric_0",
"label_2": "value_2",
"label_3": "value_3",
},
},
2: {
metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "metric_1",
"label_1": "value_2",
},
},
},
@ -31,77 +39,150 @@ func TestIndexPersistence(t *testing.T) {
Name: clientmodel.MetricNameLabel,
Value: "metric_0",
}: {
0: struct{}{},
1: struct{}{},
clientmodel.Fingerprint(0): struct{}{},
clientmodel.Fingerprint(1): struct{}{},
},
metric.LabelPair{
Name: clientmodel.MetricNameLabel,
Value: "metric_1",
}: {
clientmodel.Fingerprint(2): struct{}{},
},
metric.LabelPair{
Name: "label_1",
Value: "value_1",
}: {
0: struct{}{},
clientmodel.Fingerprint(0): struct{}{},
},
metric.LabelPair{
Name: "label_1",
Value: "value_2",
}: {
clientmodel.Fingerprint(2): struct{}{},
},
metric.LabelPair{
Name: "label_2",
Value: "value_2",
}: {
1: struct{}{},
clientmodel.Fingerprint(1): struct{}{},
},
metric.LabelPair{
Name: "label_3",
Value: "value_2",
}: {
clientmodel.Fingerprint(1): struct{}{},
},
},
LabelNameToLabelValues: map[clientmodel.LabelName]utility.Set{
clientmodel.MetricNameLabel: {
clientmodel.LabelValue("metric_0"): struct{}{},
clientmodel.LabelValue("metric_1"): struct{}{},
},
"label_1": {
clientmodel.LabelValue("value_1"): struct{}{},
clientmodel.LabelValue("value_2"): struct{}{},
},
"label_2": {
clientmodel.LabelValue("value_2"): struct{}{},
},
"label_3": {
clientmodel.LabelValue("value_3"): struct{}{},
},
},
}
)
func TestIndexPersistence(t *testing.T) {
basePath, err := ioutil.TempDir("", "test_index_persistence")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(basePath)
p, err := NewDiskPersistence(basePath, 1024)
if err != nil {
t.Fatal(err)
}
p.PersistIndexes(&expected)
if err := p.PersistIndexes(&indexes); err != nil {
t.Fatal(err)
}
actual, err := p.LoadIndexes()
if err != nil {
t.Fatal(err)
}
if len(actual.FingerprintToSeries) != len(expected.FingerprintToSeries) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.FingerprintToSeries), len(expected.FingerprintToSeries))
if len(actual.FingerprintToSeries) != len(indexes.FingerprintToSeries) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.FingerprintToSeries), len(indexes.FingerprintToSeries))
}
for fp, actualSeries := range actual.FingerprintToSeries {
expectedSeries := expected.FingerprintToSeries[fp]
expectedSeries := indexes.FingerprintToSeries[fp]
if !expectedSeries.metric.Equal(actualSeries.metric) {
t.Fatalf("%s: Got %s; want %s", fp, actualSeries.metric, expectedSeries.metric)
}
}
if len(actual.LabelPairToFingerprints) != len(expected.LabelPairToFingerprints) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelPairToFingerprints), len(expected.LabelPairToFingerprints))
if len(actual.LabelPairToFingerprints) != len(indexes.LabelPairToFingerprints) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelPairToFingerprints), len(indexes.LabelPairToFingerprints))
}
for lp, actualFps := range actual.LabelPairToFingerprints {
expectedFps := expected.LabelPairToFingerprints[lp]
expectedFps := indexes.LabelPairToFingerprints[lp]
if len(actualFps) != len(actualFps.Intersection(expectedFps)) {
t.Fatalf("%s: Got %s; want %s", lp, actualFps, expectedFps)
}
}
if len(actual.LabelNameToLabelValues) != len(expected.LabelNameToLabelValues) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelNameToLabelValues), len(expected.LabelNameToLabelValues))
if len(actual.LabelNameToLabelValues) != len(indexes.LabelNameToLabelValues) {
t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelNameToLabelValues), len(indexes.LabelNameToLabelValues))
}
for name, actualVals := range actual.LabelNameToLabelValues {
expectedVals := expected.LabelNameToLabelValues[name]
expectedVals := indexes.LabelNameToLabelValues[name]
if len(actualVals) != len(actualVals.Intersection(expectedVals)) {
t.Fatalf("%s: Got %s; want %s", name, actualVals, expectedVals)
}
}
}
// BenchmarkPersistIndexes measures PersistIndexes on the package-level indexes
// fixture. The persistence writes into a temporary directory that is removed
// when the benchmark returns; creating it is excluded from the timing.
func BenchmarkPersistIndexes(b *testing.B) {
	dir, err := ioutil.TempDir("", "test_index_persistence")
	if err != nil {
		b.Fatal(err)
	}
	defer os.RemoveAll(dir)

	persistence, err := NewDiskPersistence(dir, 1024)
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		err := persistence.PersistIndexes(&indexes)
		if err != nil {
			b.Fatal(err)
		}
	}
	b.StopTimer()
}
// BenchmarkLoadIndexes measures LoadIndexes on an index file that holds the
// package-level indexes fixture. The file is written once, into a temporary
// directory that is removed when the benchmark returns; that setup is excluded
// from the timing.
func BenchmarkLoadIndexes(b *testing.B) {
	dir, err := ioutil.TempDir("", "test_index_persistence")
	if err != nil {
		b.Fatal(err)
	}
	defer os.RemoveAll(dir)

	persistence, err := NewDiskPersistence(dir, 1024)
	if err != nil {
		b.Fatal(err)
	}
	err = persistence.PersistIndexes(&indexes)
	if err != nil {
		b.Fatal(err)
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if _, err := persistence.LoadIndexes(); err != nil {
			b.Fatal(err)
		}
	}
	b.StopTimer()
}

View file

@ -1,8 +1,6 @@
package storage_ng
import (
"bytes"
"encoding/gob"
"sort"
"sync"
@ -355,30 +353,6 @@ func (s *memorySeries) values() metric.Values {
return values
}
var gobWriter bytes.Buffer
var seriesEncoder *gob.Encoder
func (s *memorySeries) GobEncode() ([]byte, error) {
gobWriter.Reset()
if seriesEncoder == nil {
seriesEncoder = gob.NewEncoder(&gobWriter)
}
err := seriesEncoder.Encode(s.metric)
return gobWriter.Bytes(), err
}
var gobReader bytes.Reader
var seriesDecoder *gob.Decoder
func (s *memorySeries) GobDecode(buf []byte) error {
gobReader = *bytes.NewReader(buf)
if seriesDecoder == nil {
seriesDecoder = gob.NewDecoder(&gobReader)
}
err := seriesDecoder.Decode(&s.metric)
return err
}
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
it.mtx.Lock()
defer it.mtx.Unlock()
@ -419,12 +393,11 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
it.chunks[i-1].newIterator().getValueAtTime(t)[0],
it.chunks[i].newIterator().getValueAtTime(t)[0],
}
} else {
// We ended up in the middle of a chunk. We might stay there for a while,
// so save it as the current chunk iterator.
it.chunkIt = it.chunks[i].newIterator()
return it.chunkIt.getValueAtTime(t)
}
// We ended up in the middle of a chunk. We might stay there for a while,
// so save it as the current chunk iterator.
it.chunkIt = it.chunks[i].newIterator()
return it.chunkIt.getValueAtTime(t)
}
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {

View file

@ -282,7 +282,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
glog.Info("Purging old series data...")
s.mtx.RLock()
fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries))
for fp, _ := range s.fingerprintToSeries {
for fp := range s.fingerprintToSeries {
fps = append(fps, fp)
}
s.mtx.RUnlock()