From 5bea942d8e2cc4e8730510332980275cd983a21a Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 13 Mar 2015 15:49:07 +0100 Subject: [PATCH] Improve various things around chunk encoding. A number of mostly minor things: - Rename chunk type -> chunk encoding. - After all, do not carry around the chunk encoding to all parts of the system, but just have one place where the encoding for new chunks is set based on the flag. The new approach has caveats as well, but the polution of so many method signatures is worse. - Use the default chunk encoding for new chunks of existing series. (Previously, only new _series_ would get chunks with the default encoding.) - Use an enum for chunk encoding. (But keep the version number for the flag, for reasons discussed previously.) - Add encoding() to the chunk interface (so that a chunk knows its own encoding - no need to have that in a different top-level function). - Got rid of newFollowUpChunk (which would keep the existing encoding for all chunks of a time series). Now only use newChunk(), which will create a chunk encoding according to the flag. - Simplified transcodeAndAdd. - Reordered methods of deltaEncodedChunk and doubleDeltaEncoded chunk to match the order in the chunk interface. - Only transcode if the chunk is not yet half full. If more than half full, add a new chunk instead. --- main.go | 4 +- storage/local/chunk.go | 42 ++-- storage/local/crashrecovery.go | 2 +- storage/local/delta.go | 220 ++++++++++--------- storage/local/doubledelta.go | 349 +++++++++++++++--------------- storage/local/persistence.go | 19 +- storage/local/persistence_test.go | 37 ++-- storage/local/series.go | 9 +- storage/local/storage.go | 11 +- storage/local/storage_test.go | 28 +-- storage/local/test_helpers.go | 4 +- 11 files changed, 360 insertions(+), 365 deletions(-) diff --git a/main.go b/main.go index 1efeedbd2..fd3c533cd 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,6 @@ var ( samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") - chunkType = flag.Int("storage.local.chunk-type", 1, "Which chunk encoding version to use. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") @@ -123,8 +122,7 @@ func NewPrometheus() *prometheus { PersistenceQueueCapacity: *persistenceQueueCapacity, CheckpointInterval: *checkpointInterval, CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit, - ChunkType: byte(*chunkType), - Dirty: *storageDirty, + Dirty: *storageDirty, } memStorage, err := local.NewMemorySeriesStorage(o) if err != nil { diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 4dbce0523..58110c350 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,6 +15,7 @@ package local import ( "container/list" + "flag" "fmt" "io" "sync" @@ -25,6 +26,17 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +var ( + defaultChunkEncoding = flag.Int("storage.local.chunk-encoding-version", 1, "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).") +) + +type chunkEncoding byte + +const ( + delta chunkEncoding = iota + doubleDelta +) + // chunkDesc contains meta-data for a chunk. Many of its methods are // goroutine-safe proxies for chunk methods. type chunkDesc struct { @@ -173,13 +185,14 @@ type chunk interface { // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the orginal chunk. - add(*metric.SamplePair) []chunk + add(sample *metric.SamplePair) []chunk clone() chunk firstTime() clientmodel.Timestamp lastTime() clientmodel.Timestamp newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error + encoding() chunkEncoding // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel @@ -215,29 +228,22 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { head = newChunks[len(newChunks)-1] } newChunks := head.add(s) - body = append(body, newChunks[:len(newChunks)-1]...) - head = newChunks[len(newChunks)-1] - return append(body, head) + return append(body, newChunks...) } -func chunkType(c chunk) byte { - switch c.(type) { - case *deltaEncodedChunk: - return 0 - case *doubleDeltaEncodedChunk: - return 1 - default: - panic(fmt.Errorf("unknown chunk type: %T", c)) - } +// newChunk creates a new chunk according to the encoding set by the +// defaultChunkEncoding flag. +func newChunk() chunk { + return newChunkForEncoding(chunkEncoding(*defaultChunkEncoding)) } -func chunkForType(chunkType byte) chunk { - switch chunkType { - case 0: +func newChunkForEncoding(encoding chunkEncoding) chunk { + switch encoding { + case delta: return newDeltaEncodedChunk(d1, d0, true, chunkLen) - case 1: + case doubleDelta: return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) default: - panic(fmt.Errorf("unknown chunk type: %d", chunkType)) + panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index b27b5a599..0dc741e40 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -341,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes( if err := kv.Value(&m); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest, p.chunkType) + series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { return err diff --git a/storage/local/delta.go b/storage/local/delta.go index 17c374f76..0e4816935 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -75,37 +75,6 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod return &c } -func (c deltaEncodedChunk) newFollowupChunk() chunk { - return newDeltaEncodedChunk(d1, d0, true, cap(c)) -} - -// clone implements chunk. -func (c deltaEncodedChunk) clone() chunk { - clone := make(deltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - -func (c deltaEncodedChunk) timeBytes() deltaBytes { - return deltaBytes(c[deltaHeaderTimeBytesOffset]) -} - -func (c deltaEncodedChunk) valueBytes() deltaBytes { - return deltaBytes(c[deltaHeaderValueBytesOffset]) -} - -func (c deltaEncodedChunk) isInt() bool { - return c[deltaHeaderIsIntOffset] == 1 -} - -func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp { - return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) -} - -func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { - return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) -} - // add implements chunk. func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { if c.len() == 0 { @@ -120,7 +89,7 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := c.newFollowupChunk().add(s) + overflowChunks := newChunk().add(s) return []chunk{&c, overflowChunks[0]} } @@ -131,33 +100,38 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { dv := s.Value - baseValue tb := c.timeBytes() vb := c.valueBytes() + isInt := c.isInt() // If the new sample is incompatible with the current encoding, reencode the // existing chunk data into new chunk(s). - // - // int->float. - if c.isInt() && !isInt64(dv) { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) + + ntb, nvb, nInt := tb, vb, isInt + if isInt && !isInt64(dv) { + // int->float. + nvb = d4 + nInt = false + } else if !isInt && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value { + // float32->float64. + nvb = d8 + } else { + if tb < d8 { + // Maybe more bytes for timestamp. + ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt)) + } + if c.isInt() && vb < d8 { + // Maybe more bytes for sample value. + nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv)) + } } - // float32->float64. - if !c.isInt() && vb == d4 && baseValue+clientmodel.SampleValue(float32(dv)) != s.Value { - return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) + if tb != ntb || vb != nvb || isInt != nInt { + if len(c)*2 < cap(c) { + return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + } + // Chunk is already half full. Better create a new one and save the transcoding efforts. + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} } - var ntb, nvb deltaBytes - if tb < d8 { - // Maybe more bytes for timestamp. - ntb = bytesNeededForUnsignedTimestampDelta(dt) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = bytesNeededForIntegerSampleValueDelta(dv) - } - if ntb > tb || nvb > vb { - ntb = max(ntb, tb) - nvb = max(nvb, vb) - return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) - } offset := len(c) c = c[:offset+sampleSize] @@ -205,15 +179,60 @@ func (c deltaEncodedChunk) add(s *metric.SamplePair) []chunk { return []chunk{&c} } -func (c deltaEncodedChunk) sampleSize() int { - return int(c.timeBytes() + c.valueBytes()) +// clone implements chunk. +func (c deltaEncodedChunk) clone() chunk { + clone := make(deltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone } -func (c deltaEncodedChunk) len() int { - if len(c) < deltaHeaderBytes { - return 0 +// firstTime implements chunk. +func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { + return c.valueAtIndex(0).Timestamp +} + +// lastTime implements chunk. +func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { + return c.valueAtIndex(c.len() - 1).Timestamp +} + +// newIterator implements chunk. +func (c *deltaEncodedChunk) newIterator() chunkIterator { + return &deltaEncodedChunkIterator{ + chunk: c, } - return (len(c) - deltaHeaderBytes) / c.sampleSize() +} + +// marshal implements chunk. +func (c deltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { + panic("chunk buffer length would overflow a 16 bit uint.") + } + binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) + + n, err := w.Write(c[:cap(c)]) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { + *c = (*c)[:cap(*c)] + readBytes := 0 + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) + if err != nil { + return err + } + readBytes += n + } + *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + return nil } // values implements chunk. @@ -229,6 +248,40 @@ func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { return valuesChan } +// encoding implements chunk. +func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } + +func (c deltaEncodedChunk) timeBytes() deltaBytes { + return deltaBytes(c[deltaHeaderTimeBytesOffset]) +} + +func (c deltaEncodedChunk) valueBytes() deltaBytes { + return deltaBytes(c[deltaHeaderValueBytesOffset]) +} + +func (c deltaEncodedChunk) isInt() bool { + return c[deltaHeaderIsIntOffset] == 1 +} + +func (c deltaEncodedChunk) baseTime() clientmodel.Timestamp { + return clientmodel.Timestamp(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:])) +} + +func (c deltaEncodedChunk) baseValue() clientmodel.SampleValue { + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:]))) +} + +func (c deltaEncodedChunk) sampleSize() int { + return int(c.timeBytes() + c.valueBytes()) +} + +func (c deltaEncodedChunk) len() int { + if len(c) < deltaHeaderBytes { + return 0 + } + return (len(c) - deltaHeaderBytes) / c.sampleSize() +} + func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { offset := deltaHeaderBytes + idx*c.sampleSize() @@ -281,61 +334,12 @@ func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } } -// firstTime implements chunk. -func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.valueAtIndex(0).Timestamp -} - -// lastTime implements chunk. -func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - -// marshal implements chunk. -func (c deltaEncodedChunk) marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) - } - return nil -} - -// unmarshal implements chunk. -func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n - } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] - return nil -} - // deltaEncodedChunkIterator implements chunkIterator. type deltaEncodedChunkIterator struct { chunk *deltaEncodedChunk // TODO: add more fields here to keep track of last position. } -// newIterator implements chunk. -func (c *deltaEncodedChunk) newIterator() chunkIterator { - return &deltaEncodedChunkIterator{ - chunk: c, - } -} - // getValueAtTime implements chunkIterator. func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { i := sort.Search(it.chunk.len(), func(i int) bool { diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index c5ad79817..a1c9ad224 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -82,10 +82,183 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub return &c } -func (c doubleDeltaEncodedChunk) newFollowupChunk() chunk { - return newDoubleDeltaEncodedChunk(d1, d0, true, cap(c)) +// add implements chunk. +func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { + if c.len() == 0 { + return c.addFirstSample(s) + } + + tb := c.timeBytes() + vb := c.valueBytes() + + if c.len() == 1 { + return c.addSecondSample(s, tb, vb) + } + + remainingBytes := cap(c) - len(c) + sampleSize := c.sampleSize() + + // Do we generally have space for another sample in this chunk? If not, + // overflow into a new one. + if remainingBytes < sampleSize { + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} + } + + projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta() + ddt := s.Timestamp - projectedTime + + projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta() + ddv := s.Value - projectedValue + + ntb, nvb, nInt := tb, vb, c.isInt() + // If the new sample is incompatible with the current encoding, reencode the + // existing chunk data into new chunk(s). + if c.isInt() && !isInt64(ddv) { + // int->float. + nvb = d4 + nInt = false + } else if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value { + // float32->float64. + nvb = d8 + } else { + if tb < d8 { + // Maybe more bytes for timestamp. + ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt)) + } + if c.isInt() && vb < d8 { + // Maybe more bytes for sample value. + nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv)) + } + } + if tb != ntb || vb != nvb || c.isInt() != nInt { + if len(c)*2 < cap(c) { + return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) + } + // Chunk is already half full. Better create a new one and save the transcoding efforts. + overflowChunks := newChunk().add(s) + return []chunk{&c, overflowChunks[0]} + } + + offset := len(c) + c = c[:offset+sampleSize] + + switch tb { + case d1: + c[offset] = byte(ddt) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) + default: + panic("invalid number of bytes for time delta") + } + + offset += int(tb) + + if c.isInt() { + switch vb { + case d0: + // No-op. Constant delta is stored as base value. + case d1: + c[offset] = byte(ddv) + case d2: + binary.LittleEndian.PutUint16(c[offset:], uint16(ddv)) + case d4: + binary.LittleEndian.PutUint32(c[offset:], uint32(ddv)) + // d8 must not happen. Those samples are encoded as float64. + default: + panic("invalid number of bytes for integer delta") + } + } else { + switch vb { + case d4: + binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) + case d8: + // Store the absolute value (no delta) in case of d8. + binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) + default: + panic("invalid number of bytes for floating point delta") + } + } + return []chunk{&c} } +// clone implements chunk. +func (c doubleDeltaEncodedChunk) clone() chunk { + clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) + copy(clone, c) + return &clone +} + +// firstTime implements chunk. +func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { + return c.baseTime() +} + +// lastTime implements chunk. +func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { + return c.valueAtIndex(c.len() - 1).Timestamp +} + +// newIterator implements chunk. +func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { + return &doubleDeltaEncodedChunkIterator{ + chunk: c, + } +} + +// marshal implements chunk. +func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { + if len(c) > math.MaxUint16 { + panic("chunk buffer length would overflow a 16 bit uint.") + } + binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) + + n, err := w.Write(c[:cap(c)]) + if err != nil { + return err + } + if n != cap(c) { + return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) + } + return nil +} + +// unmarshal implements chunk. +func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { + *c = (*c)[:cap(*c)] + readBytes := 0 + for readBytes < len(*c) { + n, err := r.Read((*c)[readBytes:]) + if err != nil { + return err + } + readBytes += n + } + *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + return nil +} + +// values implements chunk. +func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { + n := c.len() + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < n; i++ { + valuesChan <- c.valueAtIndex(i) + } + close(valuesChan) + }() + return valuesChan +} + +// encoding implements chunk. +func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } + func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp { return clientmodel.Timestamp( binary.LittleEndian.Uint64( @@ -148,109 +321,6 @@ func (c doubleDeltaEncodedChunk) isInt() bool { return c[doubleDeltaHeaderIsIntOffset] == 1 } -// add implements chunk. -func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk { - if c.len() == 0 { - return c.addFirstSample(s) - } - - tb := c.timeBytes() - vb := c.valueBytes() - - if c.len() == 1 { - return c.addSecondSample(s, tb, vb) - } - - remainingBytes := cap(c) - len(c) - sampleSize := c.sampleSize() - - // Do we generally have space for another sample in this chunk? If not, - // overflow into a new one. - if remainingBytes < sampleSize { - overflowChunks := c.newFollowupChunk().add(s) - return []chunk{&c, overflowChunks[0]} - } - - projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta() - ddt := s.Timestamp - projectedTime - - projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta() - ddv := s.Value - projectedValue - - // If the new sample is incompatible with the current encoding, reencode the - // existing chunk data into new chunk(s). - // - // int->float. - if c.isInt() && !isInt64(ddv) { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d4, false, cap(c)), &c, s) - } - // float32->float64. - if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value { - return transcodeAndAdd(newDoubleDeltaEncodedChunk(tb, d8, false, cap(c)), &c, s) - } - - var ntb, nvb deltaBytes - if tb < d8 { - // Maybe more bytes for timestamp. - ntb = bytesNeededForSignedTimestampDelta(ddt) - } - if c.isInt() && vb < d8 { - // Maybe more bytes for sample value. - nvb = bytesNeededForIntegerSampleValueDelta(ddv) - } - if ntb > tb || nvb > vb { - ntb = max(ntb, tb) - nvb = max(nvb, vb) - return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, c.isInt(), cap(c)), &c, s) - } - - offset := len(c) - c = c[:offset+sampleSize] - - switch tb { - case d1: - c[offset] = byte(ddt) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddt)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddt)) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) - default: - panic("invalid number of bytes for time delta") - } - - offset += int(tb) - - if c.isInt() { - switch vb { - case d0: - // No-op. Constant delta is stored as base value. - case d1: - c[offset] = byte(ddv) - case d2: - binary.LittleEndian.PutUint16(c[offset:], uint16(ddv)) - case d4: - binary.LittleEndian.PutUint32(c[offset:], uint32(ddv)) - // d8 must not happen. Those samples are encoded as float64. - default: - panic("invalid number of bytes for integer delta") - } - } else { - switch vb { - case d4: - binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv))) - case d8: - // Store the absolute value (no delta) in case of d8. - binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) - default: - panic("invalid number of bytes for floating point delta") - } - } - return []chunk{&c} -} - // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk { @@ -315,26 +385,6 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de return []chunk{&c} } -// clone implements chunk. -func (c doubleDeltaEncodedChunk) clone() chunk { - clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - -// values implements chunk. -func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { if idx == 0 { return &metric.SamplePair{ @@ -424,61 +474,12 @@ func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { } } -// firstTime implements chunk. -func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.baseTime() -} - -// lastTime implements chunk. -func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - -// marshal implements chunk. -func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint.") - } - binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) - - n, err := w.Write(c[:cap(c)]) - if err != nil { - return err - } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) - } - return nil -} - -// unmarshal implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n - } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] - return nil -} - // doubleDeltaEncodedChunkIterator implements chunkIterator. type doubleDeltaEncodedChunkIterator struct { chunk *doubleDeltaEncodedChunk // TODO(beorn7): add more fields here to keep track of last position. } -// newIterator implements chunk. -func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { - return &doubleDeltaEncodedChunkIterator{ - chunk: c, - } -} - // getValueAtTime implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { // TODO(beorn7): Implement in a more efficient way making use of the diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 590a6256a..74a02da6c 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -95,8 +95,7 @@ type indexingOp struct { // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { - basePath string - chunkType byte + basePath string archivedFingerprintToMetrics *index.FingerprintMetricIndex archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex @@ -121,7 +120,7 @@ type persistence struct { } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. -func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, error) { +func newPersistence(basePath string, dirty bool) (*persistence, error) { dirtyPath := filepath.Join(basePath, dirtyFileName) versionPath := filepath.Join(basePath, versionFileName) @@ -178,8 +177,7 @@ func newPersistence(basePath string, chunkType byte, dirty bool) (*persistence, } p := &persistence{ - basePath: basePath, - chunkType: chunkType, + basePath: basePath, archivedFingerprintToMetrics: archivedFingerprintToMetrics, archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, @@ -396,7 +394,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde if err != nil { return nil, err } - chunk := chunkForType(typeBuf[0]) + chunk := newChunkForEncoding(chunkEncoding(typeBuf[0])) chunk.unmarshal(f) chunks = append(chunks, chunk) } @@ -590,7 +588,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } else { // This is the non-persisted head chunk. Fully marshal it. - if err = w.WriteByte(chunkType(chunkDesc.chunk)); err != nil { + if err = w.WriteByte(byte(chunkDesc.chunk.encoding())); err != nil { return } if err = chunkDesc.chunk.marshal(w); err != nil { @@ -742,13 +740,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { } } else { // Non-persisted head chunk. - chunkType, err := r.ReadByte() + encoding, err := r.ReadByte() if err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true return sm, nil } - chunk := chunkForType(chunkType) + chunk := newChunkForEncoding(chunkEncoding(encoding)) if err := chunk.unmarshal(r); err != nil { glog.Warning("Could not decode chunk type:", err) p.dirty = true @@ -771,7 +769,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { chunkDescsOffset: int(chunkDescsOffset), savedFirstTime: clientmodel.Timestamp(savedFirstTime), headChunkPersisted: headChunkPersisted, - chunkType: p.chunkType, } } return sm, nil @@ -1096,7 +1093,7 @@ func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.F func writeChunkHeader(w io.Writer, c chunk) error { header := make([]byte, chunkHeaderLen) - header[chunkHeaderTypeOffset] = chunkType(c) + header[chunkHeaderTypeOffset] = byte(c.encoding()) binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) _, err := w.Write(header) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index a33c5fba4..382eb034a 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -31,9 +31,10 @@ var ( m3 = clientmodel.Metric{"label": "value3"} ) -func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer) { +func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) { + *defaultChunkEncoding = int(encoding) dir := test.NewTemporaryDirectory("test_persistence", t) - p, err := newPersistence(dir.Path(), chunkType, false) + p, err := newPersistence(dir.Path(), false) if err != nil { dir.Close() t.Fatal(err) @@ -44,7 +45,7 @@ func newTestPersistence(t *testing.T, chunkType byte) (*persistence, test.Closer }) } -func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { +func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk { fps := clientmodel.Fingerprints{ m1.Fingerprint(), m2.Fingerprint(), @@ -55,7 +56,7 @@ func buildTestChunks(chunkType byte) map[clientmodel.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; i++ { - fpToChunks[fp] = append(fpToChunks[fp], chunkForType(chunkType).add(&metric.SamplePair{ + fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&metric.SamplePair{ Timestamp: clientmodel.Timestamp(i), Value: clientmodel.SampleValue(fp), })[0]) @@ -75,11 +76,11 @@ func chunksEqual(c1, c2 chunk) bool { return true } -func testPersistLoadDropChunks(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() - fpToChunks := buildTestChunks(chunkType) + fpToChunks := buildTestChunks(encoding) for fp, chunks := range fpToChunks { for i, c := range chunks { @@ -191,15 +192,15 @@ func TestPersistLoadDropChunksType1(t *testing.T) { testPersistLoadDropChunks(t, 1) } -func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, true, 0, chunkType) - s2 := newMemorySeries(m2, false, 0, chunkType) - s3 := newMemorySeries(m3, false, 0, chunkType) + s1 := newMemorySeries(m1, true, 0) + s2 := newMemorySeries(m2, false, 0) + s3 := newMemorySeries(m3, false, 0) s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkPersisted = true @@ -260,8 +261,8 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) { testCheckpointAndLoadSeriesMapAndHeads(t, 1) } -func testGetFingerprintsModifiedBefore(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -338,8 +339,8 @@ func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) { testGetFingerprintsModifiedBefore(t, 1) } -func testDropArchivedMetric(t *testing.T, chunkType byte) { - p, closer := newTestPersistence(t, chunkType) +func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { + p, closer := newTestPersistence(t, encoding) defer closer.Close() m1 := clientmodel.Metric{"n1": "v1"} @@ -420,7 +421,7 @@ type incrementalBatch struct { expectedLpToFps index.LabelPairFingerprintsMapping } -func testIndexing(t *testing.T, chunkType byte) { +func testIndexing(t *testing.T, encoding chunkEncoding) { batches := []incrementalBatch{ { fpToMetric: index.FingerprintMetricMapping{ @@ -556,7 +557,7 @@ func testIndexing(t *testing.T, chunkType byte) { }, } - p, closer := newTestPersistence(t, chunkType) + p, closer := newTestPersistence(t, encoding) defer closer.Close() indexedFpsToMetrics := index.FingerprintMetricMapping{} diff --git a/storage/local/series.go b/storage/local/series.go index c50bb9412..7a4ace7fc 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -158,8 +158,6 @@ type memorySeries struct { // a non-persisted head chunk has to be cloned before more samples are // appended. headChunkUsedByIterator bool - // Which type of chunk to create if a new chunk is needed. - chunkType byte } // newMemorySeries returns a pointer to a newly allocated memorySeries for the @@ -167,13 +165,11 @@ type memorySeries struct { // or (if false) a series for a metric being unarchived, i.e. a series that // existed before but has been evicted from memory. If reallyNew is false, // firstTime is ignored (and set to the lowest possible timestamp instead - it -// will be set properly upon the first eviction of chunkDescs). chunkType is the -// type of chunks newly created by this memorySeries. +// will be set properly upon the first eviction of chunkDescs). func newMemorySeries( m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp, - chunkType byte, ) *memorySeries { if reallyNew { firstTime = clientmodel.Earliest @@ -182,7 +178,6 @@ func newMemorySeries( metric: m, headChunkPersisted: !reallyNew, savedFirstTime: firstTime, - chunkType: chunkType, } if !reallyNew { s.chunkDescsOffset = -1 @@ -195,7 +190,7 @@ func newMemorySeries( // The caller must have locked the fingerprint of the series. func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc { if len(s.chunkDescs) == 0 || s.headChunkPersisted { - newHead := newChunkDesc(chunkForType(s.chunkType)) + newHead := newChunkDesc(newChunk()) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkPersisted = false } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { diff --git a/storage/local/storage.go b/storage/local/storage.go index c655e5311..d71117050 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,7 +16,6 @@ package local import ( "container/list" - "fmt" "sync" "sync/atomic" "time" @@ -72,7 +71,6 @@ type memorySeriesStorage struct { dropAfter time.Duration checkpointInterval time.Duration checkpointDirtySeriesLimit int - chunkType byte appendQueue chan *clientmodel.Sample appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. @@ -110,17 +108,13 @@ type MemorySeriesStorageOptions struct { PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. - ChunkType byte // Chunk type for newly created chunks. Dirty bool // Force the storage to consider itself dirty on startup. } // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { - if o.ChunkType > 1 { - return nil, fmt.Errorf("unsupported chunk type %d", o.ChunkType) - } - p, err := newPersistence(o.PersistenceStoragePath, o.ChunkType, o.Dirty) + p, err := newPersistence(o.PersistenceStoragePath, o.Dirty) if err != nil { return nil, err } @@ -148,7 +142,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, - chunkType: o.ChunkType, appendLastTimestamp: clientmodel.Earliest, appendQueue: make(chan *clientmodel.Sample, appendQueueCap), @@ -458,7 +451,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, !unarchived, firstTime, s.chunkType) + series = newMemorySeries(m, !unarchived, firstTime) s.fpToSeries.put(fp, series) s.numSeries.Inc() } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index d045dee77..7e773dcc0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -181,7 +181,7 @@ func TestLoop(t *testing.T) { } } -func testChunk(t *testing.T, chunkType byte) { +func testChunk(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 500000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -189,7 +189,7 @@ func testChunk(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -229,7 +229,7 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } -func testGetValueAtTime(t *testing.T, chunkType byte) { +func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -237,7 +237,7 @@ func testGetValueAtTime(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -320,7 +320,7 @@ func TestGetValueAtTimeChunkType1(t *testing.T) { testGetValueAtTime(t, 1) } -func testGetRangeValues(t *testing.T, chunkType byte) { +func testGetRangeValues(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -328,7 +328,7 @@ func testGetRangeValues(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i) * 0.2), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() s.AppendSamples(samples) @@ -470,7 +470,7 @@ func TestGetRangeValuesChunkType1(t *testing.T) { testGetRangeValues(t, 1) } -func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { +func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { samples := make(clientmodel.Samples, 1000) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -478,7 +478,7 @@ func testEvictAndPurgeSeries(t *testing.T, chunkType byte) { Value: clientmodel.SampleValue(float64(i * i)), } } - s, closer := NewTestStorage(t, chunkType) + s, closer := NewTestStorage(t, encoding) defer closer.Close() ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. @@ -576,7 +576,7 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) { testEvictAndPurgeSeries(t, 1) } -func benchmarkAppend(b *testing.B, chunkType byte) { +func benchmarkAppend(b *testing.B, encoding chunkEncoding) { samples := make(clientmodel.Samples, b.N) for i := range samples { samples[i] = &clientmodel.Sample{ @@ -590,7 +590,7 @@ func benchmarkAppend(b *testing.B, chunkType byte) { } } b.ResetTimer() - s, closer := NewTestStorage(b, chunkType) + s, closer := NewTestStorage(b, encoding) defer closer.Close() s.AppendSamples(samples) @@ -606,14 +606,14 @@ func BenchmarkAppendType1(b *testing.B) { // Append a large number of random samples and then check if we can get them out // of the storage alright. -func testFuzz(t *testing.T, chunkType byte) { +func testFuzz(t *testing.T, encoding chunkEncoding) { if testing.Short() { t.Skip("Skipping test in short mode.") } check := func(seed int64) bool { rand.Seed(seed) - s, c := NewTestStorage(t, chunkType) + s, c := NewTestStorage(t, encoding) defer c.Close() samples := createRandomSamples("test_fuzz", 1000) @@ -645,7 +645,8 @@ func TestFuzzChunkType1(t *testing.T) { // make things even slower): // // go test -race -cpu 8 -test=short -bench BenchmarkFuzzChunkType -func benchmarkFuzz(b *testing.B, chunkType byte) { +func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { + *defaultChunkEncoding = int(encoding) const samplesPerRun = 100000 rand.Seed(42) directory := test.NewTemporaryDirectory("test_storage", b) @@ -655,7 +656,6 @@ func benchmarkFuzz(b *testing.B, chunkType byte) { PersistenceRetentionPeriod: time.Hour, PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Second, - ChunkType: chunkType, } s, err := NewMemorySeriesStorage(o) if err != nil { diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 319398497..2fc78d429 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -37,14 +37,14 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t test.T, chunkType byte) (Storage, test.Closer) { +func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) { + *defaultChunkEncoding = int(encoding) directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceStoragePath: directory.Path(), CheckpointInterval: time.Hour, - ChunkType: chunkType, } storage, err := NewMemorySeriesStorage(o) if err != nil {