From ecdf5ab14f3d0e84c2ab0f2a1cf06e00ebd664ff Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 12 Aug 2014 17:46:46 +0200 Subject: [PATCH] Index-persistence switched from gob to a hand-coded solution. Change-Id: Ib4ec42535bd08df16d34d4774bb638e35c5a1841 --- storage/local/delta.go | 6 +- storage/local/interface.go | 8 +- storage/local/persistence.go | 318 +++++++++++++++++++++++++++++- storage/local/persistence_test.go | 113 +++++++++-- storage/local/series.go | 35 +--- storage/local/storage.go | 2 +- 6 files changed, 419 insertions(+), 63 deletions(-) diff --git a/storage/local/delta.go b/storage/local/delta.go index b706d6b49a..db23664c96 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -362,11 +362,11 @@ func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) met case it.chunk.len(): return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} default: - if v := it.chunk.valueAtIndex(i); v.Timestamp.Equal(t) { + v := it.chunk.valueAtIndex(i) + if v.Timestamp.Equal(t) { return metric.Values{*v} - } else { - return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } + return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } } diff --git a/storage/local/interface.go b/storage/local/interface.go index 3e79b3984f..e4fa6e269e 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -47,7 +47,9 @@ type SeriesIterator interface { type Persistence interface { // PersistChunk persists a single chunk of a series. PersistChunk(clientmodel.Fingerprint, chunk) error - // PersistIndexes persists a Prometheus server's timeseries indexes. + // PersistIndexes persists a Prometheus server's timeseries indexes. It + // is the caller's responsibility to not modify indexes while persisting + // is underway, and to not call this method multiple times concurrently. PersistIndexes(i *Indexes) error // PersistHeads persists all open (non-full) head chunks. 
PersistHeads(map[clientmodel.Fingerprint]*memorySeries) error @@ -64,7 +66,9 @@ type Persistence interface { LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) // LoadHeads loads all open (non-full) head chunks. LoadHeads(map[clientmodel.Fingerprint]*memorySeries) error - // LoadIndexes loads and returns all timeseries indexes. + // LoadIndexes loads and returns all timeseries indexes. It is the + // caller's responsibility to not modify indexes while loading is + // underway, and to not call this method multiple times concurrently. LoadIndexes() (*Indexes, error) } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index e154431d9c..3104d8dcbb 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -24,8 +24,12 @@ import ( const ( seriesFileName = "series.db" seriesTempFileName = "series.db.tmp" - indexFileName = "index.db" headsFileName = "heads.db" + indexFileName = "index.db" + + indexFormatVersion = 1 + indexMagicString = "PrometheusIndexes" + indexBufSize = 1 << 15 // 32kiB. TODO: Tweak. chunkHeaderLen = 17 chunkHeaderTypeOffset = 0 @@ -40,6 +44,7 @@ const ( type diskPersistence struct { basePath string chunkLen int + buf []byte // Staging space for persisting indexes. } func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { @@ -53,6 +58,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { return &diskPersistence{ basePath: basePath, chunkLen: chunkLen, + buf: make([]byte, binary.MaxVarintLen64), // Also sufficient for uint64. }, nil } @@ -232,6 +238,38 @@ func (p *diskPersistence) indexPath() string { return path.Join(p.basePath, indexFileName) } +// PersistIndexes persists the indexes to disk. Do not call it concurrently with +// LoadIndexes as they share a buffer for staging. 
This method depends on the +// following type conversions being possible: +// clientmodel.LabelName -> string +// clientmodel.LabelValue -> string +// clientmodel.Fingerprint -> uint64 +// +// Description of the on-disk format: +// +// Label names and label values are encoded as their varint-encoded length +// followed by their byte sequence. +// +// Fingerprints are encoded as big-endian uint64. +// +// The file starts with the 'magic' byte sequence "PrometheusIndexes", followed +// by a varint-encoded version number (currently 1). +// +// The indexes follow one after another in the order FingerprintToSeries, +// LabelPairToFingerprints, LabelNameToLabelValues. Each index starts with the +// varint-encoded number of entries in that index, followed by the corresponding +// number of entries. +// +// An entry in FingerprintToSeries consists of a fingerprint, followed by the +// varint-encoded number of label pairs, followed by those label pairs, each in +// order label name and then label value. +// +// An entry in LabelPairToFingerprints consists of a label name, then a label +// value, then a varint-encoded number of fingerprints, followed by those +// fingerprints. +// +// An entry in LabelNameToLabelValues consists of a label name, followed by the +// varint-encoded number of label values, followed by those label values. 
func (p *diskPersistence) PersistIndexes(i *Indexes) error { f, err := os.OpenFile(p.indexPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { @@ -239,21 +277,132 @@ func (p *diskPersistence) PersistIndexes(i *Indexes) error { } defer f.Close() - enc := gob.NewEncoder(f) - if err := enc.Encode(i); err != nil { + p.setBufLen(binary.MaxVarintLen64) + + w := bufio.NewWriterSize(f, indexBufSize) + if _, err := w.WriteString(indexMagicString); err != nil { + return err + } + if err := p.persistVarint(w, indexFormatVersion); err != nil { return err } + if err := p.persistFingerprintToSeries(w, i.FingerprintToSeries); err != nil { + return err + } + if err := p.persistLabelPairToFingerprints(w, i.LabelPairToFingerprints); err != nil { + return err + } + if err := p.persistLabelNameToLabelValues(w, i.LabelNameToLabelValues); err != nil { + return err + } + + return w.Flush() +} + +func (p *diskPersistence) persistVarint(w io.Writer, i int) error { + bytesWritten := binary.PutVarint(p.buf, int64(i)) + _, err := w.Write(p.buf[:bytesWritten]) + return err +} + +// persistFingerprint depends on clientmodel.Fingerprint to be convertible to +// uint64. +func (p *diskPersistence) persistFingerprint(w io.Writer, fp clientmodel.Fingerprint) error { + binary.BigEndian.PutUint64(p.buf, uint64(fp)) + _, err := w.Write(p.buf[:8]) + return err +} + +func (p *diskPersistence) persistString(w *bufio.Writer, s string) error { + if err := p.persistVarint(w, len(s)); err != nil { + return err + } + _, err := w.WriteString(s) + return err +} + +// persistFingerprintToSeries depends on clientmodel.LabelName and +// clientmodel.LabelValue to be convertible to string. 
+func (p *diskPersistence) persistFingerprintToSeries(w *bufio.Writer, index map[clientmodel.Fingerprint]*memorySeries) error { + if err := p.persistVarint(w, len(index)); err != nil { + return err + } + for fp, ms := range index { + if err := p.persistFingerprint(w, fp); err != nil { + return err + } + if err := p.persistVarint(w, len(ms.metric)); err != nil { + return err + } + for n, v := range ms.metric { + if err := p.persistString(w, string(n)); err != nil { + return err + } + if err := p.persistString(w, string(v)); err != nil { + return err + } + } + } return nil } +// persistLabelPairToFingerprints depends on clientmodel.LabelName and +// clientmodel.LabelValue to be convertible to string. +func (p *diskPersistence) persistLabelPairToFingerprints(w *bufio.Writer, index map[metric.LabelPair]utility.Set) error { + if err := p.persistVarint(w, len(index)); err != nil { + return err + } + for lp, fps := range index { + if err := p.persistString(w, string(lp.Name)); err != nil { + return err + } + if err := p.persistString(w, string(lp.Value)); err != nil { + return err + } + if err := p.persistVarint(w, len(fps)); err != nil { + return err + } + for fp := range fps { + if err := p.persistFingerprint(w, fp.(clientmodel.Fingerprint)); err != nil { + return err + } + } + } + return nil +} + +// persistLabelNameToLabelValues depends on clientmodel.LabelValue to be convertible to string. +func (p *diskPersistence) persistLabelNameToLabelValues(w *bufio.Writer, index map[clientmodel.LabelName]utility.Set) error { + if err := p.persistVarint(w, len(index)); err != nil { + return err + } + for ln, lvs := range index { + if err := p.persistString(w, string(ln)); err != nil { + return err + } + if err := p.persistVarint(w, len(lvs)); err != nil { + return err + } + for lv := range lvs { + if err := p.persistString(w, string(lv.(clientmodel.LabelValue))); err != nil { + return err + } + } + } + return nil +} + +// LoadIndexes loads the indexes from disk. 
See PersistIndexes for details about +// the disk format. Do not call LoadIndexes and PersistIndexes concurrently as +// they share a buffer for staging. func (p *diskPersistence) LoadIndexes() (*Indexes, error) { f, err := os.Open(p.indexPath()) if os.IsNotExist(err) { return &Indexes{ - FingerprintToSeries: make(map[clientmodel.Fingerprint]*memorySeries), - LabelPairToFingerprints: make(map[metric.LabelPair]utility.Set), - LabelNameToLabelValues: make(map[clientmodel.LabelName]utility.Set), + FingerprintToSeries: map[clientmodel.Fingerprint]*memorySeries{}, + LabelPairToFingerprints: map[metric.LabelPair]utility.Set{}, + LabelNameToLabelValues: map[clientmodel.LabelName]utility.Set{}, }, nil } if err != nil { @@ -261,13 +410,162 @@ func (p *diskPersistence) LoadIndexes() (*Indexes, error) { } defer f.Close() - dec := gob.NewDecoder(f) - var i Indexes - if err := dec.Decode(&i); err != nil { + r := bufio.NewReaderSize(f, indexBufSize) + + p.setBufLen(len(indexMagicString)) + if _, err := io.ReadFull(r, p.buf); err != nil { + return nil, err + } + magic := string(p.buf) + if magic != indexMagicString { + return nil, fmt.Errorf( + "unexpected magic string, want %q, got %q", + indexMagicString, magic, + ) + } + if version, err := binary.ReadVarint(r); version != indexFormatVersion || err != nil { + return nil, fmt.Errorf("unknown index format version, want %d", indexFormatVersion) + } + + i := &Indexes{} + + if err := p.loadFingerprintToSeries(r, i); err != nil { + return nil, err + } + if err := p.loadLabelPairToFingerprints(r, i); err != nil { + return nil, err + } + if err := p.loadLabelNameToLabelValues(r, i); err != nil { return nil, err } - return &i, nil + return i, nil +} + +func (p *diskPersistence) loadFingerprintToSeries(r *bufio.Reader, i *Indexes) error { + length, err := binary.ReadVarint(r) + if err != nil { + return err + } + i.FingerprintToSeries = make(map[clientmodel.Fingerprint]*memorySeries, length) + + for ; length > 0; length-- { + fp, err := 
p.loadFingerprint(r) + if err != nil { + return err + } + numLabelPairs, err := binary.ReadVarint(r) + if err != nil { + return err + } + m := make(clientmodel.Metric, numLabelPairs) + i.FingerprintToSeries[fp] = &memorySeries{metric: m} + for ; numLabelPairs > 0; numLabelPairs-- { + ln, err := p.loadString(r) + if err != nil { + return err + } + lv, err := p.loadString(r) + if err != nil { + return err + } + m[clientmodel.LabelName(ln)] = clientmodel.LabelValue(lv) + } + } + return nil +} + +func (p *diskPersistence) loadLabelPairToFingerprints(r *bufio.Reader, i *Indexes) error { + length, err := binary.ReadVarint(r) + if err != nil { + return err + } + i.LabelPairToFingerprints = make(map[metric.LabelPair]utility.Set, length) + + for ; length > 0; length-- { + ln, err := p.loadString(r) + if err != nil { + return err + } + lv, err := p.loadString(r) + if err != nil { + return err + } + numFPs, err := binary.ReadVarint(r) + if err != nil { + return err + } + s := make(utility.Set, numFPs) + i.LabelPairToFingerprints[metric.LabelPair{ + Name: clientmodel.LabelName(ln), + Value: clientmodel.LabelValue(lv), + }] = s + for ; numFPs > 0; numFPs-- { + fp, err := p.loadFingerprint(r) + if err != nil { + return err + } + s.Add(fp) + } + } + return nil +} + +func (p *diskPersistence) loadLabelNameToLabelValues(r *bufio.Reader, i *Indexes) error { + length, err := binary.ReadVarint(r) + if err != nil { + return err + } + i.LabelNameToLabelValues = make(map[clientmodel.LabelName]utility.Set, length) + + for ; length > 0; length-- { + ln, err := p.loadString(r) + if err != nil { + return err + } + numLVs, err := binary.ReadVarint(r) + if err != nil { + return err + } + s := make(utility.Set, numLVs) + i.LabelNameToLabelValues[clientmodel.LabelName(ln)] = s + for ; numLVs > 0; numLVs-- { + lv, err := p.loadString(r) + if err != nil { + return err + } + s.Add(clientmodel.LabelValue(lv)) + } + } + return nil +} + +func (p *diskPersistence) loadFingerprint(r io.Reader) 
(clientmodel.Fingerprint, error) { + p.setBufLen(8) + if _, err := io.ReadFull(r, p.buf); err != nil { + return 0, err + } + return clientmodel.Fingerprint(binary.BigEndian.Uint64(p.buf)), nil +} + +func (p *diskPersistence) loadString(r *bufio.Reader) (string, error) { + length, err := binary.ReadVarint(r) + if err != nil { + return "", err + } + p.setBufLen(int(length)) + if _, err := io.ReadFull(r, p.buf); err != nil { + return "", err + } + return string(p.buf), nil +} + +func (p *diskPersistence) setBufLen(l int) { + if cap(p.buf) >= l { + p.buf = p.buf[:l] + } else { + p.buf = make([]byte, l) + } } func (p *diskPersistence) headsPath() string { diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 5941fe76ee..1206aea1f1 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -2,6 +2,7 @@ package storage_ng import ( "io/ioutil" + "os" "testing" clientmodel "github.com/prometheus/client_golang/model" @@ -10,8 +11,8 @@ import ( "github.com/prometheus/prometheus/utility" ) -func TestIndexPersistence(t *testing.T) { - expected := Indexes{ +var ( + indexes = Indexes{ FingerprintToSeries: map[clientmodel.Fingerprint]*memorySeries{ 0: { metric: clientmodel.Metric{ @@ -23,6 +24,13 @@ func TestIndexPersistence(t *testing.T) { metric: clientmodel.Metric{ clientmodel.MetricNameLabel: "metric_0", "label_2": "value_2", + "label_3": "value_3", + }, + }, + 2: { + metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "metric_1", + "label_1": "value_2", }, }, }, @@ -31,77 +39,150 @@ func TestIndexPersistence(t *testing.T) { Name: clientmodel.MetricNameLabel, Value: "metric_0", }: { - 0: struct{}{}, - 1: struct{}{}, + clientmodel.Fingerprint(0): struct{}{}, + clientmodel.Fingerprint(1): struct{}{}, + }, + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_1", + }: { + clientmodel.Fingerprint(2): struct{}{}, }, metric.LabelPair{ Name: "label_1", Value: "value_1", }: { - 0: struct{}{}, 
+ clientmodel.Fingerprint(0): struct{}{}, + }, + metric.LabelPair{ + Name: "label_1", + Value: "value_2", + }: { + clientmodel.Fingerprint(2): struct{}{}, }, metric.LabelPair{ Name: "label_2", Value: "value_2", }: { - 1: struct{}{}, + clientmodel.Fingerprint(1): struct{}{}, + }, + metric.LabelPair{ + Name: "label_3", + Value: "value_2", + }: { + clientmodel.Fingerprint(1): struct{}{}, }, }, LabelNameToLabelValues: map[clientmodel.LabelName]utility.Set{ clientmodel.MetricNameLabel: { clientmodel.LabelValue("metric_0"): struct{}{}, + clientmodel.LabelValue("metric_1"): struct{}{}, }, "label_1": { clientmodel.LabelValue("value_1"): struct{}{}, + clientmodel.LabelValue("value_2"): struct{}{}, }, "label_2": { clientmodel.LabelValue("value_2"): struct{}{}, }, + "label_3": { + clientmodel.LabelValue("value_3"): struct{}{}, + }, }, } +) +func TestIndexPersistence(t *testing.T) { basePath, err := ioutil.TempDir("", "test_index_persistence") if err != nil { t.Fatal(err) } + defer os.RemoveAll(basePath) p, err := NewDiskPersistence(basePath, 1024) if err != nil { t.Fatal(err) } - p.PersistIndexes(&expected) + + if err := p.PersistIndexes(&indexes); err != nil { + t.Fatal(err) + } actual, err := p.LoadIndexes() if err != nil { t.Fatal(err) } - if len(actual.FingerprintToSeries) != len(expected.FingerprintToSeries) { - t.Fatalf("Count mismatch: Got %d; want %d", len(actual.FingerprintToSeries), len(expected.FingerprintToSeries)) + if len(actual.FingerprintToSeries) != len(indexes.FingerprintToSeries) { + t.Fatalf("Count mismatch: Got %d; want %d", len(actual.FingerprintToSeries), len(indexes.FingerprintToSeries)) } for fp, actualSeries := range actual.FingerprintToSeries { - expectedSeries := expected.FingerprintToSeries[fp] + expectedSeries := indexes.FingerprintToSeries[fp] if !expectedSeries.metric.Equal(actualSeries.metric) { t.Fatalf("%s: Got %s; want %s", fp, actualSeries.metric, expectedSeries.metric) } } - if len(actual.LabelPairToFingerprints) != 
len(expected.LabelPairToFingerprints) { - t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelPairToFingerprints), len(expected.LabelPairToFingerprints)) + if len(actual.LabelPairToFingerprints) != len(indexes.LabelPairToFingerprints) { + t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelPairToFingerprints), len(indexes.LabelPairToFingerprints)) } for lp, actualFps := range actual.LabelPairToFingerprints { - expectedFps := expected.LabelPairToFingerprints[lp] + expectedFps := indexes.LabelPairToFingerprints[lp] if len(actualFps) != len(actualFps.Intersection(expectedFps)) { t.Fatalf("%s: Got %s; want %s", lp, actualFps, expectedFps) } } - if len(actual.LabelNameToLabelValues) != len(expected.LabelNameToLabelValues) { - t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelNameToLabelValues), len(expected.LabelNameToLabelValues)) + if len(actual.LabelNameToLabelValues) != len(indexes.LabelNameToLabelValues) { + t.Fatalf("Count mismatch: Got %d; want %d", len(actual.LabelNameToLabelValues), len(indexes.LabelNameToLabelValues)) } for name, actualVals := range actual.LabelNameToLabelValues { - expectedVals := expected.LabelNameToLabelValues[name] + expectedVals := indexes.LabelNameToLabelValues[name] if len(actualVals) != len(actualVals.Intersection(expectedVals)) { t.Fatalf("%s: Got %s; want %s", name, actualVals, expectedVals) } } } + +func BenchmarkPersistIndexes(b *testing.B) { + basePath, err := ioutil.TempDir("", "test_index_persistence") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(basePath) + p, err := NewDiskPersistence(basePath, 1024) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := p.PersistIndexes(&indexes); err != nil { + b.Fatal(err) + } + } + b.StopTimer() +} + +func BenchmarkLoadIndexes(b *testing.B) { + basePath, err := ioutil.TempDir("", "test_index_persistence") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(basePath) + p, err := 
NewDiskPersistence(basePath, 1024) + if err != nil { + b.Fatal(err) + } + if err := p.PersistIndexes(&indexes); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := p.LoadIndexes() + if err != nil { + b.Fatal(err) + } + } + b.StopTimer() +} diff --git a/storage/local/series.go b/storage/local/series.go index 3c3abc8a5d..1d0d4c6f45 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -1,8 +1,6 @@ package storage_ng import ( - "bytes" - "encoding/gob" "sort" "sync" @@ -355,30 +353,6 @@ func (s *memorySeries) values() metric.Values { return values } -var gobWriter bytes.Buffer -var seriesEncoder *gob.Encoder - -func (s *memorySeries) GobEncode() ([]byte, error) { - gobWriter.Reset() - if seriesEncoder == nil { - seriesEncoder = gob.NewEncoder(&gobWriter) - } - err := seriesEncoder.Encode(s.metric) - return gobWriter.Bytes(), err -} - -var gobReader bytes.Reader -var seriesDecoder *gob.Decoder - -func (s *memorySeries) GobDecode(buf []byte) error { - gobReader = *bytes.NewReader(buf) - if seriesDecoder == nil { - seriesDecoder = gob.NewDecoder(&gobReader) - } - err := seriesDecoder.Decode(&s.metric) - return err -} - func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { it.mtx.Lock() defer it.mtx.Unlock() @@ -419,12 +393,11 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V it.chunks[i-1].newIterator().getValueAtTime(t)[0], it.chunks[i].newIterator().getValueAtTime(t)[0], } - } else { - // We ended up in the middle of a chunk. We might stay there for a while, - // so save it as the current chunk iterator. - it.chunkIt = it.chunks[i].newIterator() - return it.chunkIt.getValueAtTime(t) } + // We ended up in the middle of a chunk. We might stay there for a while, + // so save it as the current chunk iterator. 
+ it.chunkIt = it.chunks[i].newIterator() + return it.chunkIt.getValueAtTime(t) } func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { diff --git a/storage/local/storage.go b/storage/local/storage.go index 5eaf46d0d1..5b233e5e03 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -282,7 +282,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { glog.Info("Purging old series data...") s.mtx.RLock() fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries)) - for fp, _ := range s.fingerprintToSeries { + for fp := range s.fingerprintToSeries { fps = append(fps, fp) } s.mtx.RUnlock()