Merge pull request #593 from prometheus/beorn7/chunk-encoding

Improve various things around chunk encoding.
This commit is contained in:
Björn Rabenstein 2015-03-16 14:53:58 +01:00
commit 36531b5278
11 changed files with 360 additions and 365 deletions

View file

@ -54,7 +54,6 @@ var (
samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
chunkType = flag.Int("storage.local.chunk-type", 1, "Which chunk encoding version to use. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).")
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
@ -123,7 +122,6 @@ func NewPrometheus() *prometheus {
PersistenceQueueCapacity: *persistenceQueueCapacity, PersistenceQueueCapacity: *persistenceQueueCapacity,
CheckpointInterval: *checkpointInterval, CheckpointInterval: *checkpointInterval,
CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit, CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
ChunkType: byte(*chunkType),
Dirty: *storageDirty, Dirty: *storageDirty,
} }
memStorage, err := local.NewMemorySeriesStorage(o) memStorage, err := local.NewMemorySeriesStorage(o)

View file

@ -15,6 +15,7 @@ package local
import ( import (
"container/list" "container/list"
"flag"
"fmt" "fmt"
"io" "io"
"sync" "sync"
@ -25,6 +26,17 @@ import (
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
var (
defaultChunkEncoding = flag.Int("storage.local.chunk-encoding-version", 1, "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).")
)
type chunkEncoding byte
const (
delta chunkEncoding = iota
doubleDelta
)
// chunkDesc contains meta-data for a chunk. Many of its methods are // chunkDesc contains meta-data for a chunk. Many of its methods are
// goroutine-safe proxies for chunk methods. // goroutine-safe proxies for chunk methods.
type chunkDesc struct { type chunkDesc struct {
@ -173,13 +185,14 @@ type chunk interface {
// any. The first chunk returned might be the same as the original one // any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as // or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the orginal chunk. // the relevant one and discard the orginal chunk.
add(*metric.SamplePair) []chunk add(sample *metric.SamplePair) []chunk
clone() chunk clone() chunk
firstTime() clientmodel.Timestamp firstTime() clientmodel.Timestamp
lastTime() clientmodel.Timestamp lastTime() clientmodel.Timestamp
newIterator() chunkIterator newIterator() chunkIterator
marshal(io.Writer) error marshal(io.Writer) error
unmarshal(io.Reader) error unmarshal(io.Reader) error
encoding() chunkEncoding
// values returns a channel, from which all sample values in the chunk // values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last // can be received in order. The channel is closed after the last
// one. It is generally not safe to mutate the chunk while the channel // one. It is generally not safe to mutate the chunk while the channel
@ -215,29 +228,22 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
head = newChunks[len(newChunks)-1] head = newChunks[len(newChunks)-1]
} }
newChunks := head.add(s) newChunks := head.add(s)
body = append(body, newChunks[:len(newChunks)-1]...) return append(body, newChunks...)
head = newChunks[len(newChunks)-1]
return append(body, head)
} }
func chunkType(c chunk) byte { // newChunk creates a new chunk according to the encoding set by the
switch c.(type) { // defaultChunkEncoding flag.
case *deltaEncodedChunk: func newChunk() chunk {
return 0 return newChunkForEncoding(chunkEncoding(*defaultChunkEncoding))
case *doubleDeltaEncodedChunk:
return 1
default:
panic(fmt.Errorf("unknown chunk type: %T", c))
}
} }
func chunkForType(chunkType byte) chunk { func newChunkForEncoding(encoding chunkEncoding) chunk {
switch chunkType { switch encoding {
case 0: case delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen) return newDeltaEncodedChunk(d1, d0, true, chunkLen)
case 1: case doubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen)
default: default:
panic(fmt.Errorf("unknown chunk type: %d", chunkType)) panic(fmt.Errorf("unknown chunk encoding: %v", encoding))
} }
} }

View file

@ -341,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes(
if err := kv.Value(&m); err != nil { if err := kv.Value(&m); err != nil {
return err return err
} }
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest, p.chunkType) series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
if err != nil { if err != nil {
return err return err

View file

@ -75,37 +75,6 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
return &c return &c
} }
func (c deltaEncodedChunk) newFollowupChunk() chunk {
return newDeltaEncodedChunk(d1, d0, true, cap(c))
}
// clone implements chunk.
func (c deltaEncodedChunk) clone() chunk {
clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
}
func (c deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[deltaHeaderValueBytesOffset])
}
func (c deltaEncodedChunk) isInt() bool {
return c[deltaHeaderIsIntOffset] == 1
}
func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
}
func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
}
// add implements chunk. // add implements chunk.
func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if c.len() == 0 { if c.len() == 0 {
@ -120,7 +89,7 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not, // Do we generally have space for another sample in this chunk? If not,
// overflow into a new one. // overflow into a new one.
if remainingBytes < sampleSize { if remainingBytes < sampleSize {
overflowChunks := c.newFollowupChunk().add(s) overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]} return []chunk{&c, overflowChunks[0]}
} }
@ -131,33 +100,38 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
dv := s.Value - baseValue dv := s.Value - baseValue
tb := c.timeBytes() tb := c.timeBytes()
vb := c.valueBytes() vb := c.valueBytes()
isInt := c.isInt()
// If the new sample is incompatible with the current encoding, reencode the // If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s). // existing chunk data into new chunk(s).
//
// int->float.
if c.isInt() && !isInt64(dv) {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s)
}
// float32->float64.
if !c.isInt() && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value {
return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s)
}
var ntb, nvb deltaBytes ntb, nvb, nInt := tb, vb, isInt
if isInt && !isInt64(dv) {
// int->float.
nvb = d4
nInt = false
} else if !isInt && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value {
// float32->float64.
nvb = d8
} else {
if tb < d8 { if tb < d8 {
// Maybe more bytes for timestamp. // Maybe more bytes for timestamp.
ntb = bytesNeededForUnsignedTimestampDelta(dt) ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt))
} }
if c.isInt() && vb < d8 { if c.isInt() && vb < d8 {
// Maybe more bytes for sample value. // Maybe more bytes for sample value.
nvb = bytesNeededForIntegerSampleValueDelta(dv) nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv))
} }
if ntb > tb || nvb > vb {
ntb = max(ntb, tb)
nvb = max(nvb, vb)
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s)
} }
if tb != ntb || vb != nvb || isInt != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
offset := len(c) offset := len(c)
c = c[:offset+sampleSize] c = c[:offset+sampleSize]
@ -205,15 +179,60 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
return []chunk{&c} return []chunk{&c}
} }
func (c deltaEncodedChunk) sampleSize() int { // clone implements chunk.
return int(c.timeBytes() + c.valueBytes()) func (c deltaEncodedChunk) clone() chunk {
clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
} }
func (c deltaEncodedChunk) len() int { // firstTime implements chunk.
if len(c) < deltaHeaderBytes { func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
return 0 return c.valueAtIndex(0).Timestamp
} }
return (len(c) - deltaHeaderBytes) / c.sampleSize()
// lastTime implements chunk.
func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{
chunk: c,
}
}
// marshal implements chunk.
func (c deltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
return nil
} }
// values implements chunk. // values implements chunk.
@ -229,6 +248,40 @@ func (c deltaEncodedChunk) values() <-chan *metric.SamplePair {
return valuesChan return valuesChan
} }
// encoding implements chunk.
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
}
func (c deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[deltaHeaderValueBytesOffset])
}
func (c deltaEncodedChunk) isInt() bool {
return c[deltaHeaderIsIntOffset] == 1
}
func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
}
func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
}
func (c deltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
func (c deltaEncodedChunk) len() int {
if len(c) < deltaHeaderBytes {
return 0
}
return (len(c) - deltaHeaderBytes) / c.sampleSize()
}
func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
offset := deltaHeaderBytes + idx*c.sampleSize() offset := deltaHeaderBytes + idx*c.sampleSize()
@ -281,61 +334,12 @@ func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
} }
} }
// firstTime implements chunk.
func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.valueAtIndex(0).Timestamp
}
// lastTime implements chunk.
func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// marshal implements chunk.
func (c deltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
return nil
}
// deltaEncodedChunkIterator implements chunkIterator. // deltaEncodedChunkIterator implements chunkIterator.
type deltaEncodedChunkIterator struct { type deltaEncodedChunkIterator struct {
chunk *deltaEncodedChunk chunk *deltaEncodedChunk
// TODO: add more fields here to keep track of last position. // TODO: add more fields here to keep track of last position.
} }
// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{
chunk: c,
}
}
// getValueAtTime implements chunkIterator. // getValueAtTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
i := sort.Search(it.chunk.len(), func(i int) bool { i := sort.Search(it.chunk.len(), func(i int) bool {

View file

@ -82,10 +82,183 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
return &c return &c
} }
func (c doubleDeltaEncodedChunk) newFollowupChunk() chunk { // add implements chunk.
return newDoubleDeltaEncodedChunk(d1, d0, true, cap(c)) func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if c.len() == 0 {
return c.addFirstSample(s)
} }
tb := c.timeBytes()
vb := c.valueBytes()
if c.len() == 1 {
return c.addSecondSample(s, tb, vb)
}
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta()
ddt := s.Timestamp - projectedTime
projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta()
ddv := s.Value - projectedValue
ntb, nvb, nInt := tb, vb, c.isInt()
// If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s).
if c.isInt() && !isInt64(ddv) {
// int->float.
nvb = d4
nInt = false
} else if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value {
// float32->float64.
nvb = d8
} else {
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt))
}
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv))
}
}
if tb != ntb || vb != nvb || c.isInt() != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(ddt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
}
offset += int(tb)
if c.isInt() {
switch vb {
case d0:
// No-op. Constant delta is stored as base value.
case d1:
c[offset] = byte(ddv)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddv))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddv))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
}
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
}
}
return []chunk{&c}
}
// clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
// firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.baseTime()
}
// lastTime implements chunk.
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
return &doubleDeltaEncodedChunkIterator{
chunk: c,
}
}
// marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
return nil
}
// values implements chunk.
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
go func() {
for i := 0; i < n; i++ {
valuesChan <- c.valueAtIndex(i)
}
close(valuesChan)
}()
return valuesChan
}
// encoding implements chunk.
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp { func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp( return clientmodel.Timestamp(
binary.LittleEndian.Uint64( binary.LittleEndian.Uint64(
@ -148,109 +321,6 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
return c[doubleDeltaHeaderIsIntOffset] == 1 return c[doubleDeltaHeaderIsIntOffset] == 1
} }
// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if c.len() == 0 {
return c.addFirstSample(s)
}
tb := c.timeBytes()
vb := c.valueBytes()
if c.len() == 1 {
return c.addSecondSample(s, tb, vb)
}
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := c.newFollowupChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta()
ddt := s.Timestamp - projectedTime
projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta()
ddv := s.Value - projectedValue
// If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s).
//
// int->float.
if c.isInt() && !isInt64(ddv) {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s)
}
// float32->float64.
if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s)
}
var ntb, nvb deltaBytes
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = bytesNeededForSignedTimestampDelta(ddt)
}
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = bytesNeededForIntegerSampleValueDelta(ddv)
}
if ntb > tb || nvb > vb {
ntb = max(ntb, tb)
nvb = max(nvb, vb)
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s)
}
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(ddt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
}
offset += int(tb)
if c.isInt() {
switch vb {
case d0:
// No-op. Constant delta is stored as base value.
case d1:
c[offset] = byte(ddv)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddv))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddv))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
}
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
}
}
return []chunk{&c}
}
// addFirstSample is a helper method only used by c.add(). It adds timestamp and // addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value. // value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk {
@ -315,26 +385,6 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de
return []chunk{&c} return []chunk{&c}
} }
// clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
// values implements chunk.
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
go func() {
for i := 0; i < n; i++ {
valuesChan <- c.valueAtIndex(i)
}
close(valuesChan)
}()
return valuesChan
}
func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
if idx == 0 { if idx == 0 {
return &metric.SamplePair{ return &metric.SamplePair{
@ -424,61 +474,12 @@ func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
} }
} }
// firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.baseTime()
}
// lastTime implements chunk.
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
readBytes := 0
for readBytes < len(*c) {
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
return nil
}
// doubleDeltaEncodedChunkIterator implements chunkIterator. // doubleDeltaEncodedChunkIterator implements chunkIterator.
type doubleDeltaEncodedChunkIterator struct { type doubleDeltaEncodedChunkIterator struct {
chunk *doubleDeltaEncodedChunk chunk *doubleDeltaEncodedChunk
// TODO(beorn7): add more fields here to keep track of last position. // TODO(beorn7): add more fields here to keep track of last position.
} }
// newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
return &doubleDeltaEncodedChunkIterator{
chunk: c,
}
}
// getValueAtTime implements chunkIterator. // getValueAtTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
// TODO(beorn7): Implement in a more efficient way making use of the // TODO(beorn7): Implement in a more efficient way making use of the

View file

@ -96,7 +96,6 @@ type indexingOp struct {
// each other if each call refers to a different fingerprint. // each other if each call refers to a different fingerprint.
type persistence struct { type persistence struct {
basePath string basePath string
chunkType byte
archivedFingerprintToMetrics *index.FingerprintMetricIndex archivedFingerprintToMetrics *index.FingerprintMetricIndex
archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
@ -121,7 +120,7 @@ type persistence struct {
} }
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, error) { func newPersistence(basePath string, dirty bool) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName) dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName) versionPath := filepath.Join(basePath, versionFileName)
@ -179,7 +178,6 @@ func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence,
p := &persistence{ p := &persistence{
basePath: basePath, basePath: basePath,
chunkType: chunkType,
archivedFingerprintToMetrics: archivedFingerprintToMetrics, archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
@ -396,7 +394,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
if err != nil { if err != nil {
return nil, err return nil, err
} }
chunk := chunkForType(typeBuf[0]) chunk := newChunkForEncoding(chunkEncoding(typeBuf[0]))
chunk.unmarshal(f) chunk.unmarshal(f)
chunks = append(chunks, chunk) chunks = append(chunks, chunk)
} }
@ -590,7 +588,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
} else { } else {
// This is the non-persisted head chunk. Fully marshal it. // This is the non-persisted head chunk. Fully marshal it.
if err = w.WriteByte(chunkType(chunkDesc.chunk)); err != nil { if err = w.WriteByte(byte(chunkDesc.chunk.encoding())); err != nil {
return return
} }
if err = chunkDesc.chunk.marshal(w); err != nil { if err = chunkDesc.chunk.marshal(w); err != nil {
@ -742,13 +740,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
} }
} else { } else {
// Non-persisted head chunk. // Non-persisted head chunk.
chunkType, err := r.ReadByte() encoding, err := r.ReadByte()
if err != nil { if err != nil {
glog.Warning("Could not decode chunk type:", err) glog.Warning("Could not decode chunk type:", err)
p.dirty = true p.dirty = true
return sm, nil return sm, nil
} }
chunk := chunkForType(chunkType) chunk := newChunkForEncoding(chunkEncoding(encoding))
if err := chunk.unmarshal(r); err != nil { if err := chunk.unmarshal(r); err != nil {
glog.Warning("Could not decode chunk type:", err) glog.Warning("Could not decode chunk type:", err)
p.dirty = true p.dirty = true
@ -771,7 +769,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
chunkDescsOffset: int(chunkDescsOffset), chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: clientmodel.Timestamp(savedFirstTime), savedFirstTime: clientmodel.Timestamp(savedFirstTime),
headChunkPersisted: headChunkPersisted, headChunkPersisted: headChunkPersisted,
chunkType: p.chunkType,
} }
} }
return sm, nil return sm, nil
@ -1096,7 +1093,7 @@ func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.F
func writeChunkHeader(w io.Writer, c chunk) error { func writeChunkHeader(w io.Writer, c chunk) error {
header := make([]byte, chunkHeaderLen) header := make([]byte, chunkHeaderLen)
header[chunkHeaderTypeOffset] = chunkType(c) header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()))
binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime()))
_, err := w.Write(header) _, err := w.Write(header)

View file

@ -31,9 +31,10 @@ var (
m3 = clientmodel.Metric{"label": "value3"} m3 = clientmodel.Metric{"label": "value3"}
) )
func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) { func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
*defaultChunkEncoding = int(encoding)
dir := test.NewTemporaryDirectory("test_persistence", t) dir := test.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), chunkType, false) p, err := newPersistence(dir.Path(), false)
if err != nil { if err != nil {
dir.Close() dir.Close()
t.Fatal(err) t.Fatal(err)
@ -44,7 +45,7 @@ func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer
}) })
} }
func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk {
fps := clientmodel.Fingerprints{ fps := clientmodel.Fingerprints{
m1.Fingerprint(), m1.Fingerprint(),
m2.Fingerprint(), m2.Fingerprint(),
@ -55,7 +56,7 @@ func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk {
for _, fp := range fps { for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10) fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{ fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i), Timestamp: clientmodel.Timestamp(i),
Value: clientmodel.SampleValue(fp), Value: clientmodel.SampleValue(fp),
})[0]) })[0])
@ -75,11 +76,11 @@ func chunksEqual(c1, c2 chunk) bool {
return true return true
} }
func testPersistLoadDropChunks(t *testing.T, chunkType byte) { func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, chunkType) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
fpToChunks := buildTestChunks(chunkType) fpToChunks := buildTestChunks(encoding)
for fp, chunks := range fpToChunks { for fp, chunks := range fpToChunks {
for i, c := range chunks { for i, c := range chunks {
@ -191,15 +192,15 @@ func TestPersistLoadDropChunksType1(t *testing.T) {
testPersistLoadDropChunks(t, 1) testPersistLoadDropChunks(t, 1)
} }
func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) { func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, chunkType) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
fpLocker := newFingerprintLocker(10) fpLocker := newFingerprintLocker(10)
sm := newSeriesMap() sm := newSeriesMap()
s1 := newMemorySeries(m1, true, 0, chunkType) s1 := newMemorySeries(m1, true, 0)
s2 := newMemorySeries(m2, false, 0, chunkType) s2 := newMemorySeries(m2, false, 0)
s3 := newMemorySeries(m3, false, 0, chunkType) s3 := newMemorySeries(m3, false, 0)
s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14}) s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkPersisted = true s3.headChunkPersisted = true
@ -260,8 +261,8 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1) testCheckpointAndLoadSeriesMapAndHeads(t, 1)
} }
func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) { func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, chunkType) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"} m1 := clientmodel.Metric{"n1": "v1"}
@ -338,8 +339,8 @@ func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) {
testGetFingerprintsModifiedBefore(t, 1) testGetFingerprintsModifiedBefore(t, 1)
} }
func testDropArchivedMetric(t *testing.T, chunkType byte) { func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, chunkType) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
m1 := clientmodel.Metric{"n1": "v1"} m1 := clientmodel.Metric{"n1": "v1"}
@ -420,7 +421,7 @@ type incrementalBatch struct {
expectedLpToFps index.LabelPairFingerprintsMapping expectedLpToFps index.LabelPairFingerprintsMapping
} }
func testIndexing(t *testing.T, chunkType byte) { func testIndexing(t *testing.T, encoding chunkEncoding) {
batches := []incrementalBatch{ batches := []incrementalBatch{
{ {
fpToMetric: index.FingerprintMetricMapping{ fpToMetric: index.FingerprintMetricMapping{
@ -556,7 +557,7 @@ func testIndexing(t *testing.T, chunkType byte) {
}, },
} }
p, closer := newTestPersistence(t, chunkType) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
indexedFpsToMetrics := index.FingerprintMetricMapping{} indexedFpsToMetrics := index.FingerprintMetricMapping{}

View file

@ -158,8 +158,6 @@ type memorySeries struct {
// a non-persisted head chunk has to be cloned before more samples are // a non-persisted head chunk has to be cloned before more samples are
// appended. // appended.
headChunkUsedByIterator bool headChunkUsedByIterator bool
// Which type of chunk to create if a new chunk is needed.
chunkType byte
} }
// newMemorySeries returns a pointer to a newly allocated memorySeries for the // newMemorySeries returns a pointer to a newly allocated memorySeries for the
@ -167,13 +165,11 @@ type memorySeries struct {
// or (if false) a series for a metric being unarchived, i.e. a series that // or (if false) a series for a metric being unarchived, i.e. a series that
// existed before but has been evicted from memory. If reallyNew is false, // existed before but has been evicted from memory. If reallyNew is false,
// firstTime is ignored (and set to the lowest possible timestamp instead - it // firstTime is ignored (and set to the lowest possible timestamp instead - it
// will be set properly upon the first eviction of chunkDescs). chunkType is the // will be set properly upon the first eviction of chunkDescs).
// type of chunks newly created by this memorySeries.
func newMemorySeries( func newMemorySeries(
m clientmodel.Metric, m clientmodel.Metric,
reallyNew bool, reallyNew bool,
firstTime clientmodel.Timestamp, firstTime clientmodel.Timestamp,
chunkType byte,
) *memorySeries { ) *memorySeries {
if reallyNew { if reallyNew {
firstTime = clientmodel.Earliest firstTime = clientmodel.Earliest
@ -182,7 +178,6 @@ func newMemorySeries(
metric: m, metric: m,
headChunkPersisted: !reallyNew, headChunkPersisted: !reallyNew,
savedFirstTime: firstTime, savedFirstTime: firstTime,
chunkType: chunkType,
} }
if !reallyNew { if !reallyNew {
s.chunkDescsOffset = -1 s.chunkDescsOffset = -1
@ -195,7 +190,7 @@ func newMemorySeries(
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc { func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
if len(s.chunkDescs) == 0 || s.headChunkPersisted { if len(s.chunkDescs) == 0 || s.headChunkPersisted {
newHead := newChunkDesc(chunkForType(s.chunkType)) newHead := newChunkDesc(newChunk())
s.chunkDescs = append(s.chunkDescs, newHead) s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkPersisted = false s.headChunkPersisted = false
} else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 {

View file

@ -16,7 +16,6 @@ package local
import ( import (
"container/list" "container/list"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -72,7 +71,6 @@ type memorySeriesStorage struct {
dropAfter time.Duration dropAfter time.Duration
checkpointInterval time.Duration checkpointInterval time.Duration
checkpointDirtySeriesLimit int checkpointDirtySeriesLimit int
chunkType byte
appendQueue chan *clientmodel.Sample appendQueue chan *clientmodel.Sample
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
@ -110,17 +108,13 @@ type MemorySeriesStorageOptions struct {
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted. PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
ChunkType byte // Chunk type for newly created chunks.
Dirty bool // Force the storage to consider itself dirty on startup. Dirty bool // Force the storage to consider itself dirty on startup.
} }
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage. // has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
if o.ChunkType > 1 { p, err := newPersistence(o.PersistenceStoragePath, o.Dirty)
return nil, fmt.Errorf("unsupported chunk type %d", o.ChunkType)
}
p, err := newPersistence(o.PersistenceStoragePath, o.ChunkType, o.Dirty)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -148,7 +142,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
dropAfter: o.PersistenceRetentionPeriod, dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval, checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
chunkType: o.ChunkType,
appendLastTimestamp: clientmodel.Earliest, appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap), appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
@ -458,7 +451,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
s.persistence.indexMetric(fp, m) s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc() s.seriesOps.WithLabelValues(create).Inc()
} }
series = newMemorySeries(m, !unarchived, firstTime, s.chunkType) series = newMemorySeries(m, !unarchived, firstTime)
s.fpToSeries.put(fp, series) s.fpToSeries.put(fp, series)
s.numSeries.Inc() s.numSeries.Inc()
} }

View file

@ -181,7 +181,7 @@ func TestLoop(t *testing.T) {
} }
} }
func testChunk(t *testing.T, chunkType byte) { func testChunk(t *testing.T, encoding chunkEncoding) {
samples := make(clientmodel.Samples, 500000) samples := make(clientmodel.Samples, 500000)
for i := range samples { for i := range samples {
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
@ -189,7 +189,7 @@ func testChunk(t *testing.T, chunkType byte) {
Value: clientmodel.SampleValue(float64(i) * 0.2), Value: clientmodel.SampleValue(float64(i) * 0.2),
} }
} }
s, closer := NewTestStorage(t, chunkType) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) s.AppendSamples(samples)
@ -229,7 +229,7 @@ func TestChunkType1(t *testing.T) {
testChunk(t, 1) testChunk(t, 1)
} }
func testGetValueAtTime(t *testing.T, chunkType byte) { func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
samples := make(clientmodel.Samples, 1000) samples := make(clientmodel.Samples, 1000)
for i := range samples { for i := range samples {
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
@ -237,7 +237,7 @@ func testGetValueAtTime(t *testing.T, chunkType byte) {
Value: clientmodel.SampleValue(float64(i) * 0.2), Value: clientmodel.SampleValue(float64(i) * 0.2),
} }
} }
s, closer := NewTestStorage(t, chunkType) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) s.AppendSamples(samples)
@ -320,7 +320,7 @@ func TestGetValueAtTimeChunkType1(t *testing.T) {
testGetValueAtTime(t, 1) testGetValueAtTime(t, 1)
} }
func testGetRangeValues(t *testing.T, chunkType byte) { func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
samples := make(clientmodel.Samples, 1000) samples := make(clientmodel.Samples, 1000)
for i := range samples { for i := range samples {
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
@ -328,7 +328,7 @@ func testGetRangeValues(t *testing.T, chunkType byte) {
Value: clientmodel.SampleValue(float64(i) * 0.2), Value: clientmodel.SampleValue(float64(i) * 0.2),
} }
} }
s, closer := NewTestStorage(t, chunkType) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) s.AppendSamples(samples)
@ -470,7 +470,7 @@ func TestGetRangeValuesChunkType1(t *testing.T) {
testGetRangeValues(t, 1) testGetRangeValues(t, 1)
} }
func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
samples := make(clientmodel.Samples, 1000) samples := make(clientmodel.Samples, 1000)
for i := range samples { for i := range samples {
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
@ -478,7 +478,7 @@ func testEvictAndPurgeSeries(t *testing.T, chunkType byte) {
Value: clientmodel.SampleValue(float64(i * i)), Value: clientmodel.SampleValue(float64(i * i)),
} }
} }
s, closer := NewTestStorage(t, chunkType) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
@ -576,7 +576,7 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) {
testEvictAndPurgeSeries(t, 1) testEvictAndPurgeSeries(t, 1)
} }
func benchmarkAppend(b *testing.B, chunkType byte) { func benchmarkAppend(b *testing.B, encoding chunkEncoding) {
samples := make(clientmodel.Samples, b.N) samples := make(clientmodel.Samples, b.N)
for i := range samples { for i := range samples {
samples[i] = &clientmodel.Sample{ samples[i] = &clientmodel.Sample{
@ -590,7 +590,7 @@ func benchmarkAppend(b *testing.B, chunkType byte) {
} }
} }
b.ResetTimer() b.ResetTimer()
s, closer := NewTestStorage(b, chunkType) s, closer := NewTestStorage(b, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) s.AppendSamples(samples)
@ -606,14 +606,14 @@ func BenchmarkAppendType1(b *testing.B) {
// Append a large number of random samples and then check if we can get them out // Append a large number of random samples and then check if we can get them out
// of the storage alright. // of the storage alright.
func testFuzz(t *testing.T, chunkType byte) { func testFuzz(t *testing.T, encoding chunkEncoding) {
if testing.Short() { if testing.Short() {
t.Skip("Skipping test in short mode.") t.Skip("Skipping test in short mode.")
} }
check := func(seed int64) bool { check := func(seed int64) bool {
rand.Seed(seed) rand.Seed(seed)
s, c := NewTestStorage(t, chunkType) s, c := NewTestStorage(t, encoding)
defer c.Close() defer c.Close()
samples := createRandomSamples("test_fuzz", 1000) samples := createRandomSamples("test_fuzz", 1000)
@ -645,7 +645,8 @@ func TestFuzzChunkType1(t *testing.T) {
// make things even slower): // make things even slower):
// //
// go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType // go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType
func benchmarkFuzz(b *testing.B, chunkType byte) { func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
*defaultChunkEncoding = int(encoding)
const samplesPerRun = 100000 const samplesPerRun = 100000
rand.Seed(42) rand.Seed(42)
directory := test.NewTemporaryDirectory("test_storage", b) directory := test.NewTemporaryDirectory("test_storage", b)
@ -655,7 +656,6 @@ func benchmarkFuzz(b *testing.B, chunkType byte) {
PersistenceRetentionPeriod: time.Hour, PersistenceRetentionPeriod: time.Hour,
PersistenceStoragePath: directory.Path(), PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Second, CheckpointInterval: time.Second,
ChunkType: chunkType,
} }
s, err := NewMemorySeriesStorage(o) s, err := NewMemorySeriesStorage(o)
if err != nil { if err != nil {

View file

@ -37,14 +37,14 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary // NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the // directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up. // returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t test.T, chunkType byte) (Storage, test.Closer) { func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
*defaultChunkEncoding = int(encoding)
directory := test.NewTemporaryDirectory("test_storage", t) directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000, MemoryChunks: 1000000,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(), PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Hour, CheckpointInterval: time.Hour,
ChunkType: chunkType,
} }
storage, err := NewMemorySeriesStorage(o) storage, err := NewMemorySeriesStorage(o)
if err != nil { if err != nil {