mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Add chunk encoding for float histogram (#11716)
Signed-off-by: Marc Tudurí <marctc@protonmail.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Marc Tudurí <marctc@protonmail.com>
This commit is contained in:
parent
46fb802791
commit
6fd89a6fd2
|
@ -30,6 +30,7 @@ const (
|
||||||
EncNone Encoding = iota
|
EncNone Encoding = iota
|
||||||
EncXOR
|
EncXOR
|
||||||
EncHistogram
|
EncHistogram
|
||||||
|
EncFloatHistogram
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e Encoding) String() string {
|
func (e Encoding) String() string {
|
||||||
|
@ -40,6 +41,8 @@ func (e Encoding) String() string {
|
||||||
return "XOR"
|
return "XOR"
|
||||||
case EncHistogram:
|
case EncHistogram:
|
||||||
return "histogram"
|
return "histogram"
|
||||||
|
case EncFloatHistogram:
|
||||||
|
return "floathistogram"
|
||||||
}
|
}
|
||||||
return "<unknown>"
|
return "<unknown>"
|
||||||
}
|
}
|
||||||
|
@ -57,7 +60,7 @@ func IsOutOfOrderChunk(e Encoding) bool {
|
||||||
|
|
||||||
// IsValidEncoding returns true for supported encodings.
|
// IsValidEncoding returns true for supported encodings.
|
||||||
func IsValidEncoding(e Encoding) bool {
|
func IsValidEncoding(e Encoding) bool {
|
||||||
return e == EncXOR || e == EncOOOXOR || e == EncHistogram
|
return e == EncXOR || e == EncOOOXOR || e == EncHistogram || e == EncFloatHistogram
|
||||||
}
|
}
|
||||||
|
|
||||||
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.
|
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.
|
||||||
|
@ -91,6 +94,7 @@ type Chunk interface {
|
||||||
type Appender interface {
|
type Appender interface {
|
||||||
Append(int64, float64)
|
Append(int64, float64)
|
||||||
AppendHistogram(t int64, h *histogram.Histogram)
|
AppendHistogram(t int64, h *histogram.Histogram)
|
||||||
|
AppendFloatHistogram(t int64, h *histogram.FloatHistogram)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterator is a simple iterator that can only get the next value.
|
// Iterator is a simple iterator that can only get the next value.
|
||||||
|
@ -159,6 +163,8 @@ func (v ValueType) ChunkEncoding() Encoding {
|
||||||
return EncXOR
|
return EncXOR
|
||||||
case ValHistogram:
|
case ValHistogram:
|
||||||
return EncHistogram
|
return EncHistogram
|
||||||
|
case ValFloatHistogram:
|
||||||
|
return EncFloatHistogram
|
||||||
default:
|
default:
|
||||||
return EncNone
|
return EncNone
|
||||||
}
|
}
|
||||||
|
@ -230,6 +236,7 @@ type Pool interface {
|
||||||
type pool struct {
|
type pool struct {
|
||||||
xor sync.Pool
|
xor sync.Pool
|
||||||
histogram sync.Pool
|
histogram sync.Pool
|
||||||
|
floatHistogram sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPool returns a new pool.
|
// NewPool returns a new pool.
|
||||||
|
@ -245,6 +252,11 @@ func NewPool() Pool {
|
||||||
return &HistogramChunk{b: bstream{}}
|
return &HistogramChunk{b: bstream{}}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
floatHistogram: sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return &FloatHistogramChunk{b: bstream{}}
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,6 +272,11 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
|
||||||
c.b.stream = b
|
c.b.stream = b
|
||||||
c.b.count = 0
|
c.b.count = 0
|
||||||
return c, nil
|
return c, nil
|
||||||
|
case EncFloatHistogram:
|
||||||
|
c := p.floatHistogram.Get().(*FloatHistogramChunk)
|
||||||
|
c.b.stream = b
|
||||||
|
c.b.count = 0
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
||||||
}
|
}
|
||||||
|
@ -288,6 +305,17 @@ func (p *pool) Put(c Chunk) error {
|
||||||
sh.b.stream = nil
|
sh.b.stream = nil
|
||||||
sh.b.count = 0
|
sh.b.count = 0
|
||||||
p.histogram.Put(c)
|
p.histogram.Put(c)
|
||||||
|
case EncFloatHistogram:
|
||||||
|
sh, ok := c.(*FloatHistogramChunk)
|
||||||
|
// This may happen often with wrapped chunks. Nothing we can really do about
|
||||||
|
// it but returning an error would cause a lot of allocations again. Thus,
|
||||||
|
// we just skip it.
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sh.b.stream = nil
|
||||||
|
sh.b.count = 0
|
||||||
|
p.floatHistogram.Put(c)
|
||||||
default:
|
default:
|
||||||
return errors.Errorf("invalid chunk encoding %q", c.Encoding())
|
return errors.Errorf("invalid chunk encoding %q", c.Encoding())
|
||||||
}
|
}
|
||||||
|
@ -303,6 +331,8 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
|
||||||
return &XORChunk{b: bstream{count: 0, stream: d}}, nil
|
return &XORChunk{b: bstream{count: 0, stream: d}}, nil
|
||||||
case EncHistogram:
|
case EncHistogram:
|
||||||
return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil
|
return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil
|
||||||
|
case EncFloatHistogram:
|
||||||
|
return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil
|
||||||
}
|
}
|
||||||
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
||||||
}
|
}
|
||||||
|
@ -314,6 +344,8 @@ func NewEmptyChunk(e Encoding) (Chunk, error) {
|
||||||
return NewXORChunk(), nil
|
return NewXORChunk(), nil
|
||||||
case EncHistogram:
|
case EncHistogram:
|
||||||
return NewHistogramChunk(), nil
|
return NewHistogramChunk(), nil
|
||||||
|
case EncFloatHistogram:
|
||||||
|
return NewFloatHistogramChunk(), nil
|
||||||
}
|
}
|
||||||
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
return nil, errors.Errorf("invalid chunk encoding %q", e)
|
||||||
}
|
}
|
||||||
|
|
759
tsdb/chunkenc/float_histogram.go
Normal file
759
tsdb/chunkenc/float_histogram.go
Normal file
|
@ -0,0 +1,759 @@
|
||||||
|
// Copyright 2022 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package chunkenc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/value"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FloatHistogramChunk holds encoded sample data for a sparse, high-resolution
|
||||||
|
// float histogram.
|
||||||
|
//
|
||||||
|
// Each sample has multiple "fields", stored in the following way (raw = store
|
||||||
|
// number directly, delta = store delta to the previous number, dod = store
|
||||||
|
// delta of the delta to the previous number, xor = what we do for regular
|
||||||
|
// sample values):
|
||||||
|
//
|
||||||
|
// field → ts count zeroCount sum []posbuckets []negbuckets
|
||||||
|
// sample 1 raw raw raw raw []raw []raw
|
||||||
|
// sample 2 delta xor xor xor []xor []xor
|
||||||
|
// sample >2 dod xor xor xor []xor []xor
|
||||||
|
type FloatHistogramChunk struct {
|
||||||
|
b bstream
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFloatHistogramChunk returns a new chunk with float histogram encoding.
|
||||||
|
func NewFloatHistogramChunk() *FloatHistogramChunk {
|
||||||
|
b := make([]byte, 3, 128)
|
||||||
|
return &FloatHistogramChunk{b: bstream{stream: b, count: 0}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// xorValue holds all the necessary information to encode
|
||||||
|
// and decode XOR encoded float64 values.
|
||||||
|
type xorValue struct {
|
||||||
|
value float64
|
||||||
|
leading uint8
|
||||||
|
trailing uint8
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encoding returns the encoding type.
|
||||||
|
func (c *FloatHistogramChunk) Encoding() Encoding {
|
||||||
|
return EncFloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bytes returns the underlying byte slice of the chunk.
|
||||||
|
func (c *FloatHistogramChunk) Bytes() []byte {
|
||||||
|
return c.b.bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NumSamples returns the number of samples in the chunk.
|
||||||
|
func (c *FloatHistogramChunk) NumSamples() int {
|
||||||
|
return int(binary.BigEndian.Uint16(c.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Layout returns the histogram layout. Only call this on chunks that have at
|
||||||
|
// least one sample.
|
||||||
|
func (c *FloatHistogramChunk) Layout() (
|
||||||
|
schema int32, zeroThreshold float64,
|
||||||
|
negativeSpans, positiveSpans []histogram.Span,
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
|
if c.NumSamples() == 0 {
|
||||||
|
panic("FloatHistogramChunk.Layout() called on an empty chunk")
|
||||||
|
}
|
||||||
|
b := newBReader(c.Bytes()[2:])
|
||||||
|
return readHistogramChunkLayout(&b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetCounterResetHeader sets the counter reset header.
|
||||||
|
func (c *FloatHistogramChunk) SetCounterResetHeader(h CounterResetHeader) {
|
||||||
|
setCounterResetHeader(h, c.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
|
||||||
|
// header.
|
||||||
|
func (c *FloatHistogramChunk) GetCounterResetHeader() CounterResetHeader {
|
||||||
|
return CounterResetHeader(c.Bytes()[2] & 0b11000000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compact implements the Chunk interface.
|
||||||
|
func (c *FloatHistogramChunk) Compact() {
|
||||||
|
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
||||||
|
buf := make([]byte, l)
|
||||||
|
copy(buf, c.b.stream)
|
||||||
|
c.b.stream = buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Appender implements the Chunk interface.
|
||||||
|
func (c *FloatHistogramChunk) Appender() (Appender, error) {
|
||||||
|
it := c.iterator(nil)
|
||||||
|
|
||||||
|
// To get an appender, we must know the state it would have if we had
|
||||||
|
// appended all existing data from scratch. We iterate through the end
|
||||||
|
// and populate via the iterator's state.
|
||||||
|
for it.Next() == ValFloatHistogram {
|
||||||
|
}
|
||||||
|
if err := it.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuckets := make([]xorValue, len(it.pBuckets))
|
||||||
|
for i := 0; i < len(it.pBuckets); i++ {
|
||||||
|
pBuckets[i] = xorValue{
|
||||||
|
value: it.pBuckets[i],
|
||||||
|
leading: it.pBucketsLeading[i],
|
||||||
|
trailing: it.pBucketsTrailing[i],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nBuckets := make([]xorValue, len(it.nBuckets))
|
||||||
|
for i := 0; i < len(it.nBuckets); i++ {
|
||||||
|
nBuckets[i] = xorValue{
|
||||||
|
value: it.nBuckets[i],
|
||||||
|
leading: it.nBucketsLeading[i],
|
||||||
|
trailing: it.nBucketsTrailing[i],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &FloatHistogramAppender{
|
||||||
|
b: &c.b,
|
||||||
|
|
||||||
|
schema: it.schema,
|
||||||
|
zThreshold: it.zThreshold,
|
||||||
|
pSpans: it.pSpans,
|
||||||
|
nSpans: it.nSpans,
|
||||||
|
t: it.t,
|
||||||
|
tDelta: it.tDelta,
|
||||||
|
cnt: it.cnt,
|
||||||
|
zCnt: it.zCnt,
|
||||||
|
pBuckets: pBuckets,
|
||||||
|
nBuckets: nBuckets,
|
||||||
|
sum: it.sum,
|
||||||
|
}
|
||||||
|
if it.numTotal == 0 {
|
||||||
|
a.sum.leading = 0xff
|
||||||
|
a.cnt.leading = 0xff
|
||||||
|
a.zCnt.leading = 0xff
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FloatHistogramChunk) iterator(it Iterator) *floatHistogramIterator {
|
||||||
|
// This comment is copied from XORChunk.iterator:
|
||||||
|
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
|
||||||
|
// When using striped locks to guard access to chunks, probably yes.
|
||||||
|
// Could only copy data if the chunk is not completed yet.
|
||||||
|
if histogramIter, ok := it.(*floatHistogramIterator); ok {
|
||||||
|
histogramIter.Reset(c.b.bytes())
|
||||||
|
return histogramIter
|
||||||
|
}
|
||||||
|
return newFloatHistogramIterator(c.b.bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFloatHistogramIterator(b []byte) *floatHistogramIterator {
|
||||||
|
it := &floatHistogramIterator{
|
||||||
|
br: newBReader(b),
|
||||||
|
numTotal: binary.BigEndian.Uint16(b),
|
||||||
|
t: math.MinInt64,
|
||||||
|
}
|
||||||
|
// The first 3 bytes contain chunk headers.
|
||||||
|
// We skip that for actual samples.
|
||||||
|
_, _ = it.br.readBits(24)
|
||||||
|
return it
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterator implements the Chunk interface.
|
||||||
|
func (c *FloatHistogramChunk) Iterator(it Iterator) Iterator {
|
||||||
|
return c.iterator(it)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FloatHistogramAppender is an Appender implementation for float histograms.
|
||||||
|
type FloatHistogramAppender struct {
|
||||||
|
b *bstream
|
||||||
|
|
||||||
|
// Layout:
|
||||||
|
schema int32
|
||||||
|
zThreshold float64
|
||||||
|
pSpans, nSpans []histogram.Span
|
||||||
|
|
||||||
|
t, tDelta int64
|
||||||
|
sum, cnt, zCnt xorValue
|
||||||
|
pBuckets, nBuckets []xorValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append implements Appender. This implementation panics because normal float
|
||||||
|
// samples must never be appended to a histogram chunk.
|
||||||
|
func (a *FloatHistogramAppender) Append(int64, float64) {
|
||||||
|
panic("appended a float sample to a histogram chunk")
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendHistogram implements Appender. This implementation panics because integer
|
||||||
|
// histogram samples must never be appended to a float histogram chunk.
|
||||||
|
func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
|
||||||
|
panic("appended an integer histogram to a float histogram chunk")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Appendable returns whether the chunk can be appended to, and if so
|
||||||
|
// whether any recoding needs to happen using the provided interjections
|
||||||
|
// (in case of any new buckets, positive or negative range, respectively).
|
||||||
|
//
|
||||||
|
// The chunk is not appendable in the following cases:
|
||||||
|
//
|
||||||
|
// • The schema has changed.
|
||||||
|
//
|
||||||
|
// • The threshold for the zero bucket has changed.
|
||||||
|
//
|
||||||
|
// • Any buckets have disappeared.
|
||||||
|
//
|
||||||
|
// • There was a counter reset in the count of observations or in any bucket,
|
||||||
|
// including the zero bucket.
|
||||||
|
//
|
||||||
|
// • The last sample in the chunk was stale while the current sample is not stale.
|
||||||
|
//
|
||||||
|
// The method returns an additional boolean set to true if it is not appendable
|
||||||
|
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||||
|
// append. If counterReset is true, okToAppend is always false.
|
||||||
|
func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
||||||
|
positiveInterjections, negativeInterjections []Interjection,
|
||||||
|
okToAppend, counterReset bool,
|
||||||
|
) {
|
||||||
|
if value.IsStaleNaN(h.Sum) {
|
||||||
|
// This is a stale sample whose buckets and spans don't matter.
|
||||||
|
okToAppend = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value.IsStaleNaN(a.sum.value) {
|
||||||
|
// If the last sample was stale, then we can only accept stale
|
||||||
|
// samples in this chunk.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.Count < a.cnt.value {
|
||||||
|
// There has been a counter reset.
|
||||||
|
counterReset = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.ZeroCount < a.zCnt.value {
|
||||||
|
// There has been a counter reset since ZeroThreshold didn't change.
|
||||||
|
counterReset = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans)
|
||||||
|
if !ok {
|
||||||
|
counterReset = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans)
|
||||||
|
if !ok {
|
||||||
|
counterReset = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
|
||||||
|
counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
|
||||||
|
counterReset, positiveInterjections, negativeInterjections = true, nil, nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
okToAppend = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
|
||||||
|
// bucket. This should be called only when the bucket layout is the same or new
|
||||||
|
// buckets were added. It does not handle the case of buckets missing.
|
||||||
|
func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool {
|
||||||
|
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices.
|
||||||
|
oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
|
||||||
|
oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset
|
||||||
|
|
||||||
|
oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
|
||||||
|
oldVal, newVal := oldBuckets[0].value, newBuckets[0]
|
||||||
|
|
||||||
|
// Since we assume that new spans won't have missing buckets, there will never be a case
|
||||||
|
// where the old index will not find a matching new index.
|
||||||
|
for {
|
||||||
|
if oldIdx == newIdx {
|
||||||
|
if newVal < oldVal {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldIdx <= newIdx {
|
||||||
|
// Moving ahead old bucket and span by 1 index.
|
||||||
|
if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 {
|
||||||
|
// Current span is over.
|
||||||
|
oldSpanSliceIdx++
|
||||||
|
oldInsideSpanIdx = 0
|
||||||
|
if oldSpanSliceIdx >= len(oldSpans) {
|
||||||
|
// All old spans are over.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
|
||||||
|
} else {
|
||||||
|
oldInsideSpanIdx++
|
||||||
|
oldIdx++
|
||||||
|
}
|
||||||
|
oldBucketSliceIdx++
|
||||||
|
oldVal = oldBuckets[oldBucketSliceIdx].value
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldIdx > newIdx {
|
||||||
|
// Moving ahead new bucket and span by 1 index.
|
||||||
|
if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 {
|
||||||
|
// Current span is over.
|
||||||
|
newSpanSliceIdx++
|
||||||
|
newInsideSpanIdx = 0
|
||||||
|
if newSpanSliceIdx >= len(newSpans) {
|
||||||
|
// All new spans are over.
|
||||||
|
// This should not happen, old spans above should catch this first.
|
||||||
|
panic("new spans over before old spans in counterReset")
|
||||||
|
}
|
||||||
|
newIdx += 1 + newSpans[newSpanSliceIdx].Offset
|
||||||
|
} else {
|
||||||
|
newInsideSpanIdx++
|
||||||
|
newIdx++
|
||||||
|
}
|
||||||
|
newBucketSliceIdx++
|
||||||
|
newVal = newBuckets[newBucketSliceIdx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendFloatHistogram appends a float histogram to the chunk. The caller must ensure that
|
||||||
|
// the histogram is properly structured, e.g. the number of buckets used
|
||||||
|
// corresponds to the number conveyed by the span structures. First call
|
||||||
|
// Appendable() and act accordingly!
|
||||||
|
func (a *FloatHistogramAppender) AppendFloatHistogram(t int64, h *histogram.FloatHistogram) {
|
||||||
|
var tDelta int64
|
||||||
|
num := binary.BigEndian.Uint16(a.b.bytes())
|
||||||
|
|
||||||
|
if value.IsStaleNaN(h.Sum) {
|
||||||
|
// Emptying out other fields to write no buckets, and an empty
|
||||||
|
// layout in case of first histogram in the chunk.
|
||||||
|
h = &histogram.FloatHistogram{Sum: h.Sum}
|
||||||
|
}
|
||||||
|
|
||||||
|
if num == 0 {
|
||||||
|
// The first append gets the privilege to dictate the layout
|
||||||
|
// but it's also responsible for encoding it into the chunk!
|
||||||
|
writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
|
||||||
|
a.schema = h.Schema
|
||||||
|
a.zThreshold = h.ZeroThreshold
|
||||||
|
|
||||||
|
if len(h.PositiveSpans) > 0 {
|
||||||
|
a.pSpans = make([]histogram.Span, len(h.PositiveSpans))
|
||||||
|
copy(a.pSpans, h.PositiveSpans)
|
||||||
|
} else {
|
||||||
|
a.pSpans = nil
|
||||||
|
}
|
||||||
|
if len(h.NegativeSpans) > 0 {
|
||||||
|
a.nSpans = make([]histogram.Span, len(h.NegativeSpans))
|
||||||
|
copy(a.nSpans, h.NegativeSpans)
|
||||||
|
} else {
|
||||||
|
a.nSpans = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
|
||||||
|
if numPBuckets > 0 {
|
||||||
|
a.pBuckets = make([]xorValue, numPBuckets)
|
||||||
|
for i := 0; i < numPBuckets; i++ {
|
||||||
|
a.pBuckets[i] = xorValue{
|
||||||
|
value: h.PositiveBuckets[i],
|
||||||
|
leading: 0xff,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
a.pBuckets = nil
|
||||||
|
}
|
||||||
|
if numNBuckets > 0 {
|
||||||
|
a.nBuckets = make([]xorValue, numNBuckets)
|
||||||
|
for i := 0; i < numNBuckets; i++ {
|
||||||
|
a.nBuckets[i] = xorValue{
|
||||||
|
value: h.NegativeBuckets[i],
|
||||||
|
leading: 0xff,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
a.nBuckets = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now store the actual data.
|
||||||
|
putVarbitInt(a.b, t)
|
||||||
|
a.b.writeBits(math.Float64bits(h.Count), 64)
|
||||||
|
a.b.writeBits(math.Float64bits(h.ZeroCount), 64)
|
||||||
|
a.b.writeBits(math.Float64bits(h.Sum), 64)
|
||||||
|
a.cnt.value = h.Count
|
||||||
|
a.zCnt.value = h.ZeroCount
|
||||||
|
a.sum.value = h.Sum
|
||||||
|
for _, b := range h.PositiveBuckets {
|
||||||
|
a.b.writeBits(math.Float64bits(b), 64)
|
||||||
|
}
|
||||||
|
for _, b := range h.NegativeBuckets {
|
||||||
|
a.b.writeBits(math.Float64bits(b), 64)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
|
||||||
|
// so we don't need a separate single delta logic for the 2nd sample.
|
||||||
|
tDelta = t - a.t
|
||||||
|
tDod := tDelta - a.tDelta
|
||||||
|
putVarbitInt(a.b, tDod)
|
||||||
|
|
||||||
|
a.writeXorValue(&a.cnt, h.Count)
|
||||||
|
a.writeXorValue(&a.zCnt, h.ZeroCount)
|
||||||
|
a.writeXorValue(&a.sum, h.Sum)
|
||||||
|
|
||||||
|
for i, b := range h.PositiveBuckets {
|
||||||
|
a.writeXorValue(&a.pBuckets[i], b)
|
||||||
|
}
|
||||||
|
for i, b := range h.NegativeBuckets {
|
||||||
|
a.writeXorValue(&a.nBuckets[i], b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
binary.BigEndian.PutUint16(a.b.bytes(), num+1)
|
||||||
|
|
||||||
|
a.t = t
|
||||||
|
a.tDelta = tDelta
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
|
||||||
|
xorWrite(a.b, v, old.value, &old.leading, &old.trailing)
|
||||||
|
old.value = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recode converts the current chunk to accommodate an expansion of the set of
|
||||||
|
// (positive and/or negative) buckets used, according to the provided
|
||||||
|
// interjections, resulting in the honoring of the provided new positive and
|
||||||
|
// negative spans. To continue appending, use the returned Appender rather than
|
||||||
|
// the receiver of this method.
|
||||||
|
func (a *FloatHistogramAppender) Recode(
|
||||||
|
positiveInterjections, negativeInterjections []Interjection,
|
||||||
|
positiveSpans, negativeSpans []histogram.Span,
|
||||||
|
) (Chunk, Appender) {
|
||||||
|
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||||
|
// it again with the new span layout. This can probably be done in-place
|
||||||
|
// by editing the chunk. But let's first see how expensive it is in the
|
||||||
|
// big picture. Also, in-place editing might create concurrency issues.
|
||||||
|
byts := a.b.bytes()
|
||||||
|
it := newFloatHistogramIterator(byts)
|
||||||
|
hc := NewFloatHistogramChunk()
|
||||||
|
app, err := hc.Appender()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
|
||||||
|
|
||||||
|
for it.Next() == ValFloatHistogram {
|
||||||
|
tOld, hOld := it.AtFloatHistogram()
|
||||||
|
|
||||||
|
// We have to newly allocate slices for the modified buckets
|
||||||
|
// here because they are kept by the appender until the next
|
||||||
|
// append.
|
||||||
|
// TODO(beorn7): We might be able to optimize this.
|
||||||
|
var positiveBuckets, negativeBuckets []float64
|
||||||
|
if numPositiveBuckets > 0 {
|
||||||
|
positiveBuckets = make([]float64, numPositiveBuckets)
|
||||||
|
}
|
||||||
|
if numNegativeBuckets > 0 {
|
||||||
|
negativeBuckets = make([]float64, numNegativeBuckets)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the modified histogram to the new chunk.
|
||||||
|
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
||||||
|
if len(positiveInterjections) > 0 {
|
||||||
|
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, false)
|
||||||
|
}
|
||||||
|
if len(negativeInterjections) > 0 {
|
||||||
|
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, false)
|
||||||
|
}
|
||||||
|
app.AppendFloatHistogram(tOld, hOld)
|
||||||
|
}
|
||||||
|
|
||||||
|
hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000))
|
||||||
|
return hc, app
|
||||||
|
}
|
||||||
|
|
||||||
|
type floatHistogramIterator struct {
|
||||||
|
br bstreamReader
|
||||||
|
numTotal uint16
|
||||||
|
numRead uint16
|
||||||
|
|
||||||
|
// Layout:
|
||||||
|
schema int32
|
||||||
|
zThreshold float64
|
||||||
|
pSpans, nSpans []histogram.Span
|
||||||
|
|
||||||
|
// For the fields that are tracked as deltas and ultimately dod's.
|
||||||
|
t int64
|
||||||
|
tDelta int64
|
||||||
|
|
||||||
|
// All Gorilla xor encoded.
|
||||||
|
sum, cnt, zCnt xorValue
|
||||||
|
|
||||||
|
// Buckets are not of type xorValue to avoid creating
|
||||||
|
// new slices for every AtFloatHistogram call.
|
||||||
|
pBuckets, nBuckets []float64
|
||||||
|
pBucketsLeading, nBucketsLeading []uint8
|
||||||
|
pBucketsTrailing, nBucketsTrailing []uint8
|
||||||
|
|
||||||
|
err error
|
||||||
|
|
||||||
|
// Track calls to retrieve methods. Once they have been called, we
|
||||||
|
// cannot recycle the bucket slices anymore because we have returned
|
||||||
|
// them in the histogram.
|
||||||
|
atFloatHistogramCalled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) Seek(t int64) ValueType {
|
||||||
|
if it.err != nil {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
for t > it.t || it.numRead == 0 {
|
||||||
|
if it.Next() == ValNone {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ValFloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) At() (int64, float64) {
|
||||||
|
panic("cannot call floatHistogramIterator.At")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) AtHistogram() (int64, *histogram.Histogram) {
|
||||||
|
panic("cannot call floatHistogramIterator.AtHistogram")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
|
||||||
|
if value.IsStaleNaN(it.sum.value) {
|
||||||
|
return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
|
||||||
|
}
|
||||||
|
it.atFloatHistogramCalled = true
|
||||||
|
return it.t, &histogram.FloatHistogram{
|
||||||
|
Count: it.cnt.value,
|
||||||
|
ZeroCount: it.zCnt.value,
|
||||||
|
Sum: it.sum.value,
|
||||||
|
ZeroThreshold: it.zThreshold,
|
||||||
|
Schema: it.schema,
|
||||||
|
PositiveSpans: it.pSpans,
|
||||||
|
NegativeSpans: it.nSpans,
|
||||||
|
PositiveBuckets: it.pBuckets,
|
||||||
|
NegativeBuckets: it.nBuckets,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) AtT() int64 {
|
||||||
|
return it.t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) Err() error {
|
||||||
|
return it.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) Reset(b []byte) {
|
||||||
|
// The first 3 bytes contain chunk headers.
|
||||||
|
// We skip that for actual samples.
|
||||||
|
it.br = newBReader(b[3:])
|
||||||
|
it.numTotal = binary.BigEndian.Uint16(b)
|
||||||
|
it.numRead = 0
|
||||||
|
|
||||||
|
it.t, it.tDelta = 0, 0
|
||||||
|
it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{}
|
||||||
|
|
||||||
|
if it.atFloatHistogramCalled {
|
||||||
|
it.atFloatHistogramCalled = false
|
||||||
|
it.pBuckets, it.nBuckets = nil, nil
|
||||||
|
} else {
|
||||||
|
it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0]
|
||||||
|
}
|
||||||
|
it.pBucketsLeading, it.pBucketsTrailing = it.pBucketsLeading[:0], it.pBucketsTrailing[:0]
|
||||||
|
it.nBucketsLeading, it.nBucketsTrailing = it.nBucketsLeading[:0], it.nBucketsTrailing[:0]
|
||||||
|
|
||||||
|
it.err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) Next() ValueType {
|
||||||
|
if it.err != nil || it.numRead == it.numTotal {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if it.numRead == 0 {
|
||||||
|
// The first read is responsible for reading the chunk layout
|
||||||
|
// and for initializing fields that depend on it. We give
|
||||||
|
// counter reset info at chunk level, hence we discard it here.
|
||||||
|
schema, zeroThreshold, posSpans, negSpans, err := readHistogramChunkLayout(&it.br)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.schema = schema
|
||||||
|
it.zThreshold = zeroThreshold
|
||||||
|
it.pSpans, it.nSpans = posSpans, negSpans
|
||||||
|
numPBuckets, numNBuckets := countSpans(posSpans), countSpans(negSpans)
|
||||||
|
// Allocate bucket slices as needed, recycling existing slices
|
||||||
|
// in case this iterator was reset and already has slices of a
|
||||||
|
// sufficient capacity.
|
||||||
|
if numPBuckets > 0 {
|
||||||
|
it.pBuckets = append(it.pBuckets, make([]float64, numPBuckets)...)
|
||||||
|
it.pBucketsLeading = append(it.pBucketsLeading, make([]uint8, numPBuckets)...)
|
||||||
|
it.pBucketsTrailing = append(it.pBucketsTrailing, make([]uint8, numPBuckets)...)
|
||||||
|
}
|
||||||
|
if numNBuckets > 0 {
|
||||||
|
it.nBuckets = append(it.nBuckets, make([]float64, numNBuckets)...)
|
||||||
|
it.nBucketsLeading = append(it.nBucketsLeading, make([]uint8, numNBuckets)...)
|
||||||
|
it.nBucketsTrailing = append(it.nBucketsTrailing, make([]uint8, numNBuckets)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now read the actual data.
|
||||||
|
t, err := readVarbitInt(&it.br)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.t = t
|
||||||
|
|
||||||
|
cnt, err := it.br.readBits(64)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.cnt.value = math.Float64frombits(cnt)
|
||||||
|
|
||||||
|
zcnt, err := it.br.readBits(64)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.zCnt.value = math.Float64frombits(zcnt)
|
||||||
|
|
||||||
|
sum, err := it.br.readBits(64)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.sum.value = math.Float64frombits(sum)
|
||||||
|
|
||||||
|
for i := range it.pBuckets {
|
||||||
|
v, err := it.br.readBits(64)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.pBuckets[i] = math.Float64frombits(v)
|
||||||
|
}
|
||||||
|
for i := range it.nBuckets {
|
||||||
|
v, err := it.br.readBits(64)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.nBuckets[i] = math.Float64frombits(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
it.numRead++
|
||||||
|
return ValFloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
|
||||||
|
// so we don't need a separate single delta logic for the 2nd sample.
|
||||||
|
|
||||||
|
// Recycle bucket slices that have not been returned yet. Otherwise, copy them.
|
||||||
|
// We can always recycle the slices for leading and trailing bits as they are
|
||||||
|
// never returned to the caller.
|
||||||
|
if it.atFloatHistogramCalled {
|
||||||
|
it.atFloatHistogramCalled = false
|
||||||
|
if len(it.pBuckets) > 0 {
|
||||||
|
newBuckets := make([]float64, len(it.pBuckets))
|
||||||
|
copy(newBuckets, it.pBuckets)
|
||||||
|
it.pBuckets = newBuckets
|
||||||
|
} else {
|
||||||
|
it.pBuckets = nil
|
||||||
|
}
|
||||||
|
if len(it.nBuckets) > 0 {
|
||||||
|
newBuckets := make([]float64, len(it.nBuckets))
|
||||||
|
copy(newBuckets, it.nBuckets)
|
||||||
|
it.nBuckets = newBuckets
|
||||||
|
} else {
|
||||||
|
it.nBuckets = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tDod, err := readVarbitInt(&it.br)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
it.tDelta = it.tDelta + tDod
|
||||||
|
it.t += it.tDelta
|
||||||
|
|
||||||
|
if ok := it.readXor(&it.cnt.value, &it.cnt.leading, &it.cnt.trailing); !ok {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := it.readXor(&it.zCnt.value, &it.zCnt.leading, &it.zCnt.trailing); !ok {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok := it.readXor(&it.sum.value, &it.sum.leading, &it.sum.trailing); !ok {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if value.IsStaleNaN(it.sum.value) {
|
||||||
|
it.numRead++
|
||||||
|
return ValFloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range it.pBuckets {
|
||||||
|
if ok := it.readXor(&it.pBuckets[i], &it.pBucketsLeading[i], &it.pBucketsTrailing[i]); !ok {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range it.nBuckets {
|
||||||
|
if ok := it.readXor(&it.nBuckets[i], &it.nBucketsLeading[i], &it.nBucketsTrailing[i]); !ok {
|
||||||
|
return ValNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it.numRead++
|
||||||
|
return ValFloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *floatHistogramIterator) readXor(v *float64, leading, trailing *uint8) bool {
|
||||||
|
err := xorRead(&it.br, v, leading, trailing)
|
||||||
|
if err != nil {
|
||||||
|
it.err = err
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
359
tsdb/chunkenc/float_histogram_test.go
Normal file
359
tsdb/chunkenc/float_histogram_test.go
Normal file
|
@ -0,0 +1,359 @@
|
||||||
|
// Copyright 2021 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package chunkenc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
)
|
||||||
|
|
||||||
|
type floatResult struct {
|
||||||
|
t int64
|
||||||
|
h *histogram.FloatHistogram
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFloatHistogramChunkSameBuckets(t *testing.T) {
|
||||||
|
c := NewFloatHistogramChunk()
|
||||||
|
var exp []floatResult
|
||||||
|
|
||||||
|
// Create fresh appender and add the first histogram.
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, c.NumSamples())
|
||||||
|
|
||||||
|
ts := int64(1234567890)
|
||||||
|
h := &histogram.Histogram{
|
||||||
|
Count: 15,
|
||||||
|
ZeroCount: 2,
|
||||||
|
Sum: 18.4,
|
||||||
|
ZeroThreshold: 1e-100,
|
||||||
|
Schema: 1,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 2, Length: 3},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
|
||||||
|
}
|
||||||
|
app.AppendFloatHistogram(ts, h.ToFloat())
|
||||||
|
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
|
||||||
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
|
// Add an updated histogram.
|
||||||
|
ts += 16
|
||||||
|
h = h.Copy()
|
||||||
|
h.Count = 32
|
||||||
|
h.ZeroCount++
|
||||||
|
h.Sum = 24.4
|
||||||
|
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
|
||||||
|
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
|
||||||
|
app.AppendFloatHistogram(ts, h.ToFloat())
|
||||||
|
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
|
||||||
|
require.Equal(t, 2, c.NumSamples())
|
||||||
|
|
||||||
|
// Add update with new appender.
|
||||||
|
app, err = c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ts += 14
|
||||||
|
h = h.Copy()
|
||||||
|
h.Count = 54
|
||||||
|
h.ZeroCount += 2
|
||||||
|
h.Sum = 24.4
|
||||||
|
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
|
||||||
|
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
|
||||||
|
app.AppendFloatHistogram(ts, h.ToFloat())
|
||||||
|
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
|
||||||
|
require.Equal(t, 3, c.NumSamples())
|
||||||
|
|
||||||
|
// 1. Expand iterator in simple case.
|
||||||
|
it := c.Iterator(nil)
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
var act []floatResult
|
||||||
|
for it.Next() == ValFloatHistogram {
|
||||||
|
fts, fh := it.AtFloatHistogram()
|
||||||
|
act = append(act, floatResult{t: fts, h: fh})
|
||||||
|
}
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
require.Equal(t, exp, act)
|
||||||
|
|
||||||
|
// 2. Expand second iterator while reusing first one.
|
||||||
|
it2 := c.Iterator(it)
|
||||||
|
var act2 []floatResult
|
||||||
|
for it2.Next() == ValFloatHistogram {
|
||||||
|
fts, fh := it2.AtFloatHistogram()
|
||||||
|
act2 = append(act2, floatResult{t: fts, h: fh})
|
||||||
|
}
|
||||||
|
require.NoError(t, it2.Err())
|
||||||
|
require.Equal(t, exp, act2)
|
||||||
|
|
||||||
|
// 3. Now recycle an iterator that was never used to access anything.
|
||||||
|
itX := c.Iterator(nil)
|
||||||
|
for itX.Next() == ValFloatHistogram {
|
||||||
|
// Just iterate through without accessing anything.
|
||||||
|
}
|
||||||
|
it3 := c.iterator(itX)
|
||||||
|
var act3 []floatResult
|
||||||
|
for it3.Next() == ValFloatHistogram {
|
||||||
|
fts, fh := it3.AtFloatHistogram()
|
||||||
|
act3 = append(act3, floatResult{t: fts, h: fh})
|
||||||
|
}
|
||||||
|
require.NoError(t, it3.Err())
|
||||||
|
require.Equal(t, exp, act3)
|
||||||
|
|
||||||
|
// 4. Test iterator Seek.
|
||||||
|
mid := len(exp) / 2
|
||||||
|
it4 := c.Iterator(nil)
|
||||||
|
var act4 []floatResult
|
||||||
|
require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t))
|
||||||
|
// Below ones should not matter.
|
||||||
|
require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t))
|
||||||
|
require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t))
|
||||||
|
fts, fh := it4.AtFloatHistogram()
|
||||||
|
act4 = append(act4, floatResult{t: fts, h: fh})
|
||||||
|
for it4.Next() == ValFloatHistogram {
|
||||||
|
fts, fh := it4.AtFloatHistogram()
|
||||||
|
act4 = append(act4, floatResult{t: fts, h: fh})
|
||||||
|
}
|
||||||
|
require.NoError(t, it4.Err())
|
||||||
|
require.Equal(t, exp[mid:], act4)
|
||||||
|
require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mimics the scenario described for compareSpans().
|
||||||
|
func TestFloatHistogramChunkBucketChanges(t *testing.T) {
|
||||||
|
c := Chunk(NewFloatHistogramChunk())
|
||||||
|
|
||||||
|
// Create fresh appender and add the first histogram.
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, c.NumSamples())
|
||||||
|
|
||||||
|
ts1 := int64(1234567890)
|
||||||
|
h1 := &histogram.Histogram{
|
||||||
|
Count: 27,
|
||||||
|
ZeroCount: 2,
|
||||||
|
Sum: 18.4,
|
||||||
|
ZeroThreshold: 1e-125,
|
||||||
|
Schema: 1,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
||||||
|
NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}},
|
||||||
|
NegativeBuckets: []int64{1},
|
||||||
|
}
|
||||||
|
|
||||||
|
app.AppendFloatHistogram(ts1, h1.ToFloat())
|
||||||
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
|
// Add a new histogram that has expanded buckets.
|
||||||
|
ts2 := ts1 + 16
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 1, Length: 4},
|
||||||
|
{Offset: 3, Length: 3},
|
||||||
|
}
|
||||||
|
h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}}
|
||||||
|
h2.Count = 35
|
||||||
|
h2.ZeroCount++
|
||||||
|
h2.Sum = 30
|
||||||
|
// Existing histogram should get values converted from the above to:
|
||||||
|
// 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
|
||||||
|
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||||
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||||
|
// Existing histogram should get values converted from the above to:
|
||||||
|
// 0 1 (previous values with some new empty buckets in between)
|
||||||
|
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||||
|
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
|
||||||
|
// This is how span changes will be handled.
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Greater(t, len(posInterjections), 0)
|
||||||
|
require.Greater(t, len(negInterjections), 0)
|
||||||
|
require.True(t, ok) // Only new buckets came in.
|
||||||
|
require.False(t, cr)
|
||||||
|
c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
|
||||||
|
app.AppendFloatHistogram(ts2, h2.ToFloat())
|
||||||
|
|
||||||
|
require.Equal(t, 2, c.NumSamples())
|
||||||
|
|
||||||
|
// Because the 2nd histogram has expanded buckets, we should expect all
|
||||||
|
// histograms (in particular the first) to come back using the new spans
|
||||||
|
// metadata as well as the expanded buckets.
|
||||||
|
h1.PositiveSpans = h2.PositiveSpans
|
||||||
|
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
|
||||||
|
h1.NegativeSpans = h2.NegativeSpans
|
||||||
|
h1.NegativeBuckets = []int64{0, 1}
|
||||||
|
exp := []floatResult{
|
||||||
|
{t: ts1, h: h1.ToFloat()},
|
||||||
|
{t: ts2, h: h2.ToFloat()},
|
||||||
|
}
|
||||||
|
it := c.Iterator(nil)
|
||||||
|
var act []floatResult
|
||||||
|
for it.Next() == ValFloatHistogram {
|
||||||
|
fts, fh := it.AtFloatHistogram()
|
||||||
|
act = append(act, floatResult{t: fts, h: fh})
|
||||||
|
}
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
require.Equal(t, exp, act)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFloatHistogramChunkAppendable(t *testing.T) {
|
||||||
|
c := Chunk(NewFloatHistogramChunk())
|
||||||
|
|
||||||
|
// Create fresh appender and add the first histogram.
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, c.NumSamples())
|
||||||
|
|
||||||
|
ts := int64(1234567890)
|
||||||
|
h1 := &histogram.Histogram{
|
||||||
|
Count: 5,
|
||||||
|
ZeroCount: 2,
|
||||||
|
Sum: 18.4,
|
||||||
|
ZeroThreshold: 1e-125,
|
||||||
|
Schema: 1,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
||||||
|
}
|
||||||
|
|
||||||
|
app.AppendFloatHistogram(ts, h1.ToFloat())
|
||||||
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
|
{ // New histogram that has more buckets.
|
||||||
|
h2 := h1
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 1, Length: 4},
|
||||||
|
{Offset: 3, Length: 3},
|
||||||
|
}
|
||||||
|
h2.Count += 9
|
||||||
|
h2.ZeroCount++
|
||||||
|
h2.Sum = 30
|
||||||
|
// Existing histogram should get values converted from the above to:
|
||||||
|
// 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
|
||||||
|
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||||
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||||
|
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Greater(t, len(posInterjections), 0)
|
||||||
|
require.Equal(t, 0, len(negInterjections))
|
||||||
|
require.True(t, ok) // Only new buckets came in.
|
||||||
|
require.False(t, cr)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a bucket missing.
|
||||||
|
h2 := h1
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 5, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
}
|
||||||
|
h2.Sum = 21
|
||||||
|
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
|
||||||
|
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Equal(t, 0, len(posInterjections))
|
||||||
|
require.Equal(t, 0, len(negInterjections))
|
||||||
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a counter reset while buckets are same.
|
||||||
|
h2 := h1
|
||||||
|
h2.Sum = 23
|
||||||
|
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
|
||||||
|
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Equal(t, 0, len(posInterjections))
|
||||||
|
require.Equal(t, 0, len(negInterjections))
|
||||||
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a counter reset while new buckets were added.
|
||||||
|
h2 := h1
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 1, Length: 4},
|
||||||
|
{Offset: 3, Length: 3},
|
||||||
|
}
|
||||||
|
h2.Sum = 29
|
||||||
|
// Existing histogram should get values converted from the above to:
|
||||||
|
// 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
|
||||||
|
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||||
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
|
||||||
|
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Equal(t, 0, len(posInterjections))
|
||||||
|
require.Equal(t, 0, len(negInterjections))
|
||||||
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// New histogram that has a counter reset while new buckets were
|
||||||
|
// added before the first bucket and reset on first bucket. (to
|
||||||
|
// catch the edge case where the new bucket should be forwarded
|
||||||
|
// ahead until first old bucket at start)
|
||||||
|
h2 := h1
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: -3, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
}
|
||||||
|
h2.Sum = 26
|
||||||
|
// Existing histogram should get values converted from the above to:
|
||||||
|
// 0, 0, 6, 3, 3, 2, 4, 5, 1
|
||||||
|
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||||
|
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
|
||||||
|
|
||||||
|
hApp, _ := app.(*FloatHistogramAppender)
|
||||||
|
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat())
|
||||||
|
require.Equal(t, 0, len(posInterjections))
|
||||||
|
require.Equal(t, 0, len(negInterjections))
|
||||||
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
|
}
|
||||||
|
}
|
|
@ -67,7 +67,7 @@ func (c *HistogramChunk) Layout() (
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
if c.NumSamples() == 0 {
|
if c.NumSamples() == 0 {
|
||||||
panic("HistoChunk.Layout() called on an empty chunk")
|
panic("HistogramChunk.Layout() called on an empty chunk")
|
||||||
}
|
}
|
||||||
b := newBReader(c.Bytes()[2:])
|
b := newBReader(c.Bytes()[2:])
|
||||||
return readHistogramChunkLayout(&b)
|
return readHistogramChunkLayout(&b)
|
||||||
|
@ -88,17 +88,22 @@ const (
|
||||||
UnknownCounterReset CounterResetHeader = 0b00000000
|
UnknownCounterReset CounterResetHeader = 0b00000000
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetCounterResetHeader sets the counter reset header.
|
// setCounterResetHeader sets the counter reset header of the chunk
|
||||||
func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) {
|
// The third byte of the chunk is the counter reset header.
|
||||||
|
func setCounterResetHeader(h CounterResetHeader, bytes []byte) {
|
||||||
switch h {
|
switch h {
|
||||||
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset:
|
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset:
|
||||||
bytes := c.Bytes()
|
|
||||||
bytes[2] = (bytes[2] & 0b00111111) | byte(h)
|
bytes[2] = (bytes[2] & 0b00111111) | byte(h)
|
||||||
default:
|
default:
|
||||||
panic("invalid CounterResetHeader type")
|
panic("invalid CounterResetHeader type")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetCounterResetHeader sets the counter reset header.
|
||||||
|
func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) {
|
||||||
|
setCounterResetHeader(h, c.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
|
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
|
||||||
// header.
|
// header.
|
||||||
func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader {
|
func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader {
|
||||||
|
@ -223,6 +228,12 @@ func (a *HistogramAppender) Append(int64, float64) {
|
||||||
panic("appended a float sample to a histogram chunk")
|
panic("appended a float sample to a histogram chunk")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppendFloatHistogram implements Appender. This implementation panics because float
|
||||||
|
// histogram samples must never be appended to a histogram chunk.
|
||||||
|
func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogram) {
|
||||||
|
panic("appended a float histogram to a histogram chunk")
|
||||||
|
}
|
||||||
|
|
||||||
// Appendable returns whether the chunk can be appended to, and if so
|
// Appendable returns whether the chunk can be appended to, and if so
|
||||||
// whether any recoding needs to happen using the provided interjections
|
// whether any recoding needs to happen using the provided interjections
|
||||||
// (in case of any new buckets, positive or negative range, respectively).
|
// (in case of any new buckets, positive or negative range, respectively).
|
||||||
|
@ -296,6 +307,10 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bucketValue interface {
|
||||||
|
int64 | float64
|
||||||
|
}
|
||||||
|
|
||||||
// counterResetInAnyBucket returns true if there was a counter reset for any
|
// counterResetInAnyBucket returns true if there was a counter reset for any
|
||||||
// bucket. This should be called only when the bucket layout is the same or new
|
// bucket. This should be called only when the bucket layout is the same or new
|
||||||
// buckets were added. It does not handle the case of buckets missing.
|
// buckets were added. It does not handle the case of buckets missing.
|
||||||
|
@ -515,10 +530,10 @@ func (a *HistogramAppender) Recode(
|
||||||
// Save the modified histogram to the new chunk.
|
// Save the modified histogram to the new chunk.
|
||||||
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
||||||
if len(positiveInterjections) > 0 {
|
if len(positiveInterjections) > 0 {
|
||||||
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections)
|
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, true)
|
||||||
}
|
}
|
||||||
if len(negativeInterjections) > 0 {
|
if len(negativeInterjections) > 0 {
|
||||||
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections)
|
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, true)
|
||||||
}
|
}
|
||||||
app.AppendHistogram(tOld, hOld)
|
app.AppendHistogram(tOld, hOld)
|
||||||
}
|
}
|
||||||
|
|
|
@ -280,19 +280,23 @@ loop:
|
||||||
|
|
||||||
// interject merges 'in' with the provided interjections and writes them into
|
// interject merges 'in' with the provided interjections and writes them into
|
||||||
// 'out', which must already have the appropriate length.
|
// 'out', which must already have the appropriate length.
|
||||||
func interject(in, out []int64, interjections []Interjection) []int64 {
|
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
|
||||||
var (
|
var (
|
||||||
j int // Position in out.
|
j int // Position in out.
|
||||||
v int64 // The last value seen.
|
v BV // The last value seen.
|
||||||
interj int // The next interjection to process.
|
interj int // The next interjection to process.
|
||||||
)
|
)
|
||||||
for i, d := range in {
|
for i, d := range in {
|
||||||
if interj < len(interjections) && i == interjections[interj].pos {
|
if interj < len(interjections) && i == interjections[interj].pos {
|
||||||
|
|
||||||
// We have an interjection!
|
// We have an interjection!
|
||||||
// Add interjection.num new delta values such that their
|
// Add interjection.num new delta values such that their bucket values equate 0.
|
||||||
// bucket values equate 0.
|
// When deltas==false, it means that it is an absolute value. So we set it to 0 directly.
|
||||||
out[j] = int64(-v)
|
if deltas {
|
||||||
|
out[j] = -v
|
||||||
|
} else {
|
||||||
|
out[j] = 0
|
||||||
|
}
|
||||||
j++
|
j++
|
||||||
for x := 1; x < interjections[interj].num; x++ {
|
for x := 1; x < interjections[interj].num; x++ {
|
||||||
out[j] = 0
|
out[j] = 0
|
||||||
|
@ -304,7 +308,13 @@ func interject(in, out []int64, interjections []Interjection) []int64 {
|
||||||
// should save is the original delta value + the last
|
// should save is the original delta value + the last
|
||||||
// value of the point before the interjection (to undo
|
// value of the point before the interjection (to undo
|
||||||
// the delta that was introduced by the interjection).
|
// the delta that was introduced by the interjection).
|
||||||
|
// When deltas==false, it means that it is an absolute value,
|
||||||
|
// so we set it directly to the value in the 'in' slice.
|
||||||
|
if deltas {
|
||||||
out[j] = d + v
|
out[j] = d + v
|
||||||
|
} else {
|
||||||
|
out[j] = d
|
||||||
|
}
|
||||||
j++
|
j++
|
||||||
v = d + v
|
v = d + v
|
||||||
continue
|
continue
|
||||||
|
@ -321,7 +331,11 @@ func interject(in, out []int64, interjections []Interjection) []int64 {
|
||||||
// All interjections processed. Nothing more to do.
|
// All interjections processed. Nothing more to do.
|
||||||
case len(interjections) - 1:
|
case len(interjections) - 1:
|
||||||
// One more interjection to process at the end.
|
// One more interjection to process at the end.
|
||||||
out[j] = int64(-v)
|
if deltas {
|
||||||
|
out[j] = -v
|
||||||
|
} else {
|
||||||
|
out[j] = 0
|
||||||
|
}
|
||||||
j++
|
j++
|
||||||
for x := 1; x < interjections[interj].num; x++ {
|
for x := 1; x < interjections[interj].num; x++ {
|
||||||
out[j] = 0
|
out[j] = 0
|
||||||
|
|
|
@ -290,7 +290,7 @@ func TestInterjection(t *testing.T) {
|
||||||
require.Equal(t, s.interjections, interjections)
|
require.Equal(t, s.interjections, interjections)
|
||||||
|
|
||||||
gotBuckets := make([]int64, len(s.bucketsOut))
|
gotBuckets := make([]int64, len(s.bucketsOut))
|
||||||
interject(s.bucketsIn, gotBuckets, interjections)
|
interject(s.bucketsIn, gotBuckets, interjections, true)
|
||||||
require.Equal(t, s.bucketsOut, gotBuckets)
|
require.Equal(t, s.bucketsOut, gotBuckets)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,6 +156,10 @@ func (a *xorAppender) AppendHistogram(t int64, h *histogram.Histogram) {
|
||||||
panic("appended a histogram to an xor chunk")
|
panic("appended a histogram to an xor chunk")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *xorAppender) AppendFloatHistogram(t int64, h *histogram.FloatHistogram) {
|
||||||
|
panic("appended a float histogram to an xor chunk")
|
||||||
|
}
|
||||||
|
|
||||||
func (a *xorAppender) Append(t int64, v float64) {
|
func (a *xorAppender) Append(t int64, v float64) {
|
||||||
var tDelta uint64
|
var tDelta uint64
|
||||||
num := binary.BigEndian.Uint16(a.b.bytes())
|
num := binary.BigEndian.Uint16(a.b.bytes())
|
||||||
|
|
Loading…
Reference in a new issue