From 6fd89a6fd2af4ed0469e86db7ab503dbd37fc535 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 20 Dec 2022 15:33:32 +0530 Subject: [PATCH] Add chunk encoding for float histogram (#11716) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marc Tudurí Signed-off-by: Ganesh Vernekar Co-authored-by: Marc Tudurí --- tsdb/chunkenc/chunk.go | 38 +- tsdb/chunkenc/float_histogram.go | 759 ++++++++++++++++++++++++++ tsdb/chunkenc/float_histogram_test.go | 359 ++++++++++++ tsdb/chunkenc/histogram.go | 27 +- tsdb/chunkenc/histogram_meta.go | 32 +- tsdb/chunkenc/histogram_meta_test.go | 2 +- tsdb/chunkenc/xor.go | 4 + 7 files changed, 1202 insertions(+), 19 deletions(-) create mode 100644 tsdb/chunkenc/float_histogram.go create mode 100644 tsdb/chunkenc/float_histogram_test.go diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 0b7117ce9e..b7d240123b 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -30,6 +30,7 @@ const ( EncNone Encoding = iota EncXOR EncHistogram + EncFloatHistogram ) func (e Encoding) String() string { @@ -40,6 +41,8 @@ func (e Encoding) String() string { return "XOR" case EncHistogram: return "histogram" + case EncFloatHistogram: + return "floathistogram" } return "" } @@ -57,7 +60,7 @@ func IsOutOfOrderChunk(e Encoding) bool { // IsValidEncoding returns true for supported encodings. func IsValidEncoding(e Encoding) bool { - return e == EncXOR || e == EncOOOXOR || e == EncHistogram + return e == EncXOR || e == EncOOOXOR || e == EncHistogram || e == EncFloatHistogram } // Chunk holds a sequence of sample pairs that can be iterated over and appended to. @@ -91,6 +94,7 @@ type Chunk interface { type Appender interface { Append(int64, float64) AppendHistogram(t int64, h *histogram.Histogram) + AppendFloatHistogram(t int64, h *histogram.FloatHistogram) } // Iterator is a simple iterator that can only get the next value. @@ -159,6 +163,8 @@ func (v ValueType) ChunkEncoding() Encoding { return EncXOR case ValHistogram: return EncHistogram + case ValFloatHistogram: + return EncFloatHistogram default: return EncNone } @@ -228,8 +234,9 @@ type Pool interface { // pool is a memory pool of chunk objects. type pool struct { - xor sync.Pool - histogram sync.Pool + xor sync.Pool + histogram sync.Pool + floatHistogram sync.Pool } // NewPool returns a new pool. @@ -245,6 +252,11 @@ func NewPool() Pool { return &HistogramChunk{b: bstream{}} }, }, + floatHistogram: sync.Pool{ + New: func() interface{} { + return &FloatHistogramChunk{b: bstream{}} + }, + }, } } @@ -260,6 +272,11 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { c.b.stream = b c.b.count = 0 return c, nil + case EncFloatHistogram: + c := p.floatHistogram.Get().(*FloatHistogramChunk) + c.b.stream = b + c.b.count = 0 + return c, nil } return nil, errors.Errorf("invalid chunk encoding %q", e) } @@ -288,6 +305,17 @@ func (p *pool) Put(c Chunk) error { sh.b.stream = nil sh.b.count = 0 p.histogram.Put(c) + case EncFloatHistogram: + sh, ok := c.(*FloatHistogramChunk) + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. + if !ok { + return nil + } + sh.b.stream = nil + sh.b.count = 0 + p.floatHistogram.Put(c) default: return errors.Errorf("invalid chunk encoding %q", c.Encoding()) } @@ -303,6 +331,8 @@ func FromData(e Encoding, d []byte) (Chunk, error) { return &XORChunk{b: bstream{count: 0, stream: d}}, nil case EncHistogram: return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil + case EncFloatHistogram: + return &FloatHistogramChunk{b: bstream{count: 0, stream: d}}, nil } return nil, errors.Errorf("invalid chunk encoding %q", e) } @@ -314,6 +344,8 @@ func NewEmptyChunk(e Encoding) (Chunk, error) { return NewXORChunk(), nil case EncHistogram: return NewHistogramChunk(), nil + case EncFloatHistogram: + return NewFloatHistogramChunk(), nil } return nil, errors.Errorf("invalid chunk encoding %q", e) } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go new file mode 100644 index 0000000000..142dc42035 --- /dev/null +++ b/tsdb/chunkenc/float_histogram.go @@ -0,0 +1,759 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "encoding/binary" + "math" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/value" +) + +// FloatHistogramChunk holds encoded sample data for a sparse, high-resolution +// float histogram. +// +// Each sample has multiple "fields", stored in the following way (raw = store +// number directly, delta = store delta to the previous number, dod = store +// delta of the delta to the previous number, xor = what we do for regular +// sample values): +// +// field → ts count zeroCount sum []posbuckets []negbuckets +// sample 1 raw raw raw raw []raw []raw +// sample 2 delta xor xor xor []xor []xor +// sample >2 dod xor xor xor []xor []xor +type FloatHistogramChunk struct { + b bstream +} + +// NewFloatHistogramChunk returns a new chunk with float histogram encoding. +func NewFloatHistogramChunk() *FloatHistogramChunk { + b := make([]byte, 3, 128) + return &FloatHistogramChunk{b: bstream{stream: b, count: 0}} +} + +// xorValue holds all the necessary information to encode +// and decode XOR encoded float64 values. +type xorValue struct { + value float64 + leading uint8 + trailing uint8 +} + +// Encoding returns the encoding type. +func (c *FloatHistogramChunk) Encoding() Encoding { + return EncFloatHistogram +} + +// Bytes returns the underlying byte slice of the chunk. +func (c *FloatHistogramChunk) Bytes() []byte { + return c.b.bytes() +} + +// NumSamples returns the number of samples in the chunk. +func (c *FloatHistogramChunk) NumSamples() int { + return int(binary.BigEndian.Uint16(c.Bytes())) +} + +// Layout returns the histogram layout. Only call this on chunks that have at +// least one sample. +func (c *FloatHistogramChunk) Layout() ( + schema int32, zeroThreshold float64, + negativeSpans, positiveSpans []histogram.Span, + err error, +) { + if c.NumSamples() == 0 { + panic("FloatHistogramChunk.Layout() called on an empty chunk") + } + b := newBReader(c.Bytes()[2:]) + return readHistogramChunkLayout(&b) +} + +// SetCounterResetHeader sets the counter reset header. +func (c *FloatHistogramChunk) SetCounterResetHeader(h CounterResetHeader) { + setCounterResetHeader(h, c.Bytes()) +} + +// GetCounterResetHeader returns the info about the first 2 bits of the chunk +// header. +func (c *FloatHistogramChunk) GetCounterResetHeader() CounterResetHeader { + return CounterResetHeader(c.Bytes()[2] & 0b11000000) +} + +// Compact implements the Chunk interface. +func (c *FloatHistogramChunk) Compact() { + if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { + buf := make([]byte, l) + copy(buf, c.b.stream) + c.b.stream = buf + } +} + +// Appender implements the Chunk interface. +func (c *FloatHistogramChunk) Appender() (Appender, error) { + it := c.iterator(nil) + + // To get an appender, we must know the state it would have if we had + // appended all existing data from scratch. We iterate through the end + // and populate via the iterator's state. + for it.Next() == ValFloatHistogram { + } + if err := it.Err(); err != nil { + return nil, err + } + + pBuckets := make([]xorValue, len(it.pBuckets)) + for i := 0; i < len(it.pBuckets); i++ { + pBuckets[i] = xorValue{ + value: it.pBuckets[i], + leading: it.pBucketsLeading[i], + trailing: it.pBucketsTrailing[i], + } + } + nBuckets := make([]xorValue, len(it.nBuckets)) + for i := 0; i < len(it.nBuckets); i++ { + nBuckets[i] = xorValue{ + value: it.nBuckets[i], + leading: it.nBucketsLeading[i], + trailing: it.nBucketsTrailing[i], + } + } + + a := &FloatHistogramAppender{ + b: &c.b, + + schema: it.schema, + zThreshold: it.zThreshold, + pSpans: it.pSpans, + nSpans: it.nSpans, + t: it.t, + tDelta: it.tDelta, + cnt: it.cnt, + zCnt: it.zCnt, + pBuckets: pBuckets, + nBuckets: nBuckets, + sum: it.sum, + } + if it.numTotal == 0 { + a.sum.leading = 0xff + a.cnt.leading = 0xff + a.zCnt.leading = 0xff + } + return a, nil +} + +func (c *FloatHistogramChunk) iterator(it Iterator) *floatHistogramIterator { + // This comment is copied from XORChunk.iterator: + // Should iterators guarantee to act on a copy of the data so it doesn't lock append? + // When using striped locks to guard access to chunks, probably yes. + // Could only copy data if the chunk is not completed yet. + if histogramIter, ok := it.(*floatHistogramIterator); ok { + histogramIter.Reset(c.b.bytes()) + return histogramIter + } + return newFloatHistogramIterator(c.b.bytes()) +} + +func newFloatHistogramIterator(b []byte) *floatHistogramIterator { + it := &floatHistogramIterator{ + br: newBReader(b), + numTotal: binary.BigEndian.Uint16(b), + t: math.MinInt64, + } + // The first 3 bytes contain chunk headers. + // We skip that for actual samples. + _, _ = it.br.readBits(24) + return it +} + +// Iterator implements the Chunk interface. +func (c *FloatHistogramChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) +} + +// FloatHistogramAppender is an Appender implementation for float histograms. +type FloatHistogramAppender struct { + b *bstream + + // Layout: + schema int32 + zThreshold float64 + pSpans, nSpans []histogram.Span + + t, tDelta int64 + sum, cnt, zCnt xorValue + pBuckets, nBuckets []xorValue +} + +// Append implements Appender. This implementation panics because normal float +// samples must never be appended to a histogram chunk. +func (a *FloatHistogramAppender) Append(int64, float64) { + panic("appended a float sample to a histogram chunk") +} + +// AppendHistogram implements Appender. This implementation panics because integer +// histogram samples must never be appended to a float histogram chunk. +func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { + panic("appended an integer histogram to a float histogram chunk") +} + +// Appendable returns whether the chunk can be appended to, and if so +// whether any recoding needs to happen using the provided interjections +// (in case of any new buckets, positive or negative range, respectively). +// +// The chunk is not appendable in the following cases: +// +// • The schema has changed. +// +// • The threshold for the zero bucket has changed. +// +// • Any buckets have disappeared. +// +// • There was a counter reset in the count of observations or in any bucket, +// including the zero bucket. +// +// • The last sample in the chunk was stale while the current sample is not stale. +// +// The method returns an additional boolean set to true if it is not appendable +// because of a counter reset. If the given sample is stale, it is always ok to +// append. If counterReset is true, okToAppend is always false. +func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( + positiveInterjections, negativeInterjections []Interjection, + okToAppend, counterReset bool, +) { + if value.IsStaleNaN(h.Sum) { + // This is a stale sample whose buckets and spans don't matter. + okToAppend = true + return + } + if value.IsStaleNaN(a.sum.value) { + // If the last sample was stale, then we can only accept stale + // samples in this chunk. + return + } + + if h.Count < a.cnt.value { + // There has been a counter reset. + counterReset = true + return + } + + if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold { + return + } + + if h.ZeroCount < a.zCnt.value { + // There has been a counter reset since ZeroThreshold didn't change. + counterReset = true + return + } + + var ok bool + positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) + if !ok { + counterReset = true + return + } + negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) + if !ok { + counterReset = true + return + } + + if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || + counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { + counterReset, positiveInterjections, negativeInterjections = true, nil, nil + return + } + + okToAppend = true + return +} + +// counterResetInAnyFloatBucket returns true if there was a counter reset for any +// bucket. This should be called only when the bucket layout is the same or new +// buckets were added. It does not handle the case of buckets missing. +func counterResetInAnyFloatBucket(oldBuckets []xorValue, newBuckets []float64, oldSpans, newSpans []histogram.Span) bool { + if len(oldSpans) == 0 || len(oldBuckets) == 0 { + return false + } + + oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices. + oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span. + oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset + + oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice. + oldVal, newVal := oldBuckets[0].value, newBuckets[0] + + // Since we assume that new spans won't have missing buckets, there will never be a case + // where the old index will not find a matching new index. + for { + if oldIdx == newIdx { + if newVal < oldVal { + return true + } + } + + if oldIdx <= newIdx { + // Moving ahead old bucket and span by 1 index. + if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 { + // Current span is over. + oldSpanSliceIdx++ + oldInsideSpanIdx = 0 + if oldSpanSliceIdx >= len(oldSpans) { + // All old spans are over. + break + } + oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset + } else { + oldInsideSpanIdx++ + oldIdx++ + } + oldBucketSliceIdx++ + oldVal = oldBuckets[oldBucketSliceIdx].value + } + + if oldIdx > newIdx { + // Moving ahead new bucket and span by 1 index. + if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 { + // Current span is over. + newSpanSliceIdx++ + newInsideSpanIdx = 0 + if newSpanSliceIdx >= len(newSpans) { + // All new spans are over. + // This should not happen, old spans above should catch this first. + panic("new spans over before old spans in counterReset") + } + newIdx += 1 + newSpans[newSpanSliceIdx].Offset + } else { + newInsideSpanIdx++ + newIdx++ + } + newBucketSliceIdx++ + newVal = newBuckets[newBucketSliceIdx] + } + } + + return false +} + +// AppendFloatHistogram appends a float histogram to the chunk. The caller must ensure that +// the histogram is properly structured, e.g. the number of buckets used +// corresponds to the number conveyed by the span structures. First call +// Appendable() and act accordingly! +func (a *FloatHistogramAppender) AppendFloatHistogram(t int64, h *histogram.FloatHistogram) { + var tDelta int64 + num := binary.BigEndian.Uint16(a.b.bytes()) + + if value.IsStaleNaN(h.Sum) { + // Emptying out other fields to write no buckets, and an empty + // layout in case of first histogram in the chunk. + h = &histogram.FloatHistogram{Sum: h.Sum} + } + + if num == 0 { + // The first append gets the privilege to dictate the layout + // but it's also responsible for encoding it into the chunk! + writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) + a.schema = h.Schema + a.zThreshold = h.ZeroThreshold + + if len(h.PositiveSpans) > 0 { + a.pSpans = make([]histogram.Span, len(h.PositiveSpans)) + copy(a.pSpans, h.PositiveSpans) + } else { + a.pSpans = nil + } + if len(h.NegativeSpans) > 0 { + a.nSpans = make([]histogram.Span, len(h.NegativeSpans)) + copy(a.nSpans, h.NegativeSpans) + } else { + a.nSpans = nil + } + + numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) + if numPBuckets > 0 { + a.pBuckets = make([]xorValue, numPBuckets) + for i := 0; i < numPBuckets; i++ { + a.pBuckets[i] = xorValue{ + value: h.PositiveBuckets[i], + leading: 0xff, + } + } + } else { + a.pBuckets = nil + } + if numNBuckets > 0 { + a.nBuckets = make([]xorValue, numNBuckets) + for i := 0; i < numNBuckets; i++ { + a.nBuckets[i] = xorValue{ + value: h.NegativeBuckets[i], + leading: 0xff, + } + } + } else { + a.nBuckets = nil + } + + // Now store the actual data. + putVarbitInt(a.b, t) + a.b.writeBits(math.Float64bits(h.Count), 64) + a.b.writeBits(math.Float64bits(h.ZeroCount), 64) + a.b.writeBits(math.Float64bits(h.Sum), 64) + a.cnt.value = h.Count + a.zCnt.value = h.ZeroCount + a.sum.value = h.Sum + for _, b := range h.PositiveBuckets { + a.b.writeBits(math.Float64bits(b), 64) + } + for _, b := range h.NegativeBuckets { + a.b.writeBits(math.Float64bits(b), 64) + } + } else { + // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, + // so we don't need a separate single delta logic for the 2nd sample. + tDelta = t - a.t + tDod := tDelta - a.tDelta + putVarbitInt(a.b, tDod) + + a.writeXorValue(&a.cnt, h.Count) + a.writeXorValue(&a.zCnt, h.ZeroCount) + a.writeXorValue(&a.sum, h.Sum) + + for i, b := range h.PositiveBuckets { + a.writeXorValue(&a.pBuckets[i], b) + } + for i, b := range h.NegativeBuckets { + a.writeXorValue(&a.nBuckets[i], b) + } + } + + binary.BigEndian.PutUint16(a.b.bytes(), num+1) + + a.t = t + a.tDelta = tDelta +} + +func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) { + xorWrite(a.b, v, old.value, &old.leading, &old.trailing) + old.value = v +} + +// Recode converts the current chunk to accommodate an expansion of the set of +// (positive and/or negative) buckets used, according to the provided +// interjections, resulting in the honoring of the provided new positive and +// negative spans. To continue appending, use the returned Appender rather than +// the receiver of this method. +func (a *FloatHistogramAppender) Recode( + positiveInterjections, negativeInterjections []Interjection, + positiveSpans, negativeSpans []histogram.Span, +) (Chunk, Appender) { + // TODO(beorn7): This currently just decodes everything and then encodes + // it again with the new span layout. This can probably be done in-place + // by editing the chunk. But let's first see how expensive it is in the + // big picture. Also, in-place editing might create concurrency issues. + byts := a.b.bytes() + it := newFloatHistogramIterator(byts) + hc := NewFloatHistogramChunk() + app, err := hc.Appender() + if err != nil { + panic(err) + } + numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) + + for it.Next() == ValFloatHistogram { + tOld, hOld := it.AtFloatHistogram() + + // We have to newly allocate slices for the modified buckets + // here because they are kept by the appender until the next + // append. + // TODO(beorn7): We might be able to optimize this. + var positiveBuckets, negativeBuckets []float64 + if numPositiveBuckets > 0 { + positiveBuckets = make([]float64, numPositiveBuckets) + } + if numNegativeBuckets > 0 { + negativeBuckets = make([]float64, numNegativeBuckets) + } + + // Save the modified histogram to the new chunk. + hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans + if len(positiveInterjections) > 0 { + hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, false) + } + if len(negativeInterjections) > 0 { + hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, false) + } + app.AppendFloatHistogram(tOld, hOld) + } + + hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000)) + return hc, app +} + +type floatHistogramIterator struct { + br bstreamReader + numTotal uint16 + numRead uint16 + + // Layout: + schema int32 + zThreshold float64 + pSpans, nSpans []histogram.Span + + // For the fields that are tracked as deltas and ultimately dod's. + t int64 + tDelta int64 + + // All Gorilla xor encoded. + sum, cnt, zCnt xorValue + + // Buckets are not of type xorValue to avoid creating + // new slices for every AtFloatHistogram call. + pBuckets, nBuckets []float64 + pBucketsLeading, nBucketsLeading []uint8 + pBucketsTrailing, nBucketsTrailing []uint8 + + err error + + // Track calls to retrieve methods. Once they have been called, we + // cannot recycle the bucket slices anymore because we have returned + // them in the histogram. + atFloatHistogramCalled bool +} + +func (it *floatHistogramIterator) Seek(t int64) ValueType { + if it.err != nil { + return ValNone + } + + for t > it.t || it.numRead == 0 { + if it.Next() == ValNone { + return ValNone + } + } + return ValFloatHistogram +} + +func (it *floatHistogramIterator) At() (int64, float64) { + panic("cannot call floatHistogramIterator.At") +} + +func (it *floatHistogramIterator) AtHistogram() (int64, *histogram.Histogram) { + panic("cannot call floatHistogramIterator.AtHistogram") +} + +func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { + if value.IsStaleNaN(it.sum.value) { + return it.t, &histogram.FloatHistogram{Sum: it.sum.value} + } + it.atFloatHistogramCalled = true + return it.t, &histogram.FloatHistogram{ + Count: it.cnt.value, + ZeroCount: it.zCnt.value, + Sum: it.sum.value, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, + } +} + +func (it *floatHistogramIterator) AtT() int64 { + return it.t +} + +func (it *floatHistogramIterator) Err() error { + return it.err +} + +func (it *floatHistogramIterator) Reset(b []byte) { + // The first 3 bytes contain chunk headers. + // We skip that for actual samples. + it.br = newBReader(b[3:]) + it.numTotal = binary.BigEndian.Uint16(b) + it.numRead = 0 + + it.t, it.tDelta = 0, 0 + it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{} + + if it.atFloatHistogramCalled { + it.atFloatHistogramCalled = false + it.pBuckets, it.nBuckets = nil, nil + } else { + it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0] + } + it.pBucketsLeading, it.pBucketsTrailing = it.pBucketsLeading[:0], it.pBucketsTrailing[:0] + it.nBucketsLeading, it.nBucketsTrailing = it.nBucketsLeading[:0], it.nBucketsTrailing[:0] + + it.err = nil +} + +func (it *floatHistogramIterator) Next() ValueType { + if it.err != nil || it.numRead == it.numTotal { + return ValNone + } + + if it.numRead == 0 { + // The first read is responsible for reading the chunk layout + // and for initializing fields that depend on it. We give + // counter reset info at chunk level, hence we discard it here. + schema, zeroThreshold, posSpans, negSpans, err := readHistogramChunkLayout(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.schema = schema + it.zThreshold = zeroThreshold + it.pSpans, it.nSpans = posSpans, negSpans + numPBuckets, numNBuckets := countSpans(posSpans), countSpans(negSpans) + // Allocate bucket slices as needed, recycling existing slices + // in case this iterator was reset and already has slices of a + // sufficient capacity. + if numPBuckets > 0 { + it.pBuckets = append(it.pBuckets, make([]float64, numPBuckets)...) + it.pBucketsLeading = append(it.pBucketsLeading, make([]uint8, numPBuckets)...) + it.pBucketsTrailing = append(it.pBucketsTrailing, make([]uint8, numPBuckets)...) + } + if numNBuckets > 0 { + it.nBuckets = append(it.nBuckets, make([]float64, numNBuckets)...) + it.nBucketsLeading = append(it.nBucketsLeading, make([]uint8, numNBuckets)...) + it.nBucketsTrailing = append(it.nBucketsTrailing, make([]uint8, numNBuckets)...) + } + + // Now read the actual data. + t, err := readVarbitInt(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.t = t + + cnt, err := it.br.readBits(64) + if err != nil { + it.err = err + return ValNone + } + it.cnt.value = math.Float64frombits(cnt) + + zcnt, err := it.br.readBits(64) + if err != nil { + it.err = err + return ValNone + } + it.zCnt.value = math.Float64frombits(zcnt) + + sum, err := it.br.readBits(64) + if err != nil { + it.err = err + return ValNone + } + it.sum.value = math.Float64frombits(sum) + + for i := range it.pBuckets { + v, err := it.br.readBits(64) + if err != nil { + it.err = err + return ValNone + } + it.pBuckets[i] = math.Float64frombits(v) + } + for i := range it.nBuckets { + v, err := it.br.readBits(64) + if err != nil { + it.err = err + return ValNone + } + it.nBuckets[i] = math.Float64frombits(v) + } + + it.numRead++ + return ValFloatHistogram + } + + // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, + // so we don't need a separate single delta logic for the 2nd sample. + + // Recycle bucket slices that have not been returned yet. Otherwise, copy them. + // We can always recycle the slices for leading and trailing bits as they are + // never returned to the caller. + if it.atFloatHistogramCalled { + it.atFloatHistogramCalled = false + if len(it.pBuckets) > 0 { + newBuckets := make([]float64, len(it.pBuckets)) + copy(newBuckets, it.pBuckets) + it.pBuckets = newBuckets + } else { + it.pBuckets = nil + } + if len(it.nBuckets) > 0 { + newBuckets := make([]float64, len(it.nBuckets)) + copy(newBuckets, it.nBuckets) + it.nBuckets = newBuckets + } else { + it.nBuckets = nil + } + } + + tDod, err := readVarbitInt(&it.br) + if err != nil { + it.err = err + return ValNone + } + it.tDelta = it.tDelta + tDod + it.t += it.tDelta + + if ok := it.readXor(&it.cnt.value, &it.cnt.leading, &it.cnt.trailing); !ok { + return ValNone + } + + if ok := it.readXor(&it.zCnt.value, &it.zCnt.leading, &it.zCnt.trailing); !ok { + return ValNone + } + + if ok := it.readXor(&it.sum.value, &it.sum.leading, &it.sum.trailing); !ok { + return ValNone + } + + if value.IsStaleNaN(it.sum.value) { + it.numRead++ + return ValFloatHistogram + } + + for i := range it.pBuckets { + if ok := it.readXor(&it.pBuckets[i], &it.pBucketsLeading[i], &it.pBucketsTrailing[i]); !ok { + return ValNone + } + } + + for i := range it.nBuckets { + if ok := it.readXor(&it.nBuckets[i], &it.nBucketsLeading[i], &it.nBucketsTrailing[i]); !ok { + return ValNone + } + } + + it.numRead++ + return ValFloatHistogram +} + +func (it *floatHistogramIterator) readXor(v *float64, leading, trailing *uint8) bool { + err := xorRead(&it.br, v, leading, trailing) + if err != nil { + it.err = err + return false + } + return true +} diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go new file mode 100644 index 0000000000..9308c3b3d8 --- /dev/null +++ b/tsdb/chunkenc/float_histogram_test.go @@ -0,0 +1,359 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" +) + +type floatResult struct { + t int64 + h *histogram.FloatHistogram +} + +func TestFloatHistogramChunkSameBuckets(t *testing.T) { + c := NewFloatHistogramChunk() + var exp []floatResult + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h := &histogram.Histogram{ + Count: 15, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-100, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 1}, + {Offset: 2, Length: 3}, + }, + NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8) + } + app.AppendFloatHistogram(ts, h.ToFloat()) + exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + require.Equal(t, 1, c.NumSamples()) + + // Add an updated histogram. + ts += 16 + h = h.Copy() + h.Count = 32 + h.ZeroCount++ + h.Sum = 24.4 + h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) + h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15) + app.AppendFloatHistogram(ts, h.ToFloat()) + exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + require.Equal(t, 2, c.NumSamples()) + + // Add update with new appender. + app, err = c.Appender() + require.NoError(t, err) + + ts += 14 + h = h.Copy() + h.Count = 54 + h.ZeroCount += 2 + h.Sum = 24.4 + h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) + h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22) + app.AppendFloatHistogram(ts, h.ToFloat()) + exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + require.Equal(t, 3, c.NumSamples()) + + // 1. Expand iterator in simple case. + it := c.Iterator(nil) + require.NoError(t, it.Err()) + var act []floatResult + for it.Next() == ValFloatHistogram { + fts, fh := it.AtFloatHistogram() + act = append(act, floatResult{t: fts, h: fh}) + } + require.NoError(t, it.Err()) + require.Equal(t, exp, act) + + // 2. Expand second iterator while reusing first one. + it2 := c.Iterator(it) + var act2 []floatResult + for it2.Next() == ValFloatHistogram { + fts, fh := it2.AtFloatHistogram() + act2 = append(act2, floatResult{t: fts, h: fh}) + } + require.NoError(t, it2.Err()) + require.Equal(t, exp, act2) + + // 3. Now recycle an iterator that was never used to access anything. + itX := c.Iterator(nil) + for itX.Next() == ValFloatHistogram { + // Just iterate through without accessing anything. + } + it3 := c.iterator(itX) + var act3 []floatResult + for it3.Next() == ValFloatHistogram { + fts, fh := it3.AtFloatHistogram() + act3 = append(act3, floatResult{t: fts, h: fh}) + } + require.NoError(t, it3.Err()) + require.Equal(t, exp, act3) + + // 4. Test iterator Seek. + mid := len(exp) / 2 + it4 := c.Iterator(nil) + var act4 []floatResult + require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) + // Below ones should not matter. + require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) + require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) + fts, fh := it4.AtFloatHistogram() + act4 = append(act4, floatResult{t: fts, h: fh}) + for it4.Next() == ValFloatHistogram { + fts, fh := it4.AtFloatHistogram() + act4 = append(act4, floatResult{t: fts, h: fh}) + } + require.NoError(t, it4.Err()) + require.Equal(t, exp[mid:], act4) + require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) +} + +// Mimics the scenario described for compareSpans(). +func TestFloatHistogramChunkBucketChanges(t *testing.T) { + c := Chunk(NewFloatHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts1 := int64(1234567890) + h1 := &histogram.Histogram{ + Count: 27, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) + NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}}, + NegativeBuckets: []int64{1}, + } + + app.AppendFloatHistogram(ts1, h1.ToFloat()) + require.Equal(t, 1, c.NumSamples()) + + // Add a new histogram that has expanded buckets. + ts2 := ts1 + 16 + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}} + h2.Count = 35 + h2.ZeroCount++ + h2.Sum = 30 + // Existing histogram should get values converted from the above to: + // 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) + // Existing histogram should get values converted from the above to: + // 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3) + // This is how span changes will be handled. + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Greater(t, len(posInterjections), 0) + require.Greater(t, len(negInterjections), 0) + require.True(t, ok) // Only new buckets came in. + require.False(t, cr) + c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) + app.AppendFloatHistogram(ts2, h2.ToFloat()) + + require.Equal(t, 2, c.NumSamples()) + + // Because the 2nd histogram has expanded buckets, we should expect all + // histograms (in particular the first) to come back using the new spans + // metadata as well as the expanded buckets. + h1.PositiveSpans = h2.PositiveSpans + h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} + h1.NegativeSpans = h2.NegativeSpans + h1.NegativeBuckets = []int64{0, 1} + exp := []floatResult{ + {t: ts1, h: h1.ToFloat()}, + {t: ts2, h: h2.ToFloat()}, + } + it := c.Iterator(nil) + var act []floatResult + for it.Next() == ValFloatHistogram { + fts, fh := it.AtFloatHistogram() + act = append(act, floatResult{t: fts, h: fh}) + } + require.NoError(t, it.Err()) + require.Equal(t, exp, act) +} + +func TestFloatHistogramChunkAppendable(t *testing.T) { + c := Chunk(NewFloatHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h1 := &histogram.Histogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) + } + + app.AppendFloatHistogram(ts, h1.ToFloat()) + require.Equal(t, 1, c.NumSamples()) + + { // New histogram that has more buckets. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Count += 9 + h2.ZeroCount++ + h2.Sum = 30 + // Existing histogram should get values converted from the above to: + // 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) + + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Greater(t, len(posInterjections), 0) + require.Equal(t, 0, len(negInterjections)) + require.True(t, ok) // Only new buckets came in. + require.False(t, cr) + } + + { // New histogram that has a bucket missing. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 21 + h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21) + + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) + } + + { // New histogram that has a counter reset while buckets are same. + h2 := h1 + h2.Sum = 23 + h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23) + + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) + } + + { // New histogram that has a counter reset while new buckets were added. + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Sum = 29 + // Existing histogram should get values converted from the above to: + // 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29) + + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) + } + + { + // New histogram that has a counter reset while new buckets were + // added before the first bucket and reset on first bucket. (to + // catch the edge case where the new bucket should be forwarded + // ahead until first old bucket at start) + h2 := h1 + h2.PositiveSpans = []histogram.Span{ + {Offset: -3, Length: 2}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 26 + // Existing histogram should get values converted from the above to: + // 0, 0, 6, 3, 3, 2, 4, 5, 1 + // so the new histogram should have new counts >= these per-bucket counts, e.g.: + h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26) + + hApp, _ := app.(*FloatHistogramAppender) + posInterjections, negInterjections, ok, cr := hApp.Appendable(h2.ToFloat()) + require.Equal(t, 0, len(posInterjections)) + require.Equal(t, 0, len(negInterjections)) + require.False(t, ok) // Need to cut a new chunk. + require.True(t, cr) + } +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 5aad382b2b..c633c14204 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -67,7 +67,7 @@ func (c *HistogramChunk) Layout() ( err error, ) { if c.NumSamples() == 0 { - panic("HistoChunk.Layout() called on an empty chunk") + panic("HistogramChunk.Layout() called on an empty chunk") } b := newBReader(c.Bytes()[2:]) return readHistogramChunkLayout(&b) @@ -88,17 +88,22 @@ const ( UnknownCounterReset CounterResetHeader = 0b00000000 ) -// SetCounterResetHeader sets the counter reset header. -func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) { +// setCounterResetHeader sets the counter reset header of the chunk +// The third byte of the chunk is the counter reset header. +func setCounterResetHeader(h CounterResetHeader, bytes []byte) { switch h { case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset: - bytes := c.Bytes() bytes[2] = (bytes[2] & 0b00111111) | byte(h) default: panic("invalid CounterResetHeader type") } } +// SetCounterResetHeader sets the counter reset header. +func (c *HistogramChunk) SetCounterResetHeader(h CounterResetHeader) { + setCounterResetHeader(h, c.Bytes()) +} + // GetCounterResetHeader returns the info about the first 2 bits of the chunk // header. func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader { @@ -223,6 +228,12 @@ func (a *HistogramAppender) Append(int64, float64) { panic("appended a float sample to a histogram chunk") } +// AppendFloatHistogram implements Appender. This implementation panics because float +// histogram samples must never be appended to a histogram chunk. +func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogram) { + panic("appended a float histogram to a histogram chunk") +} + // Appendable returns whether the chunk can be appended to, and if so // whether any recoding needs to happen using the provided interjections // (in case of any new buckets, positive or negative range, respectively). @@ -296,6 +307,10 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( return } +type bucketValue interface { + int64 | float64 +} + // counterResetInAnyBucket returns true if there was a counter reset for any // bucket. This should be called only when the bucket layout is the same or new // buckets were added. It does not handle the case of buckets missing. @@ -515,10 +530,10 @@ func (a *HistogramAppender) Recode( // Save the modified histogram to the new chunk. hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans if len(positiveInterjections) > 0 { - hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections) + hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, true) } if len(negativeInterjections) > 0 { - hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections) + hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, true) } app.AppendHistogram(tOld, hOld) } diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 7a4407305c..34768afb28 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -280,19 +280,23 @@ loop: // interject merges 'in' with the provided interjections and writes them into // 'out', which must already have the appropriate length. -func interject(in, out []int64, interjections []Interjection) []int64 { +func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { var ( - j int // Position in out. - v int64 // The last value seen. - interj int // The next interjection to process. + j int // Position in out. + v BV // The last value seen. + interj int // The next interjection to process. ) for i, d := range in { if interj < len(interjections) && i == interjections[interj].pos { // We have an interjection! - // Add interjection.num new delta values such that their - // bucket values equate 0. - out[j] = int64(-v) + // Add interjection.num new delta values such that their bucket values equate 0. + // When deltas==false, it means that it is an absolute value. So we set it to 0 directly. + if deltas { + out[j] = -v + } else { + out[j] = 0 + } j++ for x := 1; x < interjections[interj].num; x++ { out[j] = 0 @@ -304,7 +308,13 @@ func interject(in, out []int64, interjections []Interjection) []int64 { // should save is the original delta value + the last // value of the point before the interjection (to undo // the delta that was introduced by the interjection). - out[j] = d + v + // When deltas==false, it means that it is an absolute value, + // so we set it directly to the value in the 'in' slice. + if deltas { + out[j] = d + v + } else { + out[j] = d + } j++ v = d + v continue @@ -321,7 +331,11 @@ func interject(in, out []int64, interjections []Interjection) []int64 { // All interjections processed. Nothing more to do. case len(interjections) - 1: // One more interjection to process at the end. - out[j] = int64(-v) + if deltas { + out[j] = -v + } else { + out[j] = 0 + } j++ for x := 1; x < interjections[interj].num; x++ { out[j] = 0 diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index e3ae4149b6..30d2eef3a8 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -290,7 +290,7 @@ func TestInterjection(t *testing.T) { require.Equal(t, s.interjections, interjections) gotBuckets := make([]int64, len(s.bucketsOut)) - interject(s.bucketsIn, gotBuckets, interjections) + interject(s.bucketsIn, gotBuckets, interjections, true) require.Equal(t, s.bucketsOut, gotBuckets) }) } diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 10d650d596..62e90cbaae 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -156,6 +156,10 @@ func (a *xorAppender) AppendHistogram(t int64, h *histogram.Histogram) { panic("appended a histogram to an xor chunk") } +func (a *xorAppender) AppendFloatHistogram(t int64, h *histogram.FloatHistogram) { + panic("appended a float histogram to an xor chunk") +} + func (a *xorAppender) Append(t int64, v float64) { var tDelta uint64 num := binary.BigEndian.Uint16(a.b.bytes())