Merge pull request #579 from prometheus/beorn7/chunk-encoding

Implement double-delta encoded chunks.
This commit is contained in:
Björn Rabenstein 2015-03-07 23:57:47 +01:00
commit d4ef509b0f
14 changed files with 929 additions and 236 deletions

View file

@ -54,6 +54,7 @@ var (
samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
chunkType = flag.Int("storage.local.chunk-type", 1, "Which chunk encoding version to use. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).")
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
@ -122,7 +123,8 @@ func NewPrometheus() *prometheus {
PersistenceQueueCapacity: *persistenceQueueCapacity,
CheckpointInterval: *checkpointInterval,
CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
Dirty: *storageDirty,
ChunkType: byte(*chunkType),
Dirty: *storageDirty,
}
memStorage, err := local.NewMemorySeriesStorage(o)
if err != nil {

View file

@ -109,7 +109,7 @@ func samplesAlmostEqual(a, b string) bool {
}
func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) {
storage, closer = local.NewTestStorage(t)
storage, closer = local.NewTestStorage(t, 1)
storeMatrix(storage, testMatrix)
return storage, closer
}
@ -1437,7 +1437,7 @@ func TestRangedEvaluationRegressions(t *testing.T) {
}
for i, s := range scenarios {
storage, closer := local.NewTestStorage(t)
storage, closer := local.NewTestStorage(t, 1)
storeMatrix(storage, s.in)
expr, err := LoadExprFromString(s.expr)

View file

@ -15,6 +15,7 @@ package local
import (
"container/list"
"fmt"
"io"
"sync"
"sync/atomic"
@ -223,16 +224,20 @@ func chunkType(c chunk) byte {
switch c.(type) {
case *deltaEncodedChunk:
return 0
case *doubleDeltaEncodedChunk:
return 1
default:
panic("unknown chunk type")
panic(fmt.Errorf("unknown chunk type: %T", c))
}
}
func chunkForType(chunkType byte) chunk {
switch chunkType {
case 0:
return newDeltaEncodedChunk(d1, d0, true)
return newDeltaEncodedChunk(d1, d0, true, chunkLen)
case 1:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen)
default:
panic("unknown chunk type")
panic(fmt.Errorf("unknown chunk type: %d", chunkType))
}
}

View file

@ -181,8 +181,8 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
return fp, false
}
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen)
chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen)
if bytesToTrim != 0 {
glog.Warningf(
"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
@ -341,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes(
if err := kv.Value(&m); err != nil {
return err
}
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest, p.chunkType)
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
if err != nil {
return err

View file

@ -25,16 +25,6 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
type deltaBytes byte
const (
d0 deltaBytes = 0
d1 = 1
d2 = 2
d4 = 4
d8 = 8
)
// The 21-byte header of a delta-encoded chunk looks like:
//
// - time delta bytes: 1 bytes
@ -55,126 +45,90 @@ const (
)
// A deltaEncodedChunk adaptively stores sample timestamps and values with a
// delta encoding of various types (int, float) and bit width. However, once 8
// delta encoding of various types (int, float) and bit widths. However, once 8
// bytes would be needed to encode a delta value, a fall-back to the absolute
// numbers happens (so that timestamps are saved directly as int64 and values as
// float64). It implements the chunk interface.
type deltaEncodedChunk struct {
buf []byte
}
type deltaEncodedChunk []byte
// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk.
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk {
buf := make([]byte, deltaHeaderIsIntOffset+1, 1024)
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk {
if tb < 1 {
panic("need at least 1 time delta byte")
}
if length < deltaHeaderBytes+16 {
panic(fmt.Errorf(
"chunk length %d bytes is insufficient, need at least %d",
length, deltaHeaderBytes+16,
))
}
c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length)
buf[deltaHeaderTimeBytesOffset] = byte(tb)
buf[deltaHeaderValueBytesOffset] = byte(vb)
c[deltaHeaderTimeBytesOffset] = byte(tb)
c[deltaHeaderValueBytesOffset] = byte(vb)
if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes.
buf[deltaHeaderIsIntOffset] = 1
c[deltaHeaderIsIntOffset] = 1
} else {
buf[deltaHeaderIsIntOffset] = 0
c[deltaHeaderIsIntOffset] = 0
}
return &deltaEncodedChunk{
buf: buf,
}
return &c
}
func (c *deltaEncodedChunk) newFollowupChunk() chunk {
return newDeltaEncodedChunk(d1, d0, true)
func (c deltaEncodedChunk) newFollowupChunk() chunk {
return newDeltaEncodedChunk(d1, d0, true, cap(c))
}
// clone implements chunk.
func (c *deltaEncodedChunk) clone() chunk {
buf := make([]byte, len(c.buf), 1024)
copy(buf, c.buf)
return &deltaEncodedChunk{
buf: buf,
}
func (c deltaEncodedChunk) clone() chunk {
clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
func neededDeltaBytes(deltaT clientmodel.Timestamp, deltaV clientmodel.SampleValue, isInt bool) (dtb, dvb deltaBytes) {
dtb = d1
if deltaT > math.MaxUint8 {
dtb = d2
}
if deltaT > math.MaxUint16 {
dtb = d4
}
if deltaT > math.MaxUint32 {
dtb = d8
}
if isInt {
dvb = d0
if deltaV != 0 {
dvb = d1
}
if deltaV < math.MinInt8 || deltaV > math.MaxInt8 {
dvb = d2
}
if deltaV < math.MinInt16 || deltaV > math.MaxInt16 {
dvb = d4
}
if deltaV < math.MinInt32 || deltaV > math.MaxInt32 {
dvb = d8
}
} else {
dvb = d4
if clientmodel.SampleValue(float32(deltaV)) != deltaV {
dvb = d8
}
}
return dtb, dvb
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
}
func max(a, b deltaBytes) deltaBytes {
if a > b {
return a
}
return b
func (c deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[deltaHeaderValueBytesOffset])
}
func (c *deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c.buf[deltaHeaderTimeBytesOffset])
func (c deltaEncodedChunk) isInt() bool {
return c[deltaHeaderIsIntOffset] == 1
}
func (c *deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c.buf[deltaHeaderValueBytesOffset])
func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
}
func (c *deltaEncodedChunk) isInt() bool {
return c.buf[deltaHeaderIsIntOffset] == 1
}
func (c *deltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseTimeOffset:]))
}
func (c *deltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[deltaHeaderBaseValueOffset:])))
func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
}
// add implements chunk.
func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if len(c.buf) < deltaHeaderBytes {
c.buf = c.buf[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c.buf[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64(c.buf[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)))
func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if c.len() == 0 {
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)))
}
remainingBytes := cap(c.buf) - len(c.buf)
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := c.newFollowupChunk().add(s)
return []chunk{c, overflowChunks[0]}
return []chunk{&c, overflowChunks[0]}
}
baseValue := c.baseValue()
// TODO(beorn7): Once https://github.com/prometheus/prometheus/issues/481 is
// fixed, we should panic here if dt is negative.
dt := s.Timestamp - c.baseTime()
dv := s.Value - c.baseValue()
dv := s.Value - baseValue
tb := c.timeBytes()
vb := c.valueBytes()
@ -182,35 +136,41 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
// existing chunk data into new chunk(s).
//
// int->float.
// Note: Using math.Modf is slower than the conversion approach below.
if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s)
if c.isInt() && !isInt64(dv) {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s)
}
// float32->float64.
if !c.isInt() && vb == d4 && clientmodel.SampleValue(float32(dv)) != dv {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false), c, s)
if !c.isInt() && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s)
}
if tb < d8 || vb < d8 {
// Maybe more bytes per sample.
if ntb, nvb := neededDeltaBytes(dt, dv, c.isInt()); ntb > tb || nvb > vb {
ntb = max(ntb, tb)
nvb = max(nvb, vb)
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt()), c, s)
}
var ntb, nvb deltaBytes
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = bytesNeededForUnsignedTimestampDelta(dt)
}
offset := len(c.buf)
c.buf = c.buf[:offset+sampleSize]
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = bytesNeededForIntegerSampleValueDelta(dv)
}
if ntb > tb || nvb > vb {
ntb = max(ntb, tb)
nvb = max(nvb, vb)
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s)
}
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c.buf[offset] = byte(dt)
c[offset] = byte(dt)
case d2:
binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dt))
binary.LittleEndian.PutUint16(c[offset:], uint16(dt))
case d4:
binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dt))
binary.LittleEndian.PutUint32(c[offset:], uint32(dt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c.buf[offset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
}
@ -222,11 +182,11 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
case d0:
// No-op. Constant value is stored as base value.
case d1:
c.buf[offset] = byte(dv)
c[offset] = byte(dv)
case d2:
binary.LittleEndian.PutUint16(c.buf[offset:], uint16(dv))
binary.LittleEndian.PutUint16(c[offset:], uint16(dv))
case d4:
binary.LittleEndian.PutUint32(c.buf[offset:], uint32(dv))
binary.LittleEndian.PutUint32(c[offset:], uint32(dv))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
@ -234,30 +194,30 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c.buf[offset:], math.Float32bits(float32(dv)))
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c.buf[offset:], math.Float64bits(float64(s.Value)))
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
}
}
return []chunk{c}
return []chunk{&c}
}
func (c *deltaEncodedChunk) sampleSize() int {
func (c deltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
func (c *deltaEncodedChunk) len() int {
if len(c.buf) < deltaHeaderBytes {
func (c deltaEncodedChunk) len() int {
if len(c) < deltaHeaderBytes {
return 0
}
return (len(c.buf) - deltaHeaderBytes) / c.sampleSize()
return (len(c) - deltaHeaderBytes) / c.sampleSize()
}
// values implements chunk.
func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair {
func (c deltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
go func() {
@ -269,20 +229,20 @@ func (c *deltaEncodedChunk) values() <-chan *metric.SamplePair {
return valuesChan
}
func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
offset := deltaHeaderBytes + idx*c.sampleSize()
var ts clientmodel.Timestamp
switch c.timeBytes() {
case d1:
ts = c.baseTime() + clientmodel.Timestamp(uint8(c.buf[offset]))
ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset]))
case d2:
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c.buf[offset:]))
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:]))
case d4:
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c.buf[offset:]))
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:]))
case d8:
// Take absolute value for d8.
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c.buf[offset:]))
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
default:
panic("Invalid number of bytes for time delta")
}
@ -295,11 +255,11 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
case d0:
v = c.baseValue()
case d1:
v = c.baseValue() + clientmodel.SampleValue(int8(c.buf[offset]))
v = c.baseValue() + clientmodel.SampleValue(int8(c[offset]))
case d2:
v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c.buf[offset:])))
v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
case d4:
v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c.buf[offset:])))
v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
// No d8 for ints.
default:
panic("Invalid number of bytes for integer delta")
@ -307,10 +267,10 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
} else {
switch c.valueBytes() {
case d4:
v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c.buf[offset:])))
v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
case d8:
// Take absolute value for d8.
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c.buf[offset:])))
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
default:
panic("Invalid number of bytes for floating point delta")
}
@ -322,44 +282,44 @@ func (c *deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
}
// firstTime implements chunk.
func (c *deltaEncodedChunk) firstTime() clientmodel.Timestamp {
func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.valueAtIndex(0).Timestamp
}
// lastTime implements chunk.
func (c *deltaEncodedChunk) lastTime() clientmodel.Timestamp {
func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// marshal implements chunk.
func (c *deltaEncodedChunk) marshal(w io.Writer) error {
if len(c.buf) > math.MaxUint16 {
func (c deltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c.buf[deltaHeaderBufLenOffset:], uint16(len(c.buf)))
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c.buf[:cap(c.buf)])
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c.buf) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c.buf), n)
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
c.buf = c.buf[:cap(c.buf)]
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(c.buf) {
n, err := r.Read(c.buf[readBytes:])
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
c.buf = c.buf[:binary.LittleEndian.Uint16(c.buf[deltaHeaderBufLenOffset:])]
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
return nil
}

View file

@ -0,0 +1,84 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"math"
clientmodel "github.com/prometheus/client_golang/model"
)
// deltaBytes is the width, in bytes, used to encode one (double-)delta value.
type deltaBytes byte

// Supported encoding widths. d0 means no payload bytes are used (the value is
// implied by the base value/delta alone); for d8, callers store the absolute
// value rather than a delta (see the "Store the absolute value" branches in
// the chunk implementations).
const (
	d0 deltaBytes = 0
	d1 deltaBytes = 1
	d2 deltaBytes = 2
	d4 deltaBytes = 4
	d8 deltaBytes = 8
)
// bytesNeededForUnsignedTimestampDelta returns the smallest supported width
// that can hold deltaT when it is encoded as an unsigned integer.
func bytesNeededForUnsignedTimestampDelta(deltaT clientmodel.Timestamp) deltaBytes {
	if deltaT > math.MaxUint32 {
		return d8
	}
	if deltaT > math.MaxUint16 {
		return d4
	}
	if deltaT > math.MaxUint8 {
		return d2
	}
	return d1
}
// bytesNeededForSignedTimestampDelta returns the smallest supported width
// that can hold deltaT when it is encoded as a signed integer.
func bytesNeededForSignedTimestampDelta(deltaT clientmodel.Timestamp) deltaBytes {
	if deltaT < math.MinInt32 || deltaT > math.MaxInt32 {
		return d8
	}
	if deltaT < math.MinInt16 || deltaT > math.MaxInt16 {
		return d4
	}
	if deltaT < math.MinInt8 || deltaT > math.MaxInt8 {
		return d2
	}
	return d1
}
// bytesNeededForIntegerSampleValueDelta returns the smallest supported width
// that can hold the integer value delta deltaV; d0 if deltaV is zero.
func bytesNeededForIntegerSampleValueDelta(deltaV clientmodel.SampleValue) deltaBytes {
	if deltaV > math.MaxInt32 || deltaV < math.MinInt32 {
		return d8
	}
	if deltaV > math.MaxInt16 || deltaV < math.MinInt16 {
		return d4
	}
	if deltaV > math.MaxInt8 || deltaV < math.MinInt8 {
		return d2
	}
	if deltaV != 0 {
		return d1
	}
	return d0
}
// max returns the larger of the two given deltaBytes widths.
func max(a, b deltaBytes) deltaBytes {
	if b > a {
		return b
	}
	return a
}
// isInt64 returns true if v can be represented as an int64 without loss.
func isInt64(v clientmodel.SampleValue) bool {
	// Note: Using math.Modf is slower than the conversion approach below.
	return v == clientmodel.SampleValue(int64(v))
}

View file

@ -0,0 +1,530 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
// The 37-byte header of a double-delta-encoded chunk looks like:
//
// - used buf bytes: 2 bytes
// - time double-delta bytes: 1 bytes
// - value double-delta bytes: 1 bytes
// - is integer: 1 byte
// - base time: 8 bytes
// - base value: 8 bytes
// - base time delta: 8 bytes
// - base value delta: 8 bytes
const (
doubleDeltaHeaderBytes = 37
doubleDeltaHeaderBufLenOffset = 0
doubleDeltaHeaderTimeBytesOffset = 2
doubleDeltaHeaderValueBytesOffset = 3
doubleDeltaHeaderIsIntOffset = 4
doubleDeltaHeaderBaseTimeOffset = 5
doubleDeltaHeaderBaseValueOffset = 13
doubleDeltaHeaderBaseTimeDeltaOffset = 21
doubleDeltaHeaderBaseValueDeltaOffset = 29
)
// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with
// a double-delta encoding of various types (int, float) and bit widths. A base
// value and timestamp and a base delta for each is saved in the header. The
// payload consists of double-deltas, i.e. deviations from the values and
// timestamps calculated by applying the base value and time and the base deltas.
// However, once 8 bytes would be needed to encode a double-delta value, a
// fall-back to the absolute numbers happens (so that timestamps are saved
// directly as int64 and values as float64).
// doubleDeltaEncodedChunk implements the chunk interface.
type doubleDeltaEncodedChunk []byte
// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk
// with the given time/value widths, integer flag, and total byte length.
func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk {
	if tb < 1 {
		panic("need at least 1 time delta byte")
	}
	if length < doubleDeltaHeaderBytes+16 {
		panic(fmt.Errorf(
			"chunk length %d bytes is insufficient, need at least %d",
			length, doubleDeltaHeaderBytes+16,
		))
	}
	c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length)
	c[doubleDeltaHeaderTimeBytesOffset] = byte(tb)
	c[doubleDeltaHeaderValueBytesOffset] = byte(vb)
	// Only use int for fewer than 8 value double-delta bytes.
	var intFlag byte
	if isInt && vb < d8 {
		intFlag = 1
	}
	c[doubleDeltaHeaderIsIntOffset] = intFlag
	return &c
}
func (c doubleDeltaEncodedChunk) newFollowupChunk() chunk {
return newDoubleDeltaEncodedChunk(d1, d0, true, cap(c))
}
func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseTimeOffset:],
),
)
}
func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(
math.Float64frombits(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseValueOffset:],
),
),
)
}
func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
return clientmodel.Timestamp(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
),
)
}
func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue {
return clientmodel.SampleValue(
math.Float64frombits(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
),
),
)
}
func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset])
}
func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[doubleDeltaHeaderValueBytesOffset])
}
func (c doubleDeltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
// len returns the number of samples in the chunk. The first two samples live
// entirely in the header (base time/value plus base deltas), so the count is
// derived from how far the buffer extends into/past the header: only the
// fixed prefix → 0 samples, base time/value written → 1 sample, full header
// plus payload → 2 + one sample per sampleSize() payload bytes.
func (c doubleDeltaEncodedChunk) len() int {
	// Only the flag prefix written so far (see addFirstSample): empty chunk.
	if len(c) <= doubleDeltaHeaderIsIntOffset+1 {
		return 0
	}
	// Base time and value written, but no base deltas yet (see
	// addSecondSample): exactly one sample.
	if len(c) <= doubleDeltaHeaderBaseValueOffset+8 {
		return 1
	}
	return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2
}
func (c doubleDeltaEncodedChunk) isInt() bool {
return c[doubleDeltaHeaderIsIntOffset] == 1
}
// add implements chunk. It appends sample s and returns the chunk(s) the data
// ends up in: just the receiver in the common case, the receiver plus an
// overflow chunk when full, or freshly transcoded chunk(s) when the sample
// requires a wider or different encoding.
func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk {
	// The first two samples are stored in the header itself (base time/value
	// and base deltas); only from the third sample on are double-deltas
	// appended to the payload.
	if c.len() == 0 {
		return c.addFirstSample(s)
	}

	tb := c.timeBytes()
	vb := c.valueBytes()

	if c.len() == 1 {
		return c.addSecondSample(s, tb, vb)
	}

	remainingBytes := cap(c) - len(c)
	sampleSize := c.sampleSize()

	// Do we generally have space for another sample in this chunk? If not,
	// overflow into a new one.
	if remainingBytes < sampleSize {
		overflowChunks := c.newFollowupChunk().add(s)
		return []chunk{&c, overflowChunks[0]}
	}

	// The payload stores deviations from the linear projection
	// base + len*baseDelta for both time and value.
	projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta()
	ddt := s.Timestamp - projectedTime

	projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta()
	ddv := s.Value - projectedValue

	// If the new sample is incompatible with the current encoding, reencode the
	// existing chunk data into new chunk(s).
	//
	// int->float.
	if c.isInt() && !isInt64(ddv) {
		return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s)
	}
	// float32->float64.
	if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value {
		return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s)
	}
	var ntb, nvb deltaBytes
	if tb < d8 {
		// Maybe more bytes for timestamp.
		ntb = bytesNeededForSignedTimestampDelta(ddt)
	}
	if c.isInt() && vb < d8 {
		// Maybe more bytes for sample value.
		nvb = bytesNeededForIntegerSampleValueDelta(ddv)
	}
	if ntb > tb || nvb > vb {
		ntb = max(ntb, tb)
		nvb = max(nvb, vb)
		// BUG FIX: the original called newDeltaEncodedChunk here, which would
		// silently transcode this double-delta chunk into a plain delta chunk
		// whenever a wider encoding is needed. Stay with double-delta.
		return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s)
	}
	offset := len(c)
	c = c[:offset+sampleSize]

	switch tb {
	case d1:
		c[offset] = byte(ddt)
	case d2:
		binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
	case d4:
		binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
	case d8:
		// Store the absolute value (no delta) in case of d8.
		binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
	default:
		panic("invalid number of bytes for time delta")
	}

	offset += int(tb)

	if c.isInt() {
		switch vb {
		case d0:
			// No-op. Constant delta is stored as base value.
		case d1:
			c[offset] = byte(ddv)
		case d2:
			binary.LittleEndian.PutUint16(c[offset:], uint16(ddv))
		case d4:
			binary.LittleEndian.PutUint32(c[offset:], uint32(ddv))
		// d8 must not happen. Those samples are encoded as float64.
		default:
			panic("invalid number of bytes for integer delta")
		}
	} else {
		switch vb {
		case d4:
			binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
		case d8:
			// Store the absolute value (no delta) in case of d8.
			binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
		default:
			panic("invalid number of bytes for floating point delta")
		}
	}
	return []chunk{&c}
}
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
uint64(s.Timestamp),
)
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueOffset:],
math.Float64bits(float64(s.Value)),
)
return []chunk{&c}
}
// addSecondSample is a helper method only used by c.add(). It calculates the
// base time delta and base value delta from the provided sample and writes
// them into the header, extending the chunk to the full header length. If a
// delta cannot be represented losslessly (or the width is already d8), the
// absolute time/value is stored in the delta slot instead and the width is
// switched to d8 — valueAtIndex relies on exactly this convention for idx 1.
func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk {
	baseTimeDelta := s.Timestamp - c.baseTime()
	if baseTimeDelta < 0 {
		// TODO(beorn7): We ignore this irregular case for now. Once
		// https://github.com/prometheus/prometheus/issues/481 is
		// fixed, we should panic here instead.
		return []chunk{&c}
	}
	// Extend to the full header; the base-delta slots are written below.
	c = c[:doubleDeltaHeaderBytes]
	if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
		// If already the base delta needs d8 (or we are at d8
		// already, anyway), we better encode this timestamp
		// directly rather than as a delta and switch everything
		// to d8.
		c[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
		binary.LittleEndian.PutUint64(
			c[doubleDeltaHeaderBaseTimeDeltaOffset:],
			uint64(s.Timestamp),
		)
	} else {
		binary.LittleEndian.PutUint64(
			c[doubleDeltaHeaderBaseTimeDeltaOffset:],
			uint64(baseTimeDelta),
		)
	}
	baseValue := c.baseValue()
	baseValueDelta := s.Value - baseValue
	if vb >= d8 || baseValue+baseValueDelta != s.Value {
		// If we can't reproduce the original sample value (or
		// if we are at d8 already, anyway), we better encode
		// this value directly rather than as a delta and switch
		// everything to d8.
		c[doubleDeltaHeaderValueBytesOffset] = byte(d8)
		// d8 float storage implies the chunk is no longer integer-encoded.
		c[doubleDeltaHeaderIsIntOffset] = 0
		binary.LittleEndian.PutUint64(
			c[doubleDeltaHeaderBaseValueDeltaOffset:],
			math.Float64bits(float64(s.Value)),
		)
	} else {
		binary.LittleEndian.PutUint64(
			c[doubleDeltaHeaderBaseValueDeltaOffset:],
			math.Float64bits(float64(baseValueDelta)),
		)
	}
	return []chunk{&c}
}
// clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
// values implements chunk.
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
go func() {
for i := 0; i < n; i++ {
valuesChan <- c.valueAtIndex(i)
}
close(valuesChan)
}()
return valuesChan
}
// valueAtIndex decodes and returns the sample at position idx. Samples 0 and
// 1 are reconstructed from the header alone; samples 2+ are decoded from the
// payload as signed deviations from the linear projection
// base + idx*baseDelta, except where the width is d8, in which case the
// payload holds the absolute time/value (see add and addSecondSample).
func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
	if idx == 0 {
		// Sample 0 is the base time and value themselves.
		return &metric.SamplePair{
			Timestamp: c.baseTime(),
			Value:     c.baseValue(),
		}
	}
	if idx == 1 {
		// If time and/or value bytes are at d8, the time and value is
		// saved directly rather than as a difference.
		timestamp := c.baseTimeDelta()
		if c.timeBytes() < d8 {
			timestamp += c.baseTime()
		}
		value := c.baseValueDelta()
		if c.valueBytes() < d8 {
			value += c.baseValue()
		}
		return &metric.SamplePair{
			Timestamp: timestamp,
			Value:     value,
		}
	}
	// Payload starts after the full header; each sample is sampleSize() bytes.
	offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize()

	var ts clientmodel.Timestamp
	switch c.timeBytes() {
	case d1:
		ts = c.baseTime() +
			clientmodel.Timestamp(idx)*c.baseTimeDelta() +
			clientmodel.Timestamp(int8(c[offset]))
	case d2:
		ts = c.baseTime() +
			clientmodel.Timestamp(idx)*c.baseTimeDelta() +
			clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:])))
	case d4:
		ts = c.baseTime() +
			clientmodel.Timestamp(idx)*c.baseTimeDelta() +
			clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:])))
	case d8:
		// Take absolute value for d8.
		ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
	default:
		panic("Invalid number of bytes for time delta")
	}

	offset += int(c.timeBytes())

	var v clientmodel.SampleValue
	if c.isInt() {
		switch c.valueBytes() {
		case d0:
			// Constant per-sample delta; no payload bytes for the value.
			v = c.baseValue() +
				clientmodel.SampleValue(idx)*c.baseValueDelta()
		case d1:
			v = c.baseValue() +
				clientmodel.SampleValue(idx)*c.baseValueDelta() +
				clientmodel.SampleValue(int8(c[offset]))
		case d2:
			v = c.baseValue() +
				clientmodel.SampleValue(idx)*c.baseValueDelta() +
				clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
		case d4:
			v = c.baseValue() +
				clientmodel.SampleValue(idx)*c.baseValueDelta() +
				clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
		// No d8 for ints.
		default:
			panic("Invalid number of bytes for integer delta")
		}
	} else {
		switch c.valueBytes() {
		case d4:
			v = c.baseValue() +
				clientmodel.SampleValue(idx)*c.baseValueDelta() +
				clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
		case d8:
			// Take absolute value for d8.
			v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
		default:
			panic("Invalid number of bytes for floating point delta")
		}
	}
	return &metric.SamplePair{
		Timestamp: ts,
		Value:     v,
	}
}
// firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.baseTime()
}
// lastTime implements chunk.
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
return nil
}
// doubleDeltaEncodedChunkIterator implements chunkIterator. All lookups
// currently delegate to valueAtIndex on the underlying chunk via binary
// search; no position is cached between calls yet.
type doubleDeltaEncodedChunkIterator struct {
	// The chunk being iterated over.
	chunk *doubleDeltaEncodedChunk
	// TODO(beorn7): add more fields here to keep track of last position.
}
// newIterator implements chunk. It returns a fresh iterator positioned over
// this chunk; the iterator holds no state beyond the chunk reference.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
	it := &doubleDeltaEncodedChunkIterator{chunk: c}
	return it
}
// getValueAtTime implements chunkIterator. It returns the single sample at
// t if one exists, the nearest sample if t lies before the first or after
// the last sample, and otherwise the two samples bracketing t.
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
	// TODO(beorn7): Implement in a more efficient way making use of the
	// state of the iterator and internals of the doubleDeltaChunk.
	n := it.chunk.len()
	// Index of the first sample whose timestamp is >= t.
	idx := sort.Search(n, func(j int) bool {
		return !it.chunk.valueAtIndex(j).Timestamp.Before(t)
	})

	if idx == 0 {
		// t precedes (or equals) the first sample.
		return metric.Values{*it.chunk.valueAtIndex(0)}
	}
	if idx == n {
		// t follows the last sample.
		return metric.Values{*it.chunk.valueAtIndex(n - 1)}
	}
	sp := it.chunk.valueAtIndex(idx)
	if sp.Timestamp.Equal(t) {
		return metric.Values{*sp}
	}
	// t falls between two samples; return both neighbors.
	return metric.Values{*it.chunk.valueAtIndex(idx - 1), *sp}
}
// getRangeValues implements chunkIterator. It returns all samples whose
// timestamps fall within the (inclusive) interval in, or nil if every
// sample precedes the interval's start... er, follows it — nil is returned
// when no sample has a timestamp >= in.OldestInclusive.
func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
	// TODO(beorn7): Implement in a more efficient way making use of the
	// state of the iterator and internals of the doubleDeltaChunk.
	n := it.chunk.len()
	// First index with timestamp >= OldestInclusive.
	first := sort.Search(n, func(j int) bool {
		return !it.chunk.valueAtIndex(j).Timestamp.Before(in.OldestInclusive)
	})
	if first == n {
		return nil
	}
	// First index with timestamp > NewestInclusive (exclusive upper bound).
	last := sort.Search(n, func(j int) bool {
		return it.chunk.valueAtIndex(j).Timestamp.After(in.NewestInclusive)
	})

	out := make(metric.Values, 0, last-first)
	for j := first; j < last; j++ {
		out = append(out, *it.chunk.valueAtIndex(j))
	}
	return out
}
// contains implements chunkIterator. It reports whether t lies within the
// closed interval [firstTime, lastTime] covered by the chunk.
func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
	if t.Before(it.chunk.firstTime()) {
		return false
	}
	return !t.After(it.chunk.lastTime())
}

View file

@ -95,8 +95,8 @@ type indexingOp struct {
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
basePath string
chunkLen int
basePath string
chunkType byte
archivedFingerprintToMetrics *index.FingerprintMetricIndex
archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
@ -121,7 +121,7 @@ type persistence struct {
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, error) {
func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName)
@ -178,8 +178,8 @@ func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, er
}
p := &persistence{
basePath: basePath,
chunkLen: chunkLen,
basePath: basePath,
chunkType: chunkType,
archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
@ -336,7 +336,7 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk)
}
defer f.Close()
b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+p.chunkLen))
b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+chunkLen))
for _, c := range chunks {
err = writeChunkHeader(b, c)
@ -420,7 +420,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
if err != nil {
return nil, err
}
totalChunkLen := chunkHeaderLen + p.chunkLen
totalChunkLen := chunkHeaderLen + chunkLen
if fi.Size()%int64(totalChunkLen) != 0 {
p.setDirty(true)
return nil, fmt.Errorf(
@ -770,6 +770,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: clientmodel.Timestamp(savedFirstTime),
headChunkPersisted: headChunkPersisted,
chunkType: p.chunkType,
}
}
return sm, nil
@ -1102,17 +1103,17 @@ func writeChunkHeader(w io.Writer, c chunk) error {
}
func (p *persistence) offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + p.chunkLen))
return int64(i * (chunkHeaderLen + chunkLen))
}
func (p *persistence) chunkIndexForOffset(offset int64) (int, error) {
if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 {
if int(offset)%(chunkHeaderLen+chunkLen) != 0 {
return -1, fmt.Errorf(
"offset %d is not a multiple of on-disk chunk length %d",
offset, chunkHeaderLen+p.chunkLen,
offset, chunkHeaderLen+chunkLen,
)
}
return int(offset) / (chunkHeaderLen + p.chunkLen), nil
return int(offset) / (chunkHeaderLen + chunkLen), nil
}
func (p *persistence) headsFileName() string {

View file

@ -31,9 +31,9 @@ var (
m3 = clientmodel.Metric{"label": "value3"}
)
func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) {
dir := test.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), 1024, false)
p, err := newPersistence(dir.Path(), chunkType, false)
if err != nil {
dir.Close()
t.Fatal(err)
@ -44,7 +44,7 @@ func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
})
}
func buildTestChunks() map[clientmodel.Fingerprint][]chunk {
func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk {
fps := clientmodel.Fingerprints{
m1.Fingerprint(),
m2.Fingerprint(),
@ -55,7 +55,7 @@ func buildTestChunks() map[clientmodel.Fingerprint][]chunk {
for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], newDeltaEncodedChunk(d1, d1, true).add(&metric.SamplePair{
fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(fp),
})[0])
@ -75,11 +75,11 @@ func chunksEqual(c1, c2 chunk) bool {
return true
}
func TestPersistLoadDropChunks(t *testing.T) {
p, closer := newTestPersistence(t)
func testPersistLoadDropChunks(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
fpToChunks := buildTestChunks()
fpToChunks := buildTestChunks(chunkType)
for fp, chunks := range fpToChunks {
for i, c := range chunks {
@ -183,15 +183,23 @@ func TestPersistLoadDropChunks(t *testing.T) {
}
}
func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) {
p, closer := newTestPersistence(t)
func TestPersistLoadDropChunksType0(t *testing.T) {
testPersistLoadDropChunks(t, 0)
}
func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1)
}
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s1 := newMemorySeries(m1, true, 0)
s2 := newMemorySeries(m2, false, 0)
s3 := newMemorySeries(m3, false, 0)
s1 := newMemorySeries(m1, true, 0, chunkType)
s2 := newMemorySeries(m2, false, 0, chunkType)
s3 := newMemorySeries(m3, false, 0, chunkType)
s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkPersisted = true
@ -244,8 +252,16 @@ func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) {
}
}
func TestGetFingerprintsModifiedBefore(t *testing.T) {
p, closer := newTestPersistence(t)
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType0(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 0)
}
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1)
}
func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"}
@ -314,8 +330,16 @@ func TestGetFingerprintsModifiedBefore(t *testing.T) {
}
}
func TestDropArchivedMetric(t *testing.T) {
p, closer := newTestPersistence(t)
func TestGetFingerprintsModifiedBeforeChunkType0(t *testing.T) {
testGetFingerprintsModifiedBefore(t, 0)
}
func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) {
testGetFingerprintsModifiedBefore(t, 1)
}
func testDropArchivedMetric(t *testing.T, chunkType byte) {
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"}
@ -382,13 +406,21 @@ func TestDropArchivedMetric(t *testing.T) {
}
}
func TestDropArchivedMetricChunkType0(t *testing.T) {
testDropArchivedMetric(t, 0)
}
func TestDropArchivedMetricChunkType1(t *testing.T) {
testDropArchivedMetric(t, 1)
}
type incrementalBatch struct {
fpToMetric index.FingerprintMetricMapping
expectedLnToLvs index.LabelNameLabelValuesMapping
expectedLpToFps index.LabelPairFingerprintsMapping
}
func TestIndexing(t *testing.T) {
func testIndexing(t *testing.T, chunkType byte) {
batches := []incrementalBatch{
{
fpToMetric: index.FingerprintMetricMapping{
@ -524,7 +556,7 @@ func TestIndexing(t *testing.T) {
},
}
p, closer := newTestPersistence(t)
p, closer := newTestPersistence(t, chunkType)
defer closer.Close()
indexedFpsToMetrics := index.FingerprintMetricMapping{}
@ -559,6 +591,14 @@ func TestIndexing(t *testing.T) {
}
}
func TestIndexingChunkType0(t *testing.T) {
testIndexing(t, 0)
}
func TestIndexingChunkType1(t *testing.T) {
testIndexing(t, 1)
}
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) {
p.waitForIndexing()
for fp, m := range indexedFpsToMetrics {

View file

@ -158,6 +158,8 @@ type memorySeries struct {
// a non-persisted head chunk has to be cloned before more samples are
// appended.
headChunkUsedByIterator bool
// Which type of chunk to create if a new chunk is needed.
chunkType byte
}
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
@ -165,8 +167,14 @@ type memorySeries struct {
// or (if false) a series for a metric being unarchived, i.e. a series that
// existed before but has been evicted from memory. If reallyNew is false,
// firstTime is ignored (and set to the lowest possible timestamp instead - it
// will be set properly upon the first eviction of chunkDescs).
func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries {
// will be set properly upon the first eviction of chunkDescs). chunkType is the
// type of chunks newly created by this memorySeries.
func newMemorySeries(
m clientmodel.Metric,
reallyNew bool,
firstTime clientmodel.Timestamp,
chunkType byte,
) *memorySeries {
if reallyNew {
firstTime = clientmodel.Earliest
}
@ -174,6 +182,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel
metric: m,
headChunkPersisted: !reallyNew,
savedFirstTime: firstTime,
chunkType: chunkType,
}
if !reallyNew {
s.chunkDescsOffset = -1
@ -186,7 +195,7 @@ func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
if len(s.chunkDescs) == 0 || s.headChunkPersisted {
newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true))
newHead := newChunkDesc(chunkForType(s.chunkType))
s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkPersisted = false
} else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 {

View file

@ -16,6 +16,7 @@ package local
import (
"container/list"
"fmt"
"sync"
"sync/atomic"
"time"
@ -71,6 +72,7 @@ type memorySeriesStorage struct {
dropAfter time.Duration
checkpointInterval time.Duration
checkpointDirtySeriesLimit int
chunkType byte
appendQueue chan *clientmodel.Sample
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
@ -108,13 +110,17 @@ type MemorySeriesStorageOptions struct {
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
ChunkType byte // Chunk type for newly created chunks.
Dirty bool // Force the storage to consider itself dirty on startup.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty)
if o.ChunkType > 1 {
return nil, fmt.Errorf("unsupported chunk type %d", o.ChunkType)
}
p, err := newPersistence(o.PersistenceStoragePath, o.ChunkType, o.Dirty)
if err != nil {
return nil, err
}
@ -142,6 +148,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
chunkType: o.ChunkType,
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
@ -451,7 +458,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, !unarchived, firstTime)
series = newMemorySeries(m, !unarchived, firstTime, s.chunkType)
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}

View file

@ -30,7 +30,7 @@ import (
)
func TestGetFingerprintsForLabelMatchers(t *testing.T) {
storage, closer := NewTestStorage(t)
storage, closer := NewTestStorage(t, 1)
defer closer.Close()
samples := make([]*clientmodel.Sample, 100)
@ -181,7 +181,7 @@ func TestLoop(t *testing.T) {
}
}
func TestChunk(t *testing.T) {
func testChunk(t *testing.T, chunkType byte) {
samples := make(clientmodel.Samples, 500000)
for i := range samples {
samples[i] = &clientmodel.Sample{
@ -189,7 +189,7 @@ func TestChunk(t *testing.T) {
Value: clientmodel.SampleValue(float64(i) * 0.2),
}
}
s, closer := NewTestStorage(t)
s, closer := NewTestStorage(t, chunkType)
defer closer.Close()
s.AppendSamples(samples)
@ -221,7 +221,15 @@ func TestChunk(t *testing.T) {
glog.Info("test done, closing")
}
func TestGetValueAtTime(t *testing.T) {
func TestChunkType0(t *testing.T) {
testChunk(t, 0)
}
func TestChunkType1(t *testing.T) {
testChunk(t, 1)
}
func testGetValueAtTime(t *testing.T, chunkType byte) {
samples := make(clientmodel.Samples, 1000)
for i := range samples {
samples[i] = &clientmodel.Sample{
@ -229,7 +237,7 @@ func TestGetValueAtTime(t *testing.T) {
Value: clientmodel.SampleValue(float64(i) * 0.2),
}
}
s, closer := NewTestStorage(t)
s, closer := NewTestStorage(t, chunkType)
defer closer.Close()
s.AppendSamples(samples)
@ -304,7 +312,15 @@ func TestGetValueAtTime(t *testing.T) {
}
}
func TestGetRangeValues(t *testing.T) {
func TestGetValueAtTimeChunkType0(t *testing.T) {
testGetValueAtTime(t, 0)
}
func TestGetValueAtTimeChunkType1(t *testing.T) {
testGetValueAtTime(t, 1)
}
func testGetRangeValues(t *testing.T, chunkType byte) {
samples := make(clientmodel.Samples, 1000)
for i := range samples {
samples[i] = &clientmodel.Sample{
@ -312,7 +328,7 @@ func TestGetRangeValues(t *testing.T) {
Value: clientmodel.SampleValue(float64(i) * 0.2),
}
}
s, closer := NewTestStorage(t)
s, closer := NewTestStorage(t, chunkType)
defer closer.Close()
s.AppendSamples(samples)
@ -446,15 +462,23 @@ func TestGetRangeValues(t *testing.T) {
}
}
func TestEvictAndPurgeSeries(t *testing.T) {
func TestGetRangeValuesChunkType0(t *testing.T) {
testGetRangeValues(t, 0)
}
func TestGetRangeValuesChunkType1(t *testing.T) {
testGetRangeValues(t, 1)
}
func testEvictAndPurgeSeries(t *testing.T, chunkType byte) {
samples := make(clientmodel.Samples, 1000)
for i := range samples {
samples[i] = &clientmodel.Sample{
Timestamp: clientmodel.Timestamp(2 * i),
Value: clientmodel.SampleValue(float64(i) * 0.2),
Value: clientmodel.SampleValue(float64(i * i)),
}
}
s, closer := NewTestStorage(t)
s, closer := NewTestStorage(t, chunkType)
defer closer.Close()
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
@ -474,7 +498,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
if len(actual) != 2 {
t.Fatal("expected two results after purging half of series")
}
if actual[0].Timestamp < 800 || actual[0].Timestamp > 1000 {
if actual[0].Timestamp < 600 || actual[0].Timestamp > 1000 {
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
}
want := clientmodel.Timestamp(1998)
@ -544,7 +568,15 @@ func TestEvictAndPurgeSeries(t *testing.T) {
}
}
func BenchmarkAppend(b *testing.B) {
func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {
testEvictAndPurgeSeries(t, 0)
}
func TestEvictAndPurgeSeriesChunkType1(t *testing.T) {
testEvictAndPurgeSeries(t, 1)
}
func benchmarkAppend(b *testing.B, chunkType byte) {
samples := make(clientmodel.Samples, b.N)
for i := range samples {
samples[i] = &clientmodel.Sample{
@ -558,28 +590,34 @@ func BenchmarkAppend(b *testing.B) {
}
}
b.ResetTimer()
s, closer := NewTestStorage(b)
s, closer := NewTestStorage(b, chunkType)
defer closer.Close()
s.AppendSamples(samples)
}
func BenchmarkAppendType0(b *testing.B) {
benchmarkAppend(b, 0)
}
func BenchmarkAppendType1(b *testing.B) {
benchmarkAppend(b, 1)
}
// Append a large number of random samples and then check if we can get them out
// of the storage alright.
func TestFuzz(t *testing.T) {
func testFuzz(t *testing.T, chunkType byte) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
check := func(seed int64) bool {
rand.Seed(seed)
s, c := NewTestStorage(t)
s, c := NewTestStorage(t, chunkType)
defer c.Close()
samples := createRandomSamples()
samples := createRandomSamples("test_fuzz", 1000)
s.AppendSamples(samples)
s.WaitForIndexing()
return verifyStorage(t, s, samples, 24*7*time.Hour)
}
@ -588,21 +626,27 @@ func TestFuzz(t *testing.T) {
}
}
// BenchmarkFuzz is the benchmark version of TestFuzz. However, it will run
// several append and verify operations in parallel, if GOMAXPROC is set
// accordingly. Also, the storage options are set such that evictions,
// checkpoints, and purging will happen concurrently, too. This benchmark will
// have a very long runtime (up to minutes). You can use it as an actual
// benchmark. Run it like this:
func TestFuzzChunkType0(t *testing.T) {
testFuzz(t, 0)
}
func TestFuzzChunkType1(t *testing.T) {
testFuzz(t, 1)
}
// benchmarkFuzz is the benchmark version of testFuzz. The storage options are
// set such that evictions, checkpoints, and purging will happen concurrently,
// too. This benchmark will have a very long runtime (up to minutes). You can
// use it as an actual benchmark. Run it like this:
//
// go test -cpu 1,2,4,8 -short -bench BenchmarkFuzz -benchmem
// go test -cpu 1,2,4,8 -test=NONE -bench BenchmarkFuzzChunkType -benchmem
//
// You can also use it as a test for races. In that case, run it like this (will
// make things even slower):
//
// go test -race -cpu 8 -short -bench BenchmarkFuzz
func BenchmarkFuzz(b *testing.B) {
b.StopTimer()
// go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, chunkType byte) {
const samplesPerRun = 100000
rand.Seed(42)
directory := test.NewTemporaryDirectory("test_storage", b)
defer directory.Close()
@ -610,7 +654,8 @@ func BenchmarkFuzz(b *testing.B) {
MemoryChunks: 100,
PersistenceRetentionPeriod: time.Hour,
PersistenceStoragePath: directory.Path(),
CheckpointInterval: 3 * time.Second,
CheckpointInterval: time.Second,
ChunkType: chunkType,
}
s, err := NewMemorySeriesStorage(o)
if err != nil {
@ -618,33 +663,40 @@ func BenchmarkFuzz(b *testing.B) {
}
s.Start()
defer s.Stop()
b.StartTimer()
b.RunParallel(func(pb *testing.PB) {
var allSamples clientmodel.Samples
for pb.Next() {
newSamples := createRandomSamples()
allSamples = append(allSamples, newSamples[:len(newSamples)/2]...)
s.AppendSamples(newSamples[:len(newSamples)/2])
verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
allSamples = append(allSamples, newSamples[len(newSamples)/2:]...)
s.AppendSamples(newSamples[len(newSamples)/2:])
verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
}
})
samples := createRandomSamples("benchmark_fuzz", samplesPerRun*b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
start := samplesPerRun * i
end := samplesPerRun * (i + 1)
middle := (start + end) / 2
s.AppendSamples(samples[start:middle])
verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod)
s.AppendSamples(samples[middle:end])
verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod)
}
}
func createRandomSamples() clientmodel.Samples {
func BenchmarkFuzzChunkType0(b *testing.B) {
benchmarkFuzz(b, 0)
}
func BenchmarkFuzzChunkType1(b *testing.B) {
benchmarkFuzz(b, 1)
}
func createRandomSamples(metricName string, minLen int) clientmodel.Samples {
type valueCreator func() clientmodel.SampleValue
type deltaApplier func(clientmodel.SampleValue) clientmodel.SampleValue
var (
maxMetrics = 5
maxCycles = 500
maxStreakLength = 500
maxTimeDelta = 1000
maxTimeDelta = 10000
maxTimeDeltaFactor = 10
timestamp = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*maxCycles*maxStreakLength/16) // So that some timestamps are in the future.
timestamp = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*minLen/4) // So that some timestamps are in the future.
generators = []struct {
createValue valueCreator
applyDelta []deltaApplier
@ -696,11 +748,12 @@ func createRandomSamples() clientmodel.Samples {
metrics := []clientmodel.Metric{}
for n := rand.Intn(maxMetrics); n >= 0; n-- {
metrics = append(metrics, clientmodel.Metric{
clientmodel.MetricNameLabel: clientmodel.LabelValue(metricName),
clientmodel.LabelName(fmt.Sprintf("labelname_%d", n+1)): clientmodel.LabelValue(fmt.Sprintf("labelvalue_%d", rand.Int())),
})
}
for n := rand.Intn(maxCycles); n >= 0; n-- {
for len(result) < minLen {
// Pick a metric for this cycle.
metric := metrics[rand.Intn(len(metrics))]
timeDelta := rand.Intn(maxTimeDelta) + 1
@ -753,6 +806,7 @@ func createRandomSamples() clientmodel.Samples {
}
func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool {
s.WaitForIndexing()
result := true
for _, i := range rand.Perm(len(samples)) {
sample := samples[i]
@ -772,8 +826,8 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
p.Close()
continue
}
want := float64(sample.Value)
got := float64(found[0].Value)
want := sample.Value
got := found[0].Value
if want != got || sample.Timestamp != found[0].Timestamp {
t.Errorf(
"Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",

View file

@ -37,13 +37,14 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t test.T) (Storage, test.Closer) {
func NewTestStorage(t test.T, chunkType byte) (Storage, test.Closer) {
directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Hour,
ChunkType: chunkType,
}
storage, err := NewMemorySeriesStorage(o)
if err != nil {

View file

@ -152,7 +152,7 @@ func TestTemplateExpansion(t *testing.T) {
time := clientmodel.Timestamp(0)
storage, closer := local.NewTestStorage(t)
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()
storage.AppendSamples(clientmodel.Samples{
{