From 13fcf1ddbcc987bedf72dfc03b587d38f62421a3 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 4 Mar 2015 13:40:18 +0100 Subject: [PATCH] Implement double-delta encoded chunks. --- main.go | 4 +- rules/rules_test.go | 4 +- storage/local/chunk.go | 11 +- storage/local/crashrecovery.go | 6 +- storage/local/delta.go | 226 ++++++------- storage/local/delta_helpers.go | 95 ++++++ storage/local/doubledelta.go | 513 ++++++++++++++++++++++++++++++ storage/local/persistence.go | 23 +- storage/local/persistence_test.go | 76 +++-- storage/local/series.go | 15 +- storage/local/storage.go | 11 +- storage/local/storage_test.go | 177 ++++++++--- storage/local/test_helpers.go | 3 +- templates/templates_test.go | 2 +- 14 files changed, 934 insertions(+), 232 deletions(-) create mode 100644 storage/local/delta_helpers.go create mode 100644 storage/local/doubledelta.go diff --git a/main.go b/main.go index fd3c533cd..7dea33e72 100644 --- a/main.go +++ b/main.go @@ -54,6 +54,7 @@ var ( samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") + chunkType = flag.Int("storage.local.chunk-type", 1, "Which chunk encoding version to use. Currently supported is 0 and 1.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") @@ -122,7 +123,8 @@ func NewPrometheus() *prometheus { PersistenceQueueCapacity: *persistenceQueueCapacity, CheckpointInterval: *checkpointInterval, CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit, - Dirty: *storageDirty, + ChunkType: byte(*chunkType), + Dirty: *storageDirty, } memStorage, err := local.NewMemorySeriesStorage(o) if err != nil { diff --git a/rules/rules_test.go b/rules/rules_test.go index c0aceabe5..14fc46161 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -109,7 +109,7 @@ func samplesAlmostEqual(a, b string) bool { } func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) { - storage, closer = local.NewTestStorage(t) + storage, closer = local.NewTestStorage(t, 1) storeMatrix(storage, testMatrix) return storage, closer } @@ -1437,7 +1437,7 @@ func TestRangedEvaluationRegressions(t *testing.T) { } for i, s := range scenarios { - storage, closer := local.NewTestStorage(t) + storage, closer := local.NewTestStorage(t, 1) storeMatrix(storage, s.in) expr, err := LoadExprFromString(s.expr) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index bb63be2db..4dbce0523 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,6 +15,7 @@ package local import ( "container/list" + "fmt" "io" "sync" "sync/atomic" @@ -223,16 +224,20 @@ func chunkType(c chunk) byte { switch c.(type) { case *deltaEncodedChunk: return 0 + case *doubleDeltaEncodedChunk: + return 1 default: - panic("unknown chunk type") + panic(fmt.Errorf("unknown chunk type: %T", c)) } } func chunkForType(chunkType byte) chunk { switch chunkType { case 0: - return newDeltaEncodedChunk(d1, d0, true) + return newDeltaEncodedChunk(d1, d0, true, chunkLen) + case 1: + return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) default: - panic("unknown chunk type") + panic(fmt.Errorf("unknown chunk type: %d", chunkType)) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 0d2305a63..b27b5a599 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -181,8 +181,8 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint return fp, false } - bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen) - chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen) + bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen) + chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen) if bytesToTrim != 0 { glog.Warningf( "Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.", @@ -341,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes( if err := kv.Value(&m); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) + series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest, p.chunkType) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { return err diff --git a/storage/local/delta.go b/storage/local/delta.go index e44153001..4d481aa77 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -25,16 +25,6 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -type deltaBytes byte - -const ( - d0 deltaBytes = 0 - d1 = 1 - d2 = 2 - d4 = 4 - d8 = 8 -) - // The 21-byte header of a delta-encoded chunk looks like: // // - time delta bytes: 1 bytes @@ -55,124 +45,87 @@ const ( ) // A deltaEncodedChunk adaptively stores sample timestamps and values with a -// delta encoding of various types (int, float) and bit width. However, once 8 +// delta encoding of various types (int, float) and bit widths. However, once 8 // bytes would be needed to encode a delta value, a fall-back to the absolute // numbers happens (so that timestamps are saved directly as int64 and values as // float64). It implements the chunk interface. -type deltaEncodedChunk struct { - buf []byte -} +type deltaEncodedChunk []byte // newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk. -func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk { - buf := make([]byte, deltaHeaderIsIntOffset+1, 1024) +func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk { + if tb < 1 { + panic("need at least 1 time delta byte") + } + if length < deltaHeaderBytes+16 { + panic(fmt.Errorf( + "chunk length %d bytes is insufficient, need at least %d", + length, deltaHeaderBytes+16, + )) + } + c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length) - buf[deltaHeaderTimeBytesOffset] = byte(tb) - buf[deltaHeaderValueBytesOffset] = byte(vb) + c[deltaHeaderTimeBytesOffset] = byte(tb) + c[deltaHeaderValueBytesOffset] = byte(vb) if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes. - buf[deltaHeaderIsIntOffset] = 1 + c[deltaHeaderIsIntOffset] = 1 } else { - buf[deltaHeaderIsIntOffset] = 0 + c[deltaHeaderIsIntOffset] = 0 } - return &deltaEncodedChunk{ - buf: buf, - } + return &c } -func (c *deltaEncodedChunk) newFollowupChunk() chunk { - return newDeltaEncodedChunk(d1, d0, true) +func (c deltaEncodedChunk) newFollowupChunk() chunk { + return newDeltaEncodedChunk(d1, d0, true, cap(c)) } // clone implements chunk. -func (c *deltaEncodedChunk) clone() chunk { - buf := make([]byte, len(c.buf), 1024) - copy(buf, c.buf) - return &deltaEncodedChunk{ - buf: buf, - } +func (c deltaEncodedChunk) clone() chunk { + clone := make(deltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone } -func neededDeltaBytes(deltaT clientmodel.Timestamp, deltaV clientmodel.SampleValue, isInt bool) (dtb, dvb deltaBytes) { - dtb = d1 - if deltaT > math.MaxUint8 { - dtb = d2 - } - if deltaT > math.MaxUint16 { - dtb = d4 - } - if deltaT > math.MaxUint32 { - dtb = d8 - } - - if isInt { - dvb = d0 - if deltaV != 0 { - dvb = d1 - } - if deltaV < math.MinInt8 || deltaV > math.MaxInt8 { - dvb = d2 - } - if deltaV < math.MinInt16 || deltaV > math.MaxInt16 { - dvb = d4 - } - if deltaV < math.MinInt32 || deltaV > math.MaxInt32 { - dvb = d8 - } - } else { - dvb = d4 - if clientmodel.SampleValue(float32(deltaV)) != deltaV { - dvb = d8 - } - } - return dtb, dvb +func (c deltaEncodedChunk) timeBytes() deltaBytes { + return deltaBytes(c[deltaHeaderTimeBytesOffset]) } -func max(a, b deltaBytes) deltaBytes { - if a > b { - return a - } - return b +func (c deltaEncodedChunk) valueBytes() deltaBytes { + return deltaBytes(c[deltaHeaderValueBytesOffset]) } -func (c *deltaEncodedChunk) timeBytes() deltaBytes { - return deltaBytes(c.buf[deltaHeaderTimeBytesOffset]) +func (c deltaEncodedChunk) isInt() bool { + return c[deltaHeaderIsIntOffset] == 1 } -func (c *deltaEncodedChunk) valueBytes() deltaBytes { - return deltaBytes(c.buf[deltaHeaderValueBytesOffset]) +func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp { + return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) } -func (c *deltaEncodedChunk) isInt() bool { - return c.buf[deltaHeaderIsIntOffset] == 1 -} - -func (c *deltaEncodedChunk) baseTime() clientmodel.Timestamp { - return clientmodel.Timestamp(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseTimeOffset:])) -} - -func (c *deltaEncodedChunk) baseValue() clientmodel.SampleValue { - return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseValueOffset:]))) +func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) } // add implements chunk. -func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { - if len(c.buf) < deltaHeaderBytes { - c.buf = c.buf[:deltaHeaderBytes] - binary.LittleEndian.PutUint64(c.buf[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) - binary.LittleEndian.PutUint64(c.buf[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) +func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { + if len(c) < deltaHeaderBytes { + c = c[:deltaHeaderBytes] + binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) + binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) } - remainingBytes := cap(c.buf) - len(c.buf) + remainingBytes := cap(c) - len(c) sampleSize := c.sampleSize() // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { overflowChunks := c.newFollowupChunk().add(s) - return []chunk{c, overflowChunks[0]} + return []chunk{&c, overflowChunks[0]} } + // TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is + // fixed, we should panic here if dt is negative. dt := s.Timestamp - c.baseTime() dv := s.Value - c.baseValue() tb := c.timeBytes() @@ -182,35 +135,36 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { // existing chunk data into new chunk(s). // // int->float. - // Note: Using math.Modf is slower than the conversion approach below. - if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s) + if c.isInt() && !isInt64(dv) { + return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) } // float32->float64. - if !c.isInt() && vb == d4 && clientmodel.SampleValue(float32(dv)) != dv { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false), c, s) + if !c.isInt() && vb == d4 && !isFloat32(dv) { + return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) } if tb < d8 || vb < d8 { // Maybe more bytes per sample. - if ntb, nvb := neededDeltaBytes(dt, dv, c.isInt()); ntb > tb || nvb > vb { + ntb := bytesNeededForUnsignedTimestampDelta(dt) + nvb := bytesNeededForSampleValueDelta(dv, c.isInt()) + if ntb > tb || nvb > vb { ntb = max(ntb, tb) nvb = max(nvb, vb) - return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt()), c, s) + return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) } } - offset := len(c.buf) - c.buf = c.buf[:offset+sampleSize] + offset := len(c) + c = c[:offset+sampleSize] switch tb { case d1: - c.buf[offset] = byte(dt) + c[offset] = byte(dt) case d2: - binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dt)) + binary.LittleEndian.PutUint16(c[offset:], uint16(dt)) case d4: - binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dt)) + binary.LittleEndian.PutUint32(c[offset:], uint32(dt)) case d8: // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c.buf[offset:], uint64(s.Timestamp)) + binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: panic("invalid number of bytes for time delta") } @@ -222,11 +176,11 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { case d0: // No-op. Constant value is stored as base value. case d1: - c.buf[offset] = byte(dv) + c[offset] = byte(dv) case d2: - binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dv)) + binary.LittleEndian.PutUint16(c[offset:], uint16(dv)) case d4: - binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dv)) + binary.LittleEndian.PutUint32(c[offset:], uint32(dv)) // d8 must not happen. Those samples are encoded as float64. default: panic("invalid number of bytes for integer delta") @@ -234,30 +188,30 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { } else { switch vb { case d4: - binary.LittleEndian.PutUint32(c.buf[offset:], math.Float32bits(float32(dv))) + binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv))) case d8: // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c.buf[offset:], math.Float64bits(float64(s.Value))) + binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: panic("invalid number of bytes for floating point delta") } } - return []chunk{c} + return []chunk{&c} } -func (c *deltaEncodedChunk) sampleSize() int { +func (c deltaEncodedChunk) sampleSize() int { return int(c.timeBytes() + c.valueBytes()) } -func (c *deltaEncodedChunk) len() int { - if len(c.buf) < deltaHeaderBytes { +func (c deltaEncodedChunk) len() int { + if len(c) < deltaHeaderBytes { return 0 } - return (len(c.buf) - deltaHeaderBytes) / c.sampleSize() + return (len(c) - deltaHeaderBytes) / c.sampleSize() } // values implements chunk. -func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair { +func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() valuesChan := make(chan *metric.SamplePair) go func() { @@ -269,20 +223,20 @@ func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair { return valuesChan } -func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { +func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { offset := deltaHeaderBytes + idx*c.sampleSize() var ts clientmodel.Timestamp switch c.timeBytes() { case d1: - ts = c.baseTime() + clientmodel.Timestamp(uint8(c.buf[offset])) + ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset])) case d2: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c.buf[offset:])) + ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:])) case d4: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c.buf[offset:])) + ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:])) case d8: // Take absolute value for d8. - ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c.buf[offset:])) + ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) default: panic("Invalid number of bytes for time delta") } @@ -295,11 +249,11 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { case d0: v = c.baseValue() case d1: - v = c.baseValue() + clientmodel.SampleValue(int8(c.buf[offset])) + v = c.baseValue() + clientmodel.SampleValue(int8(c[offset])) case d2: - v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c.buf[offset:]))) + v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) case d4: - v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c.buf[offset:]))) + v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) // No d8 for ints. default: panic("Invalid number of bytes for integer delta") @@ -307,10 +261,10 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } else { switch c.valueBytes() { case d4: - v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c.buf[offset:]))) + v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) case d8: // Take absolute value for d8. - v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[offset:]))) + v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) default: panic("Invalid number of bytes for floating point delta") } @@ -322,44 +276,44 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } // firstTime implements chunk. -func (c *deltaEncodedChunk) firstTime() clientmodel.Timestamp { +func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { return c.valueAtIndex(0).Timestamp } // lastTime implements chunk. -func (c *deltaEncodedChunk) lastTime() clientmodel.Timestamp { +func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { return c.valueAtIndex(c.len() - 1).Timestamp } // marshal implements chunk. -func (c *deltaEncodedChunk) marshal(w io.Writer) error { - if len(c.buf) > math.MaxUint16 { +func (c deltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint.") } - binary.LittleEndian.PutUint16(c.buf[deltaHeaderBufLenOffset:], uint16(len(c.buf))) + binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - n, err := w.Write(c.buf[:cap(c.buf)]) + n, err := w.Write(c[:cap(c)]) if err != nil { return err } - if n != cap(c.buf) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c.buf), n) + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) } return nil } // unmarshal implements chunk. func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { - c.buf = c.buf[:cap(c.buf)] + *c = (*c)[:cap(*c)] readBytes := 0 - for readBytes < len(c.buf) { - n, err := r.Read(c.buf[readBytes:]) + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) if err != nil { return err } readBytes += n } - c.buf = c.buf[:binary.LittleEndian.Uint16(c.buf[deltaHeaderBufLenOffset:])] + *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] return nil } diff --git a/storage/local/delta_helpers.go b/storage/local/delta_helpers.go new file mode 100644 index 000000000..4bd45fcb0 --- /dev/null +++ b/storage/local/delta_helpers.go @@ -0,0 +1,95 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "math" + + clientmodel "github.com/prometheus/client_golang/model" +) + +type deltaBytes byte + +const ( + d0 deltaBytes = 0 + d1 deltaBytes = 1 + d2 deltaBytes = 2 + d4 deltaBytes = 4 + d8 deltaBytes = 8 +) + +func bytesNeededForUnsignedTimestampDelta(deltaT clientmodel.Timestamp) deltaBytes { + switch { + case deltaT > math.MaxUint32: + return d8 + case deltaT > math.MaxUint16: + return d4 + case deltaT > math.MaxUint8: + return d2 + default: + return d1 + } +} + +func bytesNeededForSignedTimestampDelta(deltaT clientmodel.Timestamp) deltaBytes { + switch { + case deltaT > math.MaxInt32 || deltaT < math.MinInt32: + return d8 + case deltaT > math.MaxInt16 || deltaT < math.MinInt16: + return d4 + case deltaT > math.MaxInt8 || deltaT < math.MinInt8: + return d2 + default: + return d1 + } +} + +func bytesNeededForSampleValueDelta(deltaV clientmodel.SampleValue, isInt bool) deltaBytes { + if isInt { + switch { + case deltaV < math.MinInt32 || deltaV > math.MaxInt32: + return d8 + case deltaV < math.MinInt16 || deltaV > math.MaxInt16: + return d4 + case deltaV < math.MinInt8 || deltaV > math.MaxInt8: + return d2 + case deltaV != 0: + return d1 + default: + return d0 + } + } + if clientmodel.SampleValue(float32(deltaV)) != deltaV { + return d8 + } + return d4 +} + +func max(a, b deltaBytes) deltaBytes { + if a > b { + return a + } + return b +} + +// isInt64 returns true if v can be represented as an int64. +func isInt64(v clientmodel.SampleValue) bool { + // Note: Using math.Modf is slower than the conversion approach below. + return clientmodel.SampleValue(int64(v)) == v +} + +// isFloat32 returns true if v can be represented as an float32. +func isFloat32(v clientmodel.SampleValue) bool { + return clientmodel.SampleValue(float32(v)) == v +} diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go new file mode 100644 index 000000000..58fd98955 --- /dev/null +++ b/storage/local/doubledelta.go @@ -0,0 +1,513 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package local + +import ( + "encoding/binary" + "fmt" + "io" + "math" + "sort" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/storage/metric" +) + +// The 37-byte header of a delta-encoded chunk looks like: +// +// - used buf bytes: 2 bytes +// - time double-delta bytes: 1 bytes +// - value double-delta bytes: 1 bytes +// - is integer: 1 byte +// - base time: 8 bytes +// - base value: 8 bytes +// - base time delta: 8 bytes +// - base value delta: 8 bytes +const ( + doubleDeltaHeaderBytes = 37 + + doubleDeltaHeaderBufLenOffset = 0 + doubleDeltaHeaderTimeBytesOffset = 2 + doubleDeltaHeaderValueBytesOffset = 3 + doubleDeltaHeaderIsIntOffset = 4 + doubleDeltaHeaderBaseTimeOffset = 5 + doubleDeltaHeaderBaseValueOffset = 13 + doubleDeltaHeaderBaseTimeDeltaOffset = 21 + doubleDeltaHeaderBaseValueDeltaOffset = 29 +) + +// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with +// a double-delta encoding of various types (int, float) and bit widths. A base +// value and timestamp and a base delta for each is saved in the header. The +// payload consists of double-deltas, i.e. deviations from the values and +// timestamps calculated by applying the base value and time and the base deltas. +// However, once 8 bytes would be needed to encode a double-delta value, a +// fall-back to the absolute numbers happens (so that timestamps are saved +// directly as int64 and values as float64). +// doubleDeltaEncodedChunk implements the chunk interface. +type doubleDeltaEncodedChunk []byte + +// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk. +func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk { + if tb < 1 { + panic("need at least 1 time delta byte") + } + if length < doubleDeltaHeaderBytes+16 { + panic(fmt.Errorf( + "chunk length %d bytes is insufficient, need at least %d", + length, doubleDeltaHeaderBytes+16, + )) + } + c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length) + + c[doubleDeltaHeaderTimeBytesOffset] = byte(tb) + c[doubleDeltaHeaderValueBytesOffset] = byte(vb) + if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes. + c[doubleDeltaHeaderIsIntOffset] = 1 + } else { + c[doubleDeltaHeaderIsIntOffset] = 0 + } + return &c +} + +func (c doubleDeltaEncodedChunk) newFollowupChunk() chunk { + return newDoubleDeltaEncodedChunk(d1, d0, true, cap(c)) +} + +func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp { + return clientmodel.Timestamp( + binary.LittleEndian.Uint64( + c[doubleDeltaHeaderBaseTimeOffset:], + ), + ) +} + +func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue { + return clientmodel.SampleValue( + math.Float64frombits( + binary.LittleEndian.Uint64( + c[doubleDeltaHeaderBaseValueOffset:], + ), + ), + ) +} + +func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp { + return clientmodel.Timestamp( + binary.LittleEndian.Uint64( + c[doubleDeltaHeaderBaseTimeDeltaOffset:], + ), + ) +} + +func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue { + return clientmodel.SampleValue( + math.Float64frombits( + binary.LittleEndian.Uint64( + c[doubleDeltaHeaderBaseValueDeltaOffset:], + ), + ), + ) +} + +func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes { + return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset]) +} + +func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes { + return deltaBytes(c[doubleDeltaHeaderValueBytesOffset]) +} + +func (c doubleDeltaEncodedChunk) sampleSize() int { + return int(c.timeBytes() + c.valueBytes()) +} + +func (c doubleDeltaEncodedChunk) len() int { + if len(c) <= doubleDeltaHeaderIsIntOffset+1 { + return 0 + } + if len(c) <= doubleDeltaHeaderBaseValueOffset+8 { + return 1 + } + return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2 +} + +func (c doubleDeltaEncodedChunk) isInt() bool { + return c[doubleDeltaHeaderIsIntOffset] == 1 +} + +// add implements chunk. +func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { + if len(c) <= doubleDeltaHeaderIsIntOffset+1 { + // This is the first sample added to this chunk. Add it as base + // time and value. + c = c[:doubleDeltaHeaderBaseValueOffset+8] + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeOffset:], + uint64(s.Timestamp), + ) + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueOffset:], + math.Float64bits(float64(s.Value)), + ) + return []chunk{&c} + } + + tb := c.timeBytes() + vb := c.valueBytes() + + if len(c) <= doubleDeltaHeaderBaseValueOffset+8 { + // This is the 2nd sample added to this chunk. Calculate the + // base deltas. + baseTimeDelta := s.Timestamp - c.baseTime() + if baseTimeDelta < 0 { + // TODO(beorn7): We ignore this irregular case for now. Once + // https://github.com/prometheus/prometheus/issues/481 is + // fixed, we should panic here instead. + return []chunk{&c} + } + c = c[:doubleDeltaHeaderBytes] + if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { + // If already the base delta needs d8 (or we are at d8 + // already, anyway), we better encode this timestamp + // directly rather than as a delta and switch everything + // to d8. + c[doubleDeltaHeaderTimeBytesOffset] = byte(d8) + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeDeltaOffset:], + uint64(s.Timestamp), + ) + } else { + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseTimeDeltaOffset:], + uint64(baseTimeDelta), + ) + } + baseValue := c.baseValue() + baseValueDelta := s.Value - baseValue + if vb >= d8 || baseValue+baseValueDelta != s.Value { + // If we can't reproduce the original sample value (or + // if we are at d8 already, anyway), we better encode + // this value directly rather than as a delta and switch + // everything to d8. + c[doubleDeltaHeaderValueBytesOffset] = byte(d8) + c[doubleDeltaHeaderIsIntOffset] = 0 + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueDeltaOffset:], + math.Float64bits(float64(s.Value)), + ) + } else { + binary.LittleEndian.PutUint64( + c[doubleDeltaHeaderBaseValueDeltaOffset:], + math.Float64bits(float64(baseValueDelta)), + ) + } + return []chunk{&c} + } + + remainingBytes := cap(c) - len(c) + sampleSize := c.sampleSize() + + // Do we generally have space for another sample in this chunk? If not, + // overflow into a new one. + if remainingBytes < sampleSize { + overflowChunks := c.newFollowupChunk().add(s) + return []chunk{&c, overflowChunks[0]} + } + + dt := s.Timestamp - c.baseTime() - clientmodel.Timestamp(c.len())*c.baseTimeDelta() + dv := s.Value - c.baseValue() - clientmodel.SampleValue(c.len())*c.baseValueDelta() + + // If the new sample is incompatible with the current encoding, reencode the + // existing chunk data into new chunk(s). + // + // int->float. + if c.isInt() && !isInt64(dv) { + return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) + } + // float32->float64. + if !c.isInt() && vb == d4 && !isFloat32(dv) { + return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) + } + if tb < d8 || vb < d8 { + // Maybe more bytes per sample. + ntb := bytesNeededForSignedTimestampDelta(dt) + nvb := bytesNeededForSampleValueDelta(dv, c.isInt()) + if ntb > tb || nvb > vb { + ntb = max(ntb, tb) + nvb = max(nvb, vb) + return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) + } + } + offset := len(c) + c = c[:offset+sampleSize] + + switch tb { + case d1: + c[offset] = byte(dt) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(dt)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(dt)) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) + default: + panic("invalid number of bytes for time delta") + } + + offset += int(tb) + + if c.isInt() { + switch vb { + case d0: + // No-op. Constant delta is stored as base value. + case d1: + c[offset] = byte(dv) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(dv)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(dv)) + // d8 must not happen. Those samples are encoded as float64. + default: + panic("invalid number of bytes for integer delta") + } + } else { + switch vb { + case d4: + binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv))) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) + default: + panic("invalid number of bytes for floating point delta") + } + } + return []chunk{&c} +} + +// clone implements chunk. +func (c doubleDeltaEncodedChunk) clone() chunk { + clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone +} + +// values implements chunk. +func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { + n := c.len() + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < n; i++ { + valuesChan <- c.valueAtIndex(i) + } + close(valuesChan) + }() + return valuesChan +} + +func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { + if idx == 0 { + return &metric.SamplePair{ + Timestamp: c.baseTime(), + Value: c.baseValue(), + } + } + if idx == 1 { + // If time and/or value bytes are at d8, the time and value is + // saved directly rather than as a difference. + timestamp := c.baseTimeDelta() + if c.timeBytes() < d8 { + timestamp += c.baseTime() + } + value := c.baseValueDelta() + if c.valueBytes() < d8 { + value += c.baseValue() + } + return &metric.SamplePair{ + Timestamp: timestamp, + Value: value, + } + } + offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize() + + var ts clientmodel.Timestamp + switch c.timeBytes() { + case d1: + ts = c.baseTime() + + clientmodel.Timestamp(idx)*c.baseTimeDelta() + + clientmodel.Timestamp(int8(c[offset])) + case d2: + ts = c.baseTime() + + clientmodel.Timestamp(idx)*c.baseTimeDelta() + + clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:]))) + case d4: + ts = c.baseTime() + + clientmodel.Timestamp(idx)*c.baseTimeDelta() + + clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:]))) + case d8: + // Take absolute value for d8. + ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) + default: + panic("Invalid number of bytes for time delta") + } + + offset += int(c.timeBytes()) + + var v clientmodel.SampleValue + if c.isInt() { + switch c.valueBytes() { + case d0: + v = c.baseValue() + + clientmodel.SampleValue(idx)*c.baseValueDelta() + case d1: + v = c.baseValue() + + clientmodel.SampleValue(idx)*c.baseValueDelta() + + clientmodel.SampleValue(int8(c[offset])) + case d2: + v = c.baseValue() + + clientmodel.SampleValue(idx)*c.baseValueDelta() + + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) + case d4: + v = c.baseValue() + + clientmodel.SampleValue(idx)*c.baseValueDelta() + + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) + // No d8 for ints. + default: + panic("Invalid number of bytes for integer delta") + } + } else { + switch c.valueBytes() { + case d4: + v = c.baseValue() + + clientmodel.SampleValue(idx)*c.baseValueDelta() + + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) + case d8: + // Take absolute value for d8. + v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) + default: + panic("Invalid number of bytes for floating point delta") + } + } + return &metric.SamplePair{ + Timestamp: ts, + Value: v, + } +} + +// firstTime implements chunk. +func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { + return c.baseTime() +} + +// lastTime implements chunk. +func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { + return c.valueAtIndex(c.len() - 1).Timestamp +} + +// marshal implements chunk. +func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { + panic("chunk buffer length would overflow a 16 bit uint.") + } + binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) + + n, err := w.Write(c[:cap(c)]) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { + *c = (*c)[:cap(*c)] + readBytes := 0 + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) + if err != nil { + return err + } + readBytes += n + } + *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + return nil +} + +// doubleDeltaEncodedChunkIterator implements chunkIterator. +type doubleDeltaEncodedChunkIterator struct { + chunk *doubleDeltaEncodedChunk + // TODO(beorn7): add more fields here to keep track of last position. +} + +// newIterator implements chunk. +func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { + return &doubleDeltaEncodedChunkIterator{ + chunk: c, + } +} + +// getValueAtTime implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { + // TODO(beorn7): Implement in a more efficient way making use of the + // state of the iterator and internals of the doubleDeltaChunk. + i := sort.Search(it.chunk.len(), func(i int) bool { + return !it.chunk.valueAtIndex(i).Timestamp.Before(t) + }) + + switch i { + case 0: + return metric.Values{*it.chunk.valueAtIndex(0)} + case it.chunk.len(): + return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} + default: + v := it.chunk.valueAtIndex(i) + if v.Timestamp.Equal(t) { + return metric.Values{*v} + } + return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} + } +} + +// getRangeValues implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { + // TODO(beorn7): Implement in a more efficient way making use of the + // state of the iterator and internals of the doubleDeltaChunk. + oldest := sort.Search(it.chunk.len(), func(i int) bool { + return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) + }) + + newest := sort.Search(it.chunk.len(), func(i int) bool { + return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive) + }) + + if oldest == it.chunk.len() { + return nil + } + + result := make(metric.Values, 0, newest-oldest) + for i := oldest; i < newest; i++ { + result = append(result, *it.chunk.valueAtIndex(i)) + } + return result +} + +// contains implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { + return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) +} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index f8bb94466..5702c8e57 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -95,8 +95,8 @@ type indexingOp struct { // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { - basePath string - chunkLen int + basePath string + chunkType byte archivedFingerprintToMetrics *index.FingerprintMetricIndex archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex @@ -121,7 +121,7 @@ type persistence struct { } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. -func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, error) { +func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, error) { dirtyPath := filepath.Join(basePath, dirtyFileName) versionPath := filepath.Join(basePath, versionFileName) @@ -178,8 +178,8 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er } p := &persistence{ - basePath: basePath, - chunkLen: chunkLen, + basePath: basePath, + chunkType: chunkType, archivedFingerprintToMetrics: archivedFingerprintToMetrics, archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, @@ -336,7 +336,7 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) } defer f.Close() - b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+p.chunkLen)) + b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+chunkLen)) for _, c := range chunks { err = writeChunkHeader(b, c) @@ -420,7 +420,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie if err != nil { return nil, err } - totalChunkLen := chunkHeaderLen + p.chunkLen + totalChunkLen := chunkHeaderLen + chunkLen if fi.Size()%int64(totalChunkLen) != 0 { p.setDirty(true) return nil, fmt.Errorf( @@ -770,6 +770,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), headChunkPersisted: headChunkPersisted, + chunkType: p.chunkType, } } return sm, nil @@ -1102,17 +1103,17 @@ func writeChunkHeader(w io.Writer, c chunk) error { } func (p *persistence) offsetForChunkIndex(i int) int64 { - return int64(i * (chunkHeaderLen + p.chunkLen)) + return int64(i * (chunkHeaderLen + chunkLen)) } func (p *persistence) chunkIndexForOffset(offset int64) (int, error) { - if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 { + if int(offset)%(chunkHeaderLen+chunkLen) != 0 { return -1, fmt.Errorf( "offset %d is not a multiple of on-disk chunk length %d", - offset, chunkHeaderLen+p.chunkLen, + offset, chunkHeaderLen+chunkLen, ) } - return int(offset) / (chunkHeaderLen + p.chunkLen), nil + return int(offset) / (chunkHeaderLen + chunkLen), nil } func (p *persistence) headsFileName() string { diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 666b9e4b3..bc866a615 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -31,9 +31,9 @@ var ( m3 = clientmodel.Metric{"label": "value3"} ) -func newTestPersistence(t *testing.T) (*persistence, test.Closer) { +func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) { dir := test.NewTemporaryDirectory("test_persistence", t) - p, err := newPersistence(dir.Path(), 1024, false) + p, err := newPersistence(dir.Path(), chunkType, false) if err != nil { dir.Close() t.Fatal(err) @@ -44,7 +44,7 @@ func newTestPersistence(t *testing.T) (*persistence, test.Closer) { }) } -func buildTestChunks() map[clientmodel.Fingerprint][]chunk { +func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { fps := clientmodel.Fingerprints{ m1.Fingerprint(), m2.Fingerprint(), @@ -55,7 +55,7 @@ func buildTestChunks() map[clientmodel.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; i++ { - fpToChunks[fp] = append(fpToChunks[fp], newDeltaEncodedChunk(d1, d1, true).add(&metric.SamplePair{ + fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{ Timestamp: clientmodel.Timestamp(i), Value: clientmodel.SampleValue(fp), })[0]) @@ -75,11 +75,11 @@ func chunksEqual(c1, c2 chunk) bool { return true } -func TestPersistLoadDropChunks(t *testing.T) { - p, closer := newTestPersistence(t) +func testPersistLoadDropChunks(t *testing.T, chunkType byte) { + p, closer := newTestPersistence(t, chunkType) defer closer.Close() - fpToChunks := buildTestChunks() + fpToChunks := buildTestChunks(chunkType) for fp, chunks := range fpToChunks { for i, c := range chunks { @@ -183,15 +183,23 @@ func TestPersistLoadDropChunks(t *testing.T) { } } -func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) { - p, closer := newTestPersistence(t) +func TestPersistLoadDropChunksType0(t *testing.T) { + testPersistLoadDropChunks(t, 0) +} + +func TestPersistLoadDropChunksType1(t *testing.T) { + testPersistLoadDropChunks(t, 1) +} + +func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) { + p, closer := newTestPersistence(t, chunkType) defer closer.Close() fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, true, 0) - s2 := newMemorySeries(m2, false, 0) - s3 := newMemorySeries(m3, false, 0) + s1 := newMemorySeries(m1, true, 0, chunkType) + s2 := newMemorySeries(m2, false, 0, chunkType) + s3 := newMemorySeries(m3, false, 0, chunkType) s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkPersisted = true @@ -244,8 +252,16 @@ func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) { } } -func TestGetFingerprintsModifiedBefore(t *testing.T) { - p, closer := newTestPersistence(t) +func TestCheckpointAndLoadSeriesMapAndHeadsChunkType0(t *testing.T) { + testCheckpointAndLoadSeriesMapAndHeads(t, 0) +} + +func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) { + testCheckpointAndLoadSeriesMapAndHeads(t, 1) +} + +func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) { + p, closer := newTestPersistence(t, chunkType) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -314,8 +330,16 @@ func TestGetFingerprintsModifiedBefore(t *testing.T) { } } -func TestDropArchivedMetric(t *testing.T) { - p, closer := newTestPersistence(t) +func TestGetFingerprintsModifiedBeforeChunkType0(t *testing.T) { + testGetFingerprintsModifiedBefore(t, 0) +} + +func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) { + testGetFingerprintsModifiedBefore(t, 1) +} + +func testDropArchivedMetric(t *testing.T, chunkType byte) { + p, closer := newTestPersistence(t, chunkType) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -382,13 +406,21 @@ func TestDropArchivedMetric(t *testing.T) { } } +func TestDropArchivedMetricChunkType0(t *testing.T) { + testDropArchivedMetric(t, 0) +} + +func TestDropArchivedMetricChunkType1(t *testing.T) { + testDropArchivedMetric(t, 1) +} + type incrementalBatch struct { fpToMetric index.FingerprintMetricMapping expectedLnToLvs index.LabelNameLabelValuesMapping expectedLpToFps index.LabelPairFingerprintsMapping } -func TestIndexing(t *testing.T) { +func testIndexing(t *testing.T, chunkType byte) { batches := []incrementalBatch{ { fpToMetric: index.FingerprintMetricMapping{ @@ -524,7 +556,7 @@ func TestIndexing(t *testing.T) { }, } - p, closer := newTestPersistence(t) + p, closer := newTestPersistence(t, chunkType) defer closer.Close() indexedFpsToMetrics := index.FingerprintMetricMapping{} @@ -559,6 +591,14 @@ func TestIndexing(t *testing.T) { } } +func TestIndexingChunkType0(t *testing.T) { + testIndexing(t, 1) +} + +func TestIndexingChunkType1(t *testing.T) { + testIndexing(t, 1) +} + func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) { p.waitForIndexing() for fp, m := range indexedFpsToMetrics { diff --git a/storage/local/series.go b/storage/local/series.go index 696a6a9fa..c50bb9412 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -158,6 +158,8 @@ type memorySeries struct { // a non-persisted head chunk has to be cloned before more samples are // appended. headChunkUsedByIterator bool + // Which type of chunk to create if a new chunk is needed. + chunkType byte } // newMemorySeries returns a pointer to a newly allocated memorySeries for the @@ -165,8 +167,14 @@ type memorySeries struct { // or (if false) a series for a metric being unarchived, i.e. a series that // existed before but has been evicted from memory. If reallyNew is false, // firstTime is ignored (and set to the lowest possible timestamp instead - it -// will be set properly upon the first eviction of chunkDescs). -func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries { +// will be set properly upon the first eviction of chunkDescs). chunkType is the +// type of chunks newly created by this memorySeries. +func newMemorySeries( + m clientmodel.Metric, + reallyNew bool, + firstTime clientmodel.Timestamp, + chunkType byte, +) *memorySeries { if reallyNew { firstTime = clientmodel.Earliest } @@ -174,6 +182,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel metric: m, headChunkPersisted: !reallyNew, savedFirstTime: firstTime, + chunkType: chunkType, } if !reallyNew { s.chunkDescsOffset = -1 @@ -186,7 +195,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel // The caller must have locked the fingerprint of the series. func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc { if len(s.chunkDescs) == 0 || s.headChunkPersisted { - newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true)) + newHead := newChunkDesc(chunkForType(s.chunkType)) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkPersisted = false } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { diff --git a/storage/local/storage.go b/storage/local/storage.go index bbf180f35..c655e5311 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,6 +16,7 @@ package local import ( "container/list" + "fmt" "sync" "sync/atomic" "time" @@ -71,6 +72,7 @@ type memorySeriesStorage struct { dropAfter time.Duration checkpointInterval time.Duration checkpointDirtySeriesLimit int + chunkType byte appendQueue chan *clientmodel.Sample appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. @@ -108,13 +110,17 @@ type MemorySeriesStorageOptions struct { PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. + ChunkType byte // Chunk type for newly created chunks. Dirty bool // Force the storage to consider itself dirty on startup. } // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { - p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty) + if o.ChunkType > 1 { + return nil, fmt.Errorf("unsupported chunk type %d", o.ChunkType) + } + p, err := newPersistence(o.PersistenceStoragePath, o.ChunkType, o.Dirty) if err != nil { return nil, err } @@ -142,6 +148,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, + chunkType: o.ChunkType, appendLastTimestamp: clientmodel.Earliest, appendQueue: make(chan *clientmodel.Sample, appendQueueCap), @@ -451,7 +458,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, !unarchived, firstTime) + series = newMemorySeries(m, !unarchived, firstTime, s.chunkType) s.fpToSeries.put(fp, series) s.numSeries.Inc() } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 6769ded7f..1fb3138e2 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -15,6 +15,7 @@ package local import ( "fmt" + "math" "math/rand" "reflect" "testing" @@ -29,8 +30,16 @@ import ( "github.com/prometheus/prometheus/utility/test" ) +const ( + epsilon = 0.000001 // Relative error allowed for sample values. +) + +var ( + minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64. +) + func TestGetFingerprintsForLabelMatchers(t *testing.T) { - storage, closer := NewTestStorage(t) + storage, closer := NewTestStorage(t, 1) defer closer.Close() samples := make([]*clientmodel.Sample, 100) @@ -181,7 +190,7 @@ func TestLoop(t *testing.T) { } } -func TestChunk(t *testing.T) { +func testChunk(t *testing.T, chunkType byte) { samples := make(clientmodel.Samples, 500000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -189,7 +198,7 @@ func TestChunk(t *testing.T) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t) + s, closer := NewTestStorage(t, chunkType) defer closer.Close() s.AppendSamples(samples) @@ -212,7 +221,7 @@ func TestChunk(t *testing.T) { if samples[i].Timestamp != v.Timestamp { t.Errorf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp) } - if samples[i].Value != v.Value { + if !almostEqual(samples[i].Value, v.Value) { t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) } } @@ -221,7 +230,15 @@ func TestChunk(t *testing.T) { glog.Info("test done, closing") } -func TestGetValueAtTime(t *testing.T) { +func TestChunkType0(t *testing.T) { + testChunk(t, 0) +} + +func TestChunkType1(t *testing.T) { + testChunk(t, 1) +} + +func testGetValueAtTime(t *testing.T, chunkType byte) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -229,7 +246,7 @@ func TestGetValueAtTime(t *testing.T) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t) + s, closer := NewTestStorage(t, chunkType) defer closer.Close() s.AppendSamples(samples) @@ -304,7 +321,15 @@ func TestGetValueAtTime(t *testing.T) { } } -func TestGetRangeValues(t *testing.T) { +func TestGetValueAtTimeChunkType0(t *testing.T) { + testGetValueAtTime(t, 0) +} + +func TestGetValueAtTimeChunkType1(t *testing.T) { + testGetValueAtTime(t, 1) +} + +func testGetRangeValues(t *testing.T, chunkType byte) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -312,7 +337,7 @@ func TestGetRangeValues(t *testing.T) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t) + s, closer := NewTestStorage(t, chunkType) defer closer.Close() s.AppendSamples(samples) @@ -446,15 +471,23 @@ func TestGetRangeValues(t *testing.T) { } } -func TestEvictAndPurgeSeries(t *testing.T) { +func TestGetRangeValuesChunkType0(t *testing.T) { + testGetRangeValues(t, 0) +} + +func TestGetRangeValuesChunkType1(t *testing.T) { + testGetRangeValues(t, 1) +} + +func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ Timestamp: clientmodel.Timestamp(2 * i), - Value: clientmodel.SampleValue(float64(i) * 0.2), + Value: clientmodel.SampleValue(float64(i * i)), } } - s, closer := NewTestStorage(t) + s, closer := NewTestStorage(t, chunkType) defer closer.Close() ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. @@ -474,7 +507,7 @@ func TestEvictAndPurgeSeries(t *testing.T) { if len(actual) != 2 { t.Fatal("expected two results after purging half of series") } - if actual[0].Timestamp < 800 || actual[0].Timestamp > 1000 { + if actual[0].Timestamp < 600 || actual[0].Timestamp > 1000 { t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) } want := clientmodel.Timestamp(1998) @@ -544,7 +577,15 @@ func TestEvictAndPurgeSeries(t *testing.T) { } } -func BenchmarkAppend(b *testing.B) { +func TestEvictAndPurgeSeriesChunkType0(t *testing.T) { + testEvictAndPurgeSeries(t, 0) +} + +func TestEvictAndPurgeSeriesChunkType1(t *testing.T) { + testEvictAndPurgeSeries(t, 1) +} + +func benchmarkAppend(b *testing.B, chunkType byte) { samples := make(clientmodel.Samples, b.N) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -558,28 +599,34 @@ func BenchmarkAppend(b *testing.B) { } } b.ResetTimer() - s, closer := NewTestStorage(b) + s, closer := NewTestStorage(b, chunkType) defer closer.Close() s.AppendSamples(samples) } +func BenchmarkAppendType0(b *testing.B) { + benchmarkAppend(b, 0) +} + +func BenchmarkAppendType1(b *testing.B) { + benchmarkAppend(b, 1) +} + // Append a large number of random samples and then check if we can get them out // of the storage alright. -func TestFuzz(t *testing.T) { +func testFuzz(t *testing.T, chunkType byte) { if testing.Short() { t.Skip("Skipping test in short mode.") } check := func(seed int64) bool { rand.Seed(seed) - s, c := NewTestStorage(t) + s, c := NewTestStorage(t, chunkType) defer c.Close() - samples := createRandomSamples() + samples := createRandomSamples("test_fuzz", 1000) s.AppendSamples(samples) - s.WaitForIndexing() - return verifyStorage(t, s, samples, 24*7*time.Hour) } @@ -588,21 +635,27 @@ func TestFuzz(t *testing.T) { } } -// BenchmarkFuzz is the benchmark version of TestFuzz. However, it will run -// several append and verify operations in parallel, if GOMAXPROC is set -// accordingly. Also, the storage options are set such that evictions, -// checkpoints, and purging will happen concurrently, too. This benchmark will -// have a very long runtime (up to minutes). You can use it as an actual -// benchmark. Run it like this: +func TestFuzzChunkType0(t *testing.T) { + testFuzz(t, 0) +} + +func TestFuzzChunkType1(t *testing.T) { + testFuzz(t, 1) +} + +// benchmarkFuzz is the benchmark version of testFuzz. The storage options are +// set such that evictions, checkpoints, and purging will happen concurrently, +// too. This benchmark will have a very long runtime (up to minutes). You can +// use it as an actual benchmark. Run it like this: // -// go test -cpu 1,2,4,8 -short -bench BenchmarkFuzz -benchmem +// go test -cpu 1,2,4,8 -test=NONE -bench BenchmarkFuzzChunkType -benchmem // // You can also use it as a test for races. In that case, run it like this (will // make things even slower): // -// go test -race -cpu 8 -short -bench BenchmarkFuzz -func BenchmarkFuzz(b *testing.B) { - b.StopTimer() +// go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType +func benchmarkFuzz(b *testing.B, chunkType byte) { + const samplesPerRun = 20000 rand.Seed(42) directory := test.NewTemporaryDirectory("test_storage", b) defer directory.Close() @@ -610,7 +663,8 @@ func BenchmarkFuzz(b *testing.B) { MemoryChunks: 100, PersistenceRetentionPeriod: time.Hour, PersistenceStoragePath: directory.Path(), - CheckpointInterval: 3 * time.Second, + CheckpointInterval: time.Second, + ChunkType: chunkType, } s, err := NewMemorySeriesStorage(o) if err != nil { @@ -618,33 +672,40 @@ func BenchmarkFuzz(b *testing.B) { } s.Start() defer s.Stop() - b.StartTimer() - b.RunParallel(func(pb *testing.PB) { - var allSamples clientmodel.Samples - for pb.Next() { - newSamples := createRandomSamples() - allSamples = append(allSamples, newSamples[:len(newSamples)/2]...) - s.AppendSamples(newSamples[:len(newSamples)/2]) - verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod) - allSamples = append(allSamples, newSamples[len(newSamples)/2:]...) - s.AppendSamples(newSamples[len(newSamples)/2:]) - verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod) - } - }) + samples := createRandomSamples("benchmark_fuzz", samplesPerRun*b.N) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + start := samplesPerRun * i + end := samplesPerRun * (i + 1) + middle := (start + end) / 2 + s.AppendSamples(samples[start:middle]) + verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod) + s.AppendSamples(samples[middle:end]) + verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod) + } } -func createRandomSamples() clientmodel.Samples { +func BenchmarkFuzzChunkType0(b *testing.B) { + benchmarkFuzz(b, 0) +} + +func BenchmarkFuzzChunkType1(b *testing.B) { + benchmarkFuzz(b, 1) +} + +func createRandomSamples(metricName string, minLen int) clientmodel.Samples { type valueCreator func() clientmodel.SampleValue type deltaApplier func(clientmodel.SampleValue) clientmodel.SampleValue var ( maxMetrics = 5 - maxCycles = 500 maxStreakLength = 500 - maxTimeDelta = 1000 + maxTimeDelta = 10000 maxTimeDeltaFactor = 10 - timestamp = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*maxCycles*maxStreakLength/16) // So that some timestamps are in the future. + timestamp = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*minLen/4) // So that some timestamps are in the future. generators = []struct { createValue valueCreator applyDelta []deltaApplier @@ -696,11 +757,12 @@ func createRandomSamples() clientmodel.Samples { metrics := []clientmodel.Metric{} for n := rand.Intn(maxMetrics); n >= 0; n-- { metrics = append(metrics, clientmodel.Metric{ + clientmodel.MetricNameLabel: clientmodel.LabelValue(metricName), clientmodel.LabelName(fmt.Sprintf("labelname_%d", n+1)): clientmodel.LabelValue(fmt.Sprintf("labelvalue_%d", rand.Int())), }) } - for n := rand.Intn(maxCycles); n >= 0; n-- { + for len(result) < minLen { // Pick a metric for this cycle. metric := metrics[rand.Intn(len(metrics))] timeDelta := rand.Intn(maxTimeDelta) + 1 @@ -753,6 +815,7 @@ func createRandomSamples() clientmodel.Samples { } func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool { + s.WaitForIndexing() result := true for _, i := range rand.Perm(len(samples)) { sample := samples[i] @@ -772,9 +835,9 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge p.Close() continue } - want := float64(sample.Value) - got := float64(found[0].Value) - if want != got || sample.Timestamp != found[0].Timestamp { + want := sample.Value + got := found[0].Value + if !almostEqual(want, got) || sample.Timestamp != found[0].Timestamp { t.Errorf( "Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", want, sample.Timestamp, got, found[0].Timestamp, @@ -875,3 +938,15 @@ func TestChunkMaps(t *testing.T) { } } + +func almostEqual(a, b clientmodel.SampleValue) bool { + // Cf. http://floating-point-gui.de/errors/comparison/ + if a == b { + return true + } + diff := math.Abs(float64(a - b)) + if a == 0 || b == 0 || diff < minNormal { + return diff < epsilon*minNormal + } + return diff/(math.Abs(float64(a))+math.Abs(float64(b))) < epsilon +} diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index e52d87ea2..319398497 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -37,13 +37,14 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t test.T) (Storage, test.Closer) { +func NewTestStorage(t test.T, chunkType byte) (Storage, test.Closer) { directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Hour, + ChunkType: chunkType, } storage, err := NewMemorySeriesStorage(o) if err != nil { diff --git a/templates/templates_test.go b/templates/templates_test.go index 2800d30db..ccb7899bc 100644 --- a/templates/templates_test.go +++ b/templates/templates_test.go @@ -152,7 +152,7 @@ func TestTemplateExpansion(t *testing.T) { time := clientmodel.Timestamp(0) - storage, closer := local.NewTestStorage(t) + storage, closer := local.NewTestStorage(t, 1) defer closer.Close() storage.AppendSamples(clientmodel.Samples{ {