mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-01 00:47:26 -08:00
f2064c7987
promql, tsdb (histograms): Do not re-use spans between histograms When multiple points exist with the same native histogram schemas they share their spans. This causes a problem when a native histogram (NH) schema is modified (for example, during a Sum) then the other NH's with the same spans are also modified. As such, we should create a new Span for each NH. This will ensure NH's interfaces are safe to use without considering the effect on other histograms. At the moment this doesn't present itself as a problem because in all aggregations and functions operating on native histograms they are copied by the promql query engine first. Signed-off-by: Joshua Hesketh <josh@nitrotech.org> --------- Signed-off-by: Joshua Hesketh <josh@nitrotech.org>
1320 lines
40 KiB
Go
1320 lines
40 KiB
Go
// Copyright 2021 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package chunkenc
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"math"
|
|
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/value"
|
|
)
|
|
|
|
// HistogramChunk holds encoded sample data for a sparse, high-resolution
|
|
// histogram.
|
|
//
|
|
// Each sample has multiple "fields", stored in the following way (raw = store
|
|
// number directly, delta = store delta to the previous number, dod = store
|
|
// delta of the delta to the previous number, xor = what we do for regular
|
|
// sample values):
|
|
//
|
|
// field → ts count zeroCount sum []posbuckets []negbuckets
|
|
// sample 1 raw raw raw raw []raw []raw
|
|
// sample 2 delta delta delta xor []delta []delta
|
|
// sample >2 dod dod dod xor []dod []dod
|
|
type HistogramChunk struct {
|
|
b bstream
|
|
}
|
|
|
|
// NewHistogramChunk returns a new chunk with histogram encoding of the given
|
|
// size.
|
|
func NewHistogramChunk() *HistogramChunk {
|
|
b := make([]byte, 3, 128)
|
|
return &HistogramChunk{b: bstream{stream: b, count: 0}}
|
|
}
|
|
|
|
func (c *HistogramChunk) Reset(stream []byte) {
|
|
c.b.Reset(stream)
|
|
}
|
|
|
|
// Encoding returns the encoding type.
|
|
func (c *HistogramChunk) Encoding() Encoding {
|
|
return EncHistogram
|
|
}
|
|
|
|
// Bytes returns the underlying byte slice of the chunk.
|
|
func (c *HistogramChunk) Bytes() []byte {
|
|
return c.b.bytes()
|
|
}
|
|
|
|
// NumSamples returns the number of samples in the chunk.
|
|
func (c *HistogramChunk) NumSamples() int {
|
|
return int(binary.BigEndian.Uint16(c.Bytes()))
|
|
}
|
|
|
|
// Layout returns the histogram layout. Only call this on chunks that have at
|
|
// least one sample.
|
|
func (c *HistogramChunk) Layout() (
|
|
schema int32, zeroThreshold float64,
|
|
negativeSpans, positiveSpans []histogram.Span,
|
|
customValues []float64,
|
|
err error,
|
|
) {
|
|
if c.NumSamples() == 0 {
|
|
panic("HistogramChunk.Layout() called on an empty chunk")
|
|
}
|
|
b := newBReader(c.Bytes()[2:])
|
|
return readHistogramChunkLayout(&b)
|
|
}
|
|
|
|
// CounterResetHeader defines the first 2 bits of the chunk header.
|
|
type CounterResetHeader byte
|
|
|
|
const (
|
|
// CounterReset means there was definitely a counter reset that resulted in this chunk.
|
|
CounterReset CounterResetHeader = 0b10000000
|
|
// NotCounterReset means there was definitely no counter reset when cutting this chunk.
|
|
NotCounterReset CounterResetHeader = 0b01000000
|
|
// GaugeType means this chunk contains a gauge histogram, where counter resets do not happen.
|
|
GaugeType CounterResetHeader = 0b11000000
|
|
// UnknownCounterReset means we cannot say if this chunk was created due to a counter reset or not.
|
|
// An explicit counter reset detection needs to happen during query time.
|
|
UnknownCounterReset CounterResetHeader = 0b00000000
|
|
)
|
|
|
|
// CounterResetHeaderMask is the mask to get the counter reset header bits.
|
|
const CounterResetHeaderMask byte = 0b11000000
|
|
|
|
// GetCounterResetHeader returns the info about the first 2 bits of the chunk
|
|
// header.
|
|
func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader {
|
|
return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask)
|
|
}
|
|
|
|
// Compact implements the Chunk interface.
|
|
func (c *HistogramChunk) Compact() {
|
|
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
|
buf := make([]byte, l)
|
|
copy(buf, c.b.stream)
|
|
c.b.stream = buf
|
|
}
|
|
}
|
|
|
|
// Appender implements the Chunk interface.
|
|
func (c *HistogramChunk) Appender() (Appender, error) {
|
|
it := c.iterator(nil)
|
|
|
|
// To get an appender, we must know the state it would have if we had
|
|
// appended all existing data from scratch. We iterate through the end
|
|
// and populate via the iterator's state.
|
|
for it.Next() == ValHistogram {
|
|
}
|
|
if err := it.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
a := &HistogramAppender{
|
|
b: &c.b,
|
|
|
|
schema: it.schema,
|
|
zThreshold: it.zThreshold,
|
|
pSpans: it.pSpans,
|
|
nSpans: it.nSpans,
|
|
customValues: it.customValues,
|
|
t: it.t,
|
|
cnt: it.cnt,
|
|
zCnt: it.zCnt,
|
|
tDelta: it.tDelta,
|
|
cntDelta: it.cntDelta,
|
|
zCntDelta: it.zCntDelta,
|
|
pBuckets: it.pBuckets,
|
|
nBuckets: it.nBuckets,
|
|
pBucketsDelta: it.pBucketsDelta,
|
|
nBucketsDelta: it.nBucketsDelta,
|
|
|
|
sum: it.sum,
|
|
leading: it.leading,
|
|
trailing: it.trailing,
|
|
}
|
|
if it.numTotal == 0 {
|
|
a.leading = 0xff
|
|
}
|
|
return a, nil
|
|
}
|
|
|
|
func countSpans(spans []histogram.Span) int {
|
|
var cnt int
|
|
for _, s := range spans {
|
|
cnt += int(s.Length)
|
|
}
|
|
return cnt
|
|
}
|
|
|
|
func newHistogramIterator(b []byte) *histogramIterator {
|
|
it := &histogramIterator{
|
|
br: newBReader(b),
|
|
numTotal: binary.BigEndian.Uint16(b),
|
|
t: math.MinInt64,
|
|
}
|
|
// The first 3 bytes contain chunk headers.
|
|
// We skip that for actual samples.
|
|
_, _ = it.br.readBits(24)
|
|
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
|
|
return it
|
|
}
|
|
|
|
func (c *HistogramChunk) iterator(it Iterator) *histogramIterator {
|
|
// This comment is copied from XORChunk.iterator:
|
|
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
|
|
// When using striped locks to guard access to chunks, probably yes.
|
|
// Could only copy data if the chunk is not completed yet.
|
|
if histogramIter, ok := it.(*histogramIterator); ok {
|
|
histogramIter.Reset(c.b.bytes())
|
|
return histogramIter
|
|
}
|
|
return newHistogramIterator(c.b.bytes())
|
|
}
|
|
|
|
// Iterator implements the Chunk interface.
|
|
func (c *HistogramChunk) Iterator(it Iterator) Iterator {
|
|
return c.iterator(it)
|
|
}
|
|
|
|
// HistogramAppender is an Appender implementation for sparse histograms.
|
|
type HistogramAppender struct {
|
|
b *bstream
|
|
|
|
// Layout:
|
|
schema int32
|
|
zThreshold float64
|
|
pSpans, nSpans []histogram.Span
|
|
customValues []float64
|
|
|
|
// Although we intend to start new chunks on counter resets, we still
|
|
// have to handle negative deltas for gauge histograms. Therefore, even
|
|
// deltas are signed types here (even for tDelta to not treat that one
|
|
// specially).
|
|
t int64
|
|
cnt, zCnt uint64
|
|
tDelta, cntDelta, zCntDelta int64
|
|
pBuckets, nBuckets []int64
|
|
pBucketsDelta, nBucketsDelta []int64
|
|
|
|
// The sum is Gorilla xor encoded.
|
|
sum float64
|
|
leading uint8
|
|
trailing uint8
|
|
}
|
|
|
|
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
|
return CounterResetHeader(a.b.bytes()[2] & CounterResetHeaderMask)
|
|
}
|
|
|
|
func (a *HistogramAppender) setCounterResetHeader(cr CounterResetHeader) {
|
|
a.b.bytes()[2] = (a.b.bytes()[2] & (^CounterResetHeaderMask)) | (byte(cr) & CounterResetHeaderMask)
|
|
}
|
|
|
|
func (a *HistogramAppender) NumSamples() int {
|
|
return int(binary.BigEndian.Uint16(a.b.bytes()))
|
|
}
|
|
|
|
// Append implements Appender. This implementation panics because normal float
|
|
// samples must never be appended to a histogram chunk.
|
|
func (a *HistogramAppender) Append(int64, float64) {
|
|
panic("appended a float sample to a histogram chunk")
|
|
}
|
|
|
|
// appendable returns whether the chunk can be appended to, and if so whether
|
|
// 1. Any recoding needs to happen to the chunk using the provided forward
|
|
// inserts (in case of any new buckets, positive or negative range,
|
|
// respectively).
|
|
// 2. Any recoding needs to happen for the histogram being appended, using the
|
|
// backward inserts (in case of any missing buckets, positive or negative
|
|
// range, respectively).
|
|
//
|
|
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
|
//
|
|
// The chunk is not appendable in the following cases:
|
|
//
|
|
// - The schema has changed.
|
|
// - The custom bounds have changed if the current schema is custom buckets.
|
|
// - The threshold for the zero bucket has changed.
|
|
// - Any buckets have disappeared, unless the bucket count was 0, unused.
|
|
// Empty bucket can happen if the chunk was recoded and we're merging a non
|
|
// recoded histogram. In this case backward inserts will be provided.
|
|
// - There was a counter reset in the count of observations or in any bucket,
|
|
// including the zero bucket.
|
|
// - The last sample in the chunk was stale while the current sample is not stale.
|
|
//
|
|
// The method returns an additional boolean set to true if it is not appendable
|
|
// because of a counter reset. If the given sample is stale, it is always ok to
|
|
// append. If counterReset is true, okToAppend is always false.
|
|
func (a *HistogramAppender) appendable(h *histogram.Histogram) (
|
|
positiveInserts, negativeInserts []Insert,
|
|
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
|
okToAppend, counterReset bool,
|
|
) {
|
|
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
|
return
|
|
}
|
|
if h.CounterResetHint == histogram.CounterReset {
|
|
// Always honor the explicit counter reset hint.
|
|
counterReset = true
|
|
return
|
|
}
|
|
if value.IsStaleNaN(h.Sum) {
|
|
// This is a stale sample whose buckets and spans don't matter.
|
|
okToAppend = true
|
|
return
|
|
}
|
|
if value.IsStaleNaN(a.sum) {
|
|
// If the last sample was stale, then we can only accept stale
|
|
// samples in this chunk.
|
|
return
|
|
}
|
|
|
|
if h.Count < a.cnt {
|
|
// There has been a counter reset.
|
|
counterReset = true
|
|
return
|
|
}
|
|
|
|
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
|
|
return
|
|
}
|
|
|
|
if histogram.IsCustomBucketsSchema(h.Schema) && !histogram.FloatBucketsMatch(h.CustomValues, a.customValues) {
|
|
counterReset = true
|
|
return
|
|
}
|
|
|
|
if h.ZeroCount < a.zCnt {
|
|
// There has been a counter reset since ZeroThreshold didn't change.
|
|
counterReset = true
|
|
return
|
|
}
|
|
|
|
var ok bool
|
|
positiveInserts, backwardPositiveInserts, ok = expandIntSpansAndBuckets(a.pSpans, h.PositiveSpans, a.pBuckets, h.PositiveBuckets)
|
|
if !ok {
|
|
counterReset = true
|
|
return
|
|
}
|
|
negativeInserts, backwardNegativeInserts, ok = expandIntSpansAndBuckets(a.nSpans, h.NegativeSpans, a.nBuckets, h.NegativeBuckets)
|
|
if !ok {
|
|
counterReset = true
|
|
return
|
|
}
|
|
|
|
okToAppend = true
|
|
return
|
|
}
|
|
|
|
// expandIntSpansAndBuckets returns the inserts to expand the bucket spans 'a' so that
|
|
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
|
// 'a', otherwise the function will return false.
|
|
// The function also returns the inserts to expand 'b' to also cover all the
|
|
// buckets that are missing in 'b', but are present with 0 counter value in 'a'.
|
|
// The function also checks for counter resets between 'a' and 'b'.
|
|
//
|
|
// Example:
|
|
//
|
|
// Let's say the old buckets look like this:
|
|
//
|
|
// span syntax: [offset, length]
|
|
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
|
|
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
|
|
// raw values 6 3 3 2 4 5 1
|
|
// deltas 6 -3 0 -1 2 1 -4
|
|
//
|
|
// But now we introduce a new bucket layout. (Carefully chosen example where we
|
|
// have a span appended, one unchanged[*], one prepended, and two merge - in
|
|
// that order.)
|
|
//
|
|
// [*] unchanged in terms of which bucket indices they represent. but to achieve
|
|
// that, their offset needs to change if "disrupted" by spans changing ahead of
|
|
// them
|
|
//
|
|
// \/ this one is "unchanged"
|
|
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
|
|
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
|
|
// raw values 6 3 0 3 0 0 2 4 5 0 1
|
|
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
|
|
// delta mods: / \ / \ / \
|
|
//
|
|
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
|
|
// introduced, the subsequent "old" bucket needs to readjust its delta to the
|
|
// new base of 0. Thus, for the caller who wants to transform the set of
|
|
// original deltas to a new set of deltas to match a new span layout that adds
|
|
// buckets, we simply need to generate a list of inserts.
|
|
//
|
|
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
|
// spans themselves, thanks to the iterators we get to work with the more useful
|
|
// bucket indices (which of course directly correspond to the buckets we have to
|
|
// adjust).
|
|
func expandIntSpansAndBuckets(a, b []histogram.Span, aBuckets, bBuckets []int64) (forward, backward []Insert, ok bool) {
|
|
ai := newBucketIterator(a)
|
|
bi := newBucketIterator(b)
|
|
|
|
var aInserts []Insert // To insert into buckets of a, to make up for missing buckets in b.
|
|
var bInserts []Insert // To insert into buckets of b, to make up for missing empty(!) buckets in a.
|
|
|
|
// When aInter.num or bInter.num becomes > 0, this becomes a valid insert that should
|
|
// be yielded when we finish a streak of new buckets.
|
|
var aInter Insert
|
|
var bInter Insert
|
|
|
|
aIdx, aOK := ai.Next()
|
|
bIdx, bOK := bi.Next()
|
|
|
|
// Bucket count. Initialize the absolute count and index into the
|
|
// positive/negative counts or deltas array. The bucket count is
|
|
// used to detect counter reset as well as unused buckets in a.
|
|
var (
|
|
aCount int64
|
|
bCount int64
|
|
aCountIdx int
|
|
bCountIdx int
|
|
)
|
|
if aOK {
|
|
aCount = aBuckets[aCountIdx]
|
|
}
|
|
if bOK {
|
|
bCount = bBuckets[bCountIdx]
|
|
}
|
|
|
|
loop:
|
|
for {
|
|
switch {
|
|
case aOK && bOK:
|
|
switch {
|
|
case aIdx == bIdx: // Both have an identical bucket index.
|
|
// Bucket count. Check bucket for reset from a to b.
|
|
if aCount > bCount {
|
|
return nil, nil, false
|
|
}
|
|
|
|
// Finish WIP insert for a and reset.
|
|
if aInter.num > 0 {
|
|
aInserts = append(aInserts, aInter)
|
|
aInter.num = 0
|
|
}
|
|
|
|
// Finish WIP insert for b and reset.
|
|
if bInter.num > 0 {
|
|
bInserts = append(bInserts, bInter)
|
|
bInter.num = 0
|
|
}
|
|
|
|
aIdx, aOK = ai.Next()
|
|
bIdx, bOK = bi.Next()
|
|
aInter.pos++ // Advance potential insert position.
|
|
aCountIdx++ // Advance absolute bucket count index for a.
|
|
if aOK {
|
|
aCount += aBuckets[aCountIdx]
|
|
}
|
|
bInter.pos++ // Advance potential insert position.
|
|
bCountIdx++ // Advance absolute bucket count index for b.
|
|
if bOK {
|
|
bCount += bBuckets[bCountIdx]
|
|
}
|
|
|
|
continue
|
|
case aIdx < bIdx: // b misses a bucket index that is in a.
|
|
// This is ok if the count in a is 0, in which case we make a note to
|
|
// fill in the bucket in b and advance a.
|
|
if aCount == 0 {
|
|
bInter.num++ // Mark that we need to insert a bucket in b.
|
|
bInter.bucketIdx = aIdx
|
|
// Advance a
|
|
if aInter.num > 0 {
|
|
aInserts = append(aInserts, aInter)
|
|
aInter.num = 0
|
|
}
|
|
aIdx, aOK = ai.Next()
|
|
aInter.pos++
|
|
aCountIdx++
|
|
if aOK {
|
|
aCount += aBuckets[aCountIdx]
|
|
}
|
|
continue
|
|
}
|
|
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
|
return nil, nil, false
|
|
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
|
aInter.num++
|
|
aInter.bucketIdx = bIdx
|
|
// Advance b
|
|
if bInter.num > 0 {
|
|
bInserts = append(bInserts, bInter)
|
|
bInter.num = 0
|
|
}
|
|
bIdx, bOK = bi.Next()
|
|
bInter.pos++
|
|
bCountIdx++
|
|
if bOK {
|
|
bCount += bBuckets[bCountIdx]
|
|
}
|
|
}
|
|
case aOK && !bOK: // b misses a value that is in a.
|
|
// This is ok if the count in a is 0, in which case we make a note to
|
|
// fill in the bucket in b and advance a.
|
|
if aCount == 0 {
|
|
bInter.num++
|
|
bInter.bucketIdx = aIdx
|
|
// Advance a
|
|
if aInter.num > 0 {
|
|
aInserts = append(aInserts, aInter)
|
|
aInter.num = 0
|
|
}
|
|
aIdx, aOK = ai.Next()
|
|
aInter.pos++ // Advance potential insert position.
|
|
// Update absolute bucket counts for a.
|
|
aCountIdx++
|
|
if aOK {
|
|
aCount += aBuckets[aCountIdx]
|
|
}
|
|
continue
|
|
}
|
|
// Otherwise we are missing a bucket that was in use in a, which is a reset.
|
|
return nil, nil, false
|
|
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
|
aInter.num++
|
|
aInter.bucketIdx = bIdx
|
|
// Advance b
|
|
if bInter.num > 0 {
|
|
bInserts = append(bInserts, bInter)
|
|
bInter.num = 0
|
|
}
|
|
bIdx, bOK = bi.Next()
|
|
bInter.pos++ // Advance potential insert position.
|
|
// Update absolute bucket counts for b.
|
|
bCountIdx++
|
|
if bOK {
|
|
bCount += bBuckets[bCountIdx]
|
|
}
|
|
default: // Both iterators ran out. We're done.
|
|
if aInter.num > 0 {
|
|
aInserts = append(aInserts, aInter)
|
|
}
|
|
if bInter.num > 0 {
|
|
bInserts = append(bInserts, bInter)
|
|
}
|
|
break loop
|
|
}
|
|
}
|
|
|
|
return aInserts, bInserts, true
|
|
}
|
|
|
|
// appendableGauge returns whether the chunk can be appended to, and if so
|
|
// whether:
|
|
// 1. Any recoding needs to happen to the chunk using the provided forward
|
|
// inserts (in case of any new buckets, positive or negative range,
|
|
// respectively).
|
|
// 2. Any recoding needs to happen for the histogram being appended, using the
|
|
// backward inserts (in case of any missing buckets, positive or negative
|
|
// range, respectively).
|
|
//
|
|
// This method must be only used for gauge histograms.
|
|
//
|
|
// The chunk is not appendable in the following cases:
|
|
// - The schema has changed.
|
|
// - The custom bounds have changed if the current schema is custom buckets.
|
|
// - The threshold for the zero bucket has changed.
|
|
// - The last sample in the chunk was stale while the current sample is not stale.
|
|
func (a *HistogramAppender) appendableGauge(h *histogram.Histogram) (
|
|
positiveInserts, negativeInserts []Insert,
|
|
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
|
positiveSpans, negativeSpans []histogram.Span,
|
|
okToAppend bool,
|
|
) {
|
|
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
|
|
return
|
|
}
|
|
if value.IsStaleNaN(h.Sum) {
|
|
// This is a stale sample whose buckets and spans don't matter.
|
|
okToAppend = true
|
|
return
|
|
}
|
|
if value.IsStaleNaN(a.sum) {
|
|
// If the last sample was stale, then we can only accept stale
|
|
// samples in this chunk.
|
|
return
|
|
}
|
|
|
|
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
|
|
return
|
|
}
|
|
|
|
if histogram.IsCustomBucketsSchema(h.Schema) && !histogram.FloatBucketsMatch(h.CustomValues, a.customValues) {
|
|
return
|
|
}
|
|
|
|
positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans)
|
|
negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans)
|
|
okToAppend = true
|
|
return
|
|
}
|
|
|
|
// appendHistogram appends a histogram to the chunk. The caller must ensure that
|
|
// the histogram is properly structured, e.g. the number of buckets used
|
|
// corresponds to the number conveyed by the span structures. First call
|
|
// Appendable() and act accordingly!
|
|
func (a *HistogramAppender) appendHistogram(t int64, h *histogram.Histogram) {
|
|
var tDelta, cntDelta, zCntDelta int64
|
|
num := binary.BigEndian.Uint16(a.b.bytes())
|
|
|
|
if value.IsStaleNaN(h.Sum) {
|
|
// Emptying out other fields to write no buckets, and an empty
|
|
// layout in case of first histogram in the chunk.
|
|
h = &histogram.Histogram{Sum: h.Sum}
|
|
}
|
|
|
|
if num == 0 {
|
|
// The first append gets the privilege to dictate the layout
|
|
// but it's also responsible for encoding it into the chunk!
|
|
writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans, h.CustomValues)
|
|
a.schema = h.Schema
|
|
a.zThreshold = h.ZeroThreshold
|
|
|
|
if len(h.PositiveSpans) > 0 {
|
|
a.pSpans = make([]histogram.Span, len(h.PositiveSpans))
|
|
copy(a.pSpans, h.PositiveSpans)
|
|
} else {
|
|
a.pSpans = nil
|
|
}
|
|
if len(h.NegativeSpans) > 0 {
|
|
a.nSpans = make([]histogram.Span, len(h.NegativeSpans))
|
|
copy(a.nSpans, h.NegativeSpans)
|
|
} else {
|
|
a.nSpans = nil
|
|
}
|
|
if len(h.CustomValues) > 0 {
|
|
a.customValues = make([]float64, len(h.CustomValues))
|
|
copy(a.customValues, h.CustomValues)
|
|
} else {
|
|
a.customValues = nil
|
|
}
|
|
|
|
numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
|
|
if numPBuckets > 0 {
|
|
a.pBuckets = make([]int64, numPBuckets)
|
|
a.pBucketsDelta = make([]int64, numPBuckets)
|
|
} else {
|
|
a.pBuckets = nil
|
|
a.pBucketsDelta = nil
|
|
}
|
|
if numNBuckets > 0 {
|
|
a.nBuckets = make([]int64, numNBuckets)
|
|
a.nBucketsDelta = make([]int64, numNBuckets)
|
|
} else {
|
|
a.nBuckets = nil
|
|
a.nBucketsDelta = nil
|
|
}
|
|
|
|
// Now store the actual data.
|
|
putVarbitInt(a.b, t)
|
|
putVarbitUint(a.b, h.Count)
|
|
putVarbitUint(a.b, h.ZeroCount)
|
|
a.b.writeBits(math.Float64bits(h.Sum), 64)
|
|
for _, b := range h.PositiveBuckets {
|
|
putVarbitInt(a.b, b)
|
|
}
|
|
for _, b := range h.NegativeBuckets {
|
|
putVarbitInt(a.b, b)
|
|
}
|
|
} else {
|
|
// The case for the 2nd sample with single deltas is implicitly
|
|
// handled correctly with the double delta code, so we don't
|
|
// need a separate single delta logic for the 2nd sample.
|
|
|
|
tDelta = t - a.t
|
|
cntDelta = int64(h.Count) - int64(a.cnt)
|
|
zCntDelta = int64(h.ZeroCount) - int64(a.zCnt)
|
|
|
|
tDod := tDelta - a.tDelta
|
|
cntDod := cntDelta - a.cntDelta
|
|
zCntDod := zCntDelta - a.zCntDelta
|
|
|
|
if value.IsStaleNaN(h.Sum) {
|
|
cntDod, zCntDod = 0, 0
|
|
}
|
|
|
|
putVarbitInt(a.b, tDod)
|
|
putVarbitInt(a.b, cntDod)
|
|
putVarbitInt(a.b, zCntDod)
|
|
|
|
a.writeSumDelta(h.Sum)
|
|
|
|
for i, b := range h.PositiveBuckets {
|
|
delta := b - a.pBuckets[i]
|
|
dod := delta - a.pBucketsDelta[i]
|
|
putVarbitInt(a.b, dod)
|
|
a.pBucketsDelta[i] = delta
|
|
}
|
|
for i, b := range h.NegativeBuckets {
|
|
delta := b - a.nBuckets[i]
|
|
dod := delta - a.nBucketsDelta[i]
|
|
putVarbitInt(a.b, dod)
|
|
a.nBucketsDelta[i] = delta
|
|
}
|
|
}
|
|
|
|
binary.BigEndian.PutUint16(a.b.bytes(), num+1)
|
|
|
|
a.t = t
|
|
a.cnt = h.Count
|
|
a.zCnt = h.ZeroCount
|
|
a.tDelta = tDelta
|
|
a.cntDelta = cntDelta
|
|
a.zCntDelta = zCntDelta
|
|
|
|
copy(a.pBuckets, h.PositiveBuckets)
|
|
copy(a.nBuckets, h.NegativeBuckets)
|
|
// Note that the bucket deltas were already updated above.
|
|
a.sum = h.Sum
|
|
}
|
|
|
|
// recode converts the current chunk to accommodate an expansion of the set of
|
|
// (positive and/or negative) buckets used, according to the provided inserts,
|
|
// resulting in the honoring of the provided new positive and negative spans. To
|
|
// continue appending, use the returned Appender rather than the receiver of
|
|
// this method.
|
|
func (a *HistogramAppender) recode(
|
|
positiveInserts, negativeInserts []Insert,
|
|
positiveSpans, negativeSpans []histogram.Span,
|
|
) (Chunk, Appender) {
|
|
// TODO(beorn7): This currently just decodes everything and then encodes
|
|
// it again with the new span layout. This can probably be done in-place
|
|
// by editing the chunk. But let's first see how expensive it is in the
|
|
// big picture. Also, in-place editing might create concurrency issues.
|
|
byts := a.b.bytes()
|
|
it := newHistogramIterator(byts)
|
|
hc := NewHistogramChunk()
|
|
app, err := hc.Appender()
|
|
if err != nil {
|
|
panic(err) // This should never happen for an empty histogram chunk.
|
|
}
|
|
happ := app.(*HistogramAppender)
|
|
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
|
|
|
|
for it.Next() == ValHistogram {
|
|
tOld, hOld := it.AtHistogram(nil)
|
|
|
|
// We have to newly allocate slices for the modified buckets
|
|
// here because they are kept by the appender until the next
|
|
// append.
|
|
// TODO(beorn7): We might be able to optimize this.
|
|
var positiveBuckets, negativeBuckets []int64
|
|
if numPositiveBuckets > 0 {
|
|
positiveBuckets = make([]int64, numPositiveBuckets)
|
|
}
|
|
if numNegativeBuckets > 0 {
|
|
negativeBuckets = make([]int64, numNegativeBuckets)
|
|
}
|
|
|
|
// Save the modified histogram to the new chunk.
|
|
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
|
if len(positiveInserts) > 0 {
|
|
hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true)
|
|
}
|
|
if len(negativeInserts) > 0 {
|
|
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true)
|
|
}
|
|
happ.appendHistogram(tOld, hOld)
|
|
}
|
|
|
|
happ.setCounterResetHeader(CounterResetHeader(byts[2] & CounterResetHeaderMask))
|
|
return hc, app
|
|
}
|
|
|
|
// recodeHistogram converts the current histogram (in-place) to accommodate an
|
|
// expansion of the set of (positive and/or negative) buckets used.
|
|
func (a *HistogramAppender) recodeHistogram(
|
|
h *histogram.Histogram,
|
|
pBackwardInserts, nBackwardInserts []Insert,
|
|
) {
|
|
if len(pBackwardInserts) > 0 {
|
|
numPositiveBuckets := countSpans(h.PositiveSpans)
|
|
h.PositiveBuckets = insert(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInserts, true)
|
|
}
|
|
if len(nBackwardInserts) > 0 {
|
|
numNegativeBuckets := countSpans(h.NegativeSpans)
|
|
h.NegativeBuckets = insert(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInserts, true)
|
|
}
|
|
}
|
|
|
|
func (a *HistogramAppender) writeSumDelta(v float64) {
|
|
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
|
|
}
|
|
|
|
func (a *HistogramAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
|
|
panic("appended a float histogram sample to a histogram chunk")
|
|
}
|
|
|
|
func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h *histogram.Histogram, appendOnly bool) (Chunk, bool, Appender, error) {
|
|
if a.NumSamples() == 0 {
|
|
a.appendHistogram(t, h)
|
|
if h.CounterResetHint == histogram.GaugeType {
|
|
a.setCounterResetHeader(GaugeType)
|
|
return nil, false, a, nil
|
|
}
|
|
|
|
switch {
|
|
case h.CounterResetHint == histogram.CounterReset:
|
|
// Always honor the explicit counter reset hint.
|
|
a.setCounterResetHeader(CounterReset)
|
|
case prev != nil:
|
|
// This is a new chunk, but continued from a previous one. We need to calculate the reset header unless already set.
|
|
_, _, _, _, _, counterReset := prev.appendable(h)
|
|
if counterReset {
|
|
a.setCounterResetHeader(CounterReset)
|
|
} else {
|
|
a.setCounterResetHeader(NotCounterReset)
|
|
}
|
|
}
|
|
return nil, false, a, nil
|
|
}
|
|
|
|
// Adding counter-like histogram.
|
|
if h.CounterResetHint != histogram.GaugeType {
|
|
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, okToAppend, counterReset := a.appendable(h)
|
|
if !okToAppend || counterReset {
|
|
if appendOnly {
|
|
if counterReset {
|
|
return nil, false, a, fmt.Errorf("histogram counter reset")
|
|
}
|
|
return nil, false, a, fmt.Errorf("histogram schema change")
|
|
}
|
|
newChunk := NewHistogramChunk()
|
|
app, err := newChunk.Appender()
|
|
if err != nil {
|
|
panic(err) // This should never happen for an empty histogram chunk.
|
|
}
|
|
happ := app.(*HistogramAppender)
|
|
if counterReset {
|
|
happ.setCounterResetHeader(CounterReset)
|
|
}
|
|
happ.appendHistogram(t, h)
|
|
return newChunk, false, app, nil
|
|
}
|
|
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
|
// The histogram needs to be expanded to have the extra empty buckets
|
|
// of the chunk.
|
|
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
|
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
|
// However we need to make a copy in case the input is sharing spans from an iterator.
|
|
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
|
copy(h.PositiveSpans, a.pSpans)
|
|
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
|
|
copy(h.NegativeSpans, a.nSpans)
|
|
} else {
|
|
// Spans need pre-adjusting to accommodate the new buckets.
|
|
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
|
|
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
|
|
}
|
|
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
|
}
|
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
|
if appendOnly {
|
|
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
|
}
|
|
chk, app := a.recode(
|
|
pForwardInserts, nForwardInserts,
|
|
h.PositiveSpans, h.NegativeSpans,
|
|
)
|
|
app.(*HistogramAppender).appendHistogram(t, h)
|
|
return chk, true, app, nil
|
|
}
|
|
a.appendHistogram(t, h)
|
|
return nil, false, a, nil
|
|
}
|
|
// Adding gauge histogram.
|
|
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend := a.appendableGauge(h)
|
|
if !okToAppend {
|
|
if appendOnly {
|
|
return nil, false, a, fmt.Errorf("gauge histogram schema change")
|
|
}
|
|
newChunk := NewHistogramChunk()
|
|
app, err := newChunk.Appender()
|
|
if err != nil {
|
|
panic(err) // This should never happen for an empty histogram chunk.
|
|
}
|
|
happ := app.(*HistogramAppender)
|
|
happ.setCounterResetHeader(GaugeType)
|
|
happ.appendHistogram(t, h)
|
|
return newChunk, false, app, nil
|
|
}
|
|
|
|
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
|
|
if appendOnly {
|
|
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative backwards inserts", len(pBackwardInserts), len(nBackwardInserts))
|
|
}
|
|
h.PositiveSpans = pMergedSpans
|
|
h.NegativeSpans = nMergedSpans
|
|
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
|
}
|
|
|
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
|
if appendOnly {
|
|
return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
|
}
|
|
chk, app := a.recode(
|
|
pForwardInserts, nForwardInserts,
|
|
h.PositiveSpans, h.NegativeSpans,
|
|
)
|
|
app.(*HistogramAppender).appendHistogram(t, h)
|
|
return chk, true, app, nil
|
|
}
|
|
|
|
a.appendHistogram(t, h)
|
|
return nil, false, a, nil
|
|
}
|
|
|
|
func CounterResetHintToHeader(hint histogram.CounterResetHint) CounterResetHeader {
|
|
switch hint {
|
|
case histogram.CounterReset:
|
|
return CounterReset
|
|
case histogram.NotCounterReset:
|
|
return NotCounterReset
|
|
case histogram.GaugeType:
|
|
return GaugeType
|
|
default:
|
|
return UnknownCounterReset
|
|
}
|
|
}
|
|
|
|
type histogramIterator struct {
|
|
br bstreamReader
|
|
numTotal uint16
|
|
numRead uint16
|
|
|
|
counterResetHeader CounterResetHeader
|
|
|
|
// Layout:
|
|
schema int32
|
|
zThreshold float64
|
|
pSpans, nSpans []histogram.Span
|
|
customValues []float64
|
|
|
|
// For the fields that are tracked as deltas and ultimately dod's.
|
|
t int64
|
|
cnt, zCnt uint64
|
|
tDelta, cntDelta, zCntDelta int64
|
|
pBuckets, nBuckets []int64 // Delta between buckets.
|
|
pFloatBuckets, nFloatBuckets []float64 // Absolute counts.
|
|
pBucketsDelta, nBucketsDelta []int64
|
|
|
|
// The sum is Gorilla xor encoded.
|
|
sum float64
|
|
leading uint8
|
|
trailing uint8
|
|
|
|
// Track calls to retrieve methods. Once they have been called, we
|
|
// cannot recycle the bucket slices anymore because we have returned
|
|
// them in the histogram.
|
|
atHistogramCalled, atFloatHistogramCalled bool
|
|
|
|
err error
|
|
}
|
|
|
|
func (it *histogramIterator) Seek(t int64) ValueType {
|
|
if it.err != nil {
|
|
return ValNone
|
|
}
|
|
|
|
for t > it.t || it.numRead == 0 {
|
|
if it.Next() == ValNone {
|
|
return ValNone
|
|
}
|
|
}
|
|
return ValHistogram
|
|
}
|
|
|
|
func (it *histogramIterator) At() (int64, float64) {
|
|
panic("cannot call histogramIterator.At")
|
|
}
|
|
|
|
func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
|
|
if value.IsStaleNaN(it.sum) {
|
|
return it.t, &histogram.Histogram{Sum: it.sum}
|
|
}
|
|
if h == nil {
|
|
it.atHistogramCalled = true
|
|
return it.t, &histogram.Histogram{
|
|
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
|
Count: it.cnt,
|
|
ZeroCount: it.zCnt,
|
|
Sum: it.sum,
|
|
ZeroThreshold: it.zThreshold,
|
|
Schema: it.schema,
|
|
PositiveSpans: it.pSpans,
|
|
NegativeSpans: it.nSpans,
|
|
PositiveBuckets: it.pBuckets,
|
|
NegativeBuckets: it.nBuckets,
|
|
CustomValues: it.customValues,
|
|
}
|
|
}
|
|
|
|
h.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
|
|
h.Schema = it.schema
|
|
h.ZeroThreshold = it.zThreshold
|
|
h.ZeroCount = it.zCnt
|
|
h.Count = it.cnt
|
|
h.Sum = it.sum
|
|
|
|
h.PositiveSpans = resize(h.PositiveSpans, len(it.pSpans))
|
|
copy(h.PositiveSpans, it.pSpans)
|
|
|
|
h.NegativeSpans = resize(h.NegativeSpans, len(it.nSpans))
|
|
copy(h.NegativeSpans, it.nSpans)
|
|
|
|
h.PositiveBuckets = resize(h.PositiveBuckets, len(it.pBuckets))
|
|
copy(h.PositiveBuckets, it.pBuckets)
|
|
|
|
h.NegativeBuckets = resize(h.NegativeBuckets, len(it.nBuckets))
|
|
copy(h.NegativeBuckets, it.nBuckets)
|
|
|
|
h.CustomValues = resize(h.CustomValues, len(it.customValues))
|
|
copy(h.CustomValues, it.customValues)
|
|
|
|
return it.t, h
|
|
}
|
|
|
|
func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
|
if value.IsStaleNaN(it.sum) {
|
|
return it.t, &histogram.FloatHistogram{Sum: it.sum}
|
|
}
|
|
if fh == nil {
|
|
it.atFloatHistogramCalled = true
|
|
return it.t, &histogram.FloatHistogram{
|
|
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
|
Count: float64(it.cnt),
|
|
ZeroCount: float64(it.zCnt),
|
|
Sum: it.sum,
|
|
ZeroThreshold: it.zThreshold,
|
|
Schema: it.schema,
|
|
PositiveSpans: it.pSpans,
|
|
NegativeSpans: it.nSpans,
|
|
PositiveBuckets: it.pFloatBuckets,
|
|
NegativeBuckets: it.nFloatBuckets,
|
|
CustomValues: it.customValues,
|
|
}
|
|
}
|
|
|
|
fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
|
|
fh.Schema = it.schema
|
|
fh.ZeroThreshold = it.zThreshold
|
|
fh.ZeroCount = float64(it.zCnt)
|
|
fh.Count = float64(it.cnt)
|
|
fh.Sum = it.sum
|
|
|
|
fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans))
|
|
copy(fh.PositiveSpans, it.pSpans)
|
|
|
|
fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans))
|
|
copy(fh.NegativeSpans, it.nSpans)
|
|
|
|
fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets))
|
|
var currentPositive float64
|
|
for i, b := range it.pBuckets {
|
|
currentPositive += float64(b)
|
|
fh.PositiveBuckets[i] = currentPositive
|
|
}
|
|
|
|
fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets))
|
|
var currentNegative float64
|
|
for i, b := range it.nBuckets {
|
|
currentNegative += float64(b)
|
|
fh.NegativeBuckets[i] = currentNegative
|
|
}
|
|
|
|
fh.CustomValues = resize(fh.CustomValues, len(it.customValues))
|
|
copy(fh.CustomValues, it.customValues)
|
|
|
|
return it.t, fh
|
|
}
|
|
|
|
func (it *histogramIterator) AtT() int64 {
|
|
return it.t
|
|
}
|
|
|
|
func (it *histogramIterator) Err() error {
|
|
return it.err
|
|
}
|
|
|
|
func (it *histogramIterator) Reset(b []byte) {
|
|
// The first 3 bytes contain chunk headers.
|
|
// We skip that for actual samples.
|
|
it.br = newBReader(b[3:])
|
|
it.numTotal = binary.BigEndian.Uint16(b)
|
|
it.numRead = 0
|
|
|
|
it.counterResetHeader = CounterResetHeader(b[2] & CounterResetHeaderMask)
|
|
|
|
it.t, it.cnt, it.zCnt = 0, 0, 0
|
|
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0
|
|
|
|
// Recycle slices that have not been returned yet. Otherwise, start from
|
|
// scratch.
|
|
if it.atHistogramCalled {
|
|
it.atHistogramCalled = false
|
|
it.pBuckets, it.nBuckets = nil, nil
|
|
it.pSpans, it.nSpans = nil, nil
|
|
} else {
|
|
it.pBuckets = it.pBuckets[:0]
|
|
it.nBuckets = it.nBuckets[:0]
|
|
}
|
|
if it.atFloatHistogramCalled {
|
|
it.atFloatHistogramCalled = false
|
|
it.pFloatBuckets, it.nFloatBuckets = nil, nil
|
|
} else {
|
|
it.pFloatBuckets = it.pFloatBuckets[:0]
|
|
it.nFloatBuckets = it.nFloatBuckets[:0]
|
|
}
|
|
|
|
it.pBucketsDelta = it.pBucketsDelta[:0]
|
|
it.nBucketsDelta = it.nBucketsDelta[:0]
|
|
|
|
it.sum = 0
|
|
it.leading = 0
|
|
it.trailing = 0
|
|
it.err = nil
|
|
}
|
|
|
|
func (it *histogramIterator) Next() ValueType {
|
|
if it.err != nil || it.numRead == it.numTotal {
|
|
return ValNone
|
|
}
|
|
|
|
if it.numRead == 0 {
|
|
// The first read is responsible for reading the chunk layout
|
|
// and for initializing fields that depend on it. We give
|
|
// counter reset info at chunk level, hence we discard it here.
|
|
schema, zeroThreshold, posSpans, negSpans, customValues, err := readHistogramChunkLayout(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.schema = schema
|
|
it.zThreshold = zeroThreshold
|
|
it.pSpans, it.nSpans = posSpans, negSpans
|
|
it.customValues = customValues
|
|
numPBuckets, numNBuckets := countSpans(posSpans), countSpans(negSpans)
|
|
// The code below recycles existing slices in case this iterator
|
|
// was reset and already has slices of a sufficient capacity.
|
|
if numPBuckets > 0 {
|
|
it.pBuckets = append(it.pBuckets, make([]int64, numPBuckets)...)
|
|
it.pBucketsDelta = append(it.pBucketsDelta, make([]int64, numPBuckets)...)
|
|
it.pFloatBuckets = append(it.pFloatBuckets, make([]float64, numPBuckets)...)
|
|
}
|
|
if numNBuckets > 0 {
|
|
it.nBuckets = append(it.nBuckets, make([]int64, numNBuckets)...)
|
|
it.nBucketsDelta = append(it.nBucketsDelta, make([]int64, numNBuckets)...)
|
|
it.nFloatBuckets = append(it.nFloatBuckets, make([]float64, numNBuckets)...)
|
|
}
|
|
|
|
// Now read the actual data.
|
|
t, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.t = t
|
|
|
|
cnt, err := readVarbitUint(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.cnt = cnt
|
|
|
|
zcnt, err := readVarbitUint(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.zCnt = zcnt
|
|
|
|
sum, err := it.br.readBits(64)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.sum = math.Float64frombits(sum)
|
|
|
|
var current int64
|
|
for i := range it.pBuckets {
|
|
v, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.pBuckets[i] = v
|
|
current += it.pBuckets[i]
|
|
it.pFloatBuckets[i] = float64(current)
|
|
}
|
|
current = 0
|
|
for i := range it.nBuckets {
|
|
v, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.nBuckets[i] = v
|
|
current += it.nBuckets[i]
|
|
it.nFloatBuckets[i] = float64(current)
|
|
}
|
|
|
|
it.numRead++
|
|
return ValHistogram
|
|
}
|
|
|
|
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
|
|
// so we don't need a separate single delta logic for the 2nd sample.
|
|
|
|
// Recycle bucket and span slices that have not been returned yet. Otherwise, copy them.
|
|
// copy them.
|
|
if it.atFloatHistogramCalled || it.atHistogramCalled {
|
|
if len(it.pSpans) > 0 {
|
|
newSpans := make([]histogram.Span, len(it.pSpans))
|
|
copy(newSpans, it.pSpans)
|
|
it.pSpans = newSpans
|
|
} else {
|
|
it.pSpans = nil
|
|
}
|
|
if len(it.nSpans) > 0 {
|
|
newSpans := make([]histogram.Span, len(it.nSpans))
|
|
copy(newSpans, it.nSpans)
|
|
it.nSpans = newSpans
|
|
} else {
|
|
it.nSpans = nil
|
|
}
|
|
}
|
|
|
|
if it.atHistogramCalled {
|
|
it.atHistogramCalled = false
|
|
if len(it.pBuckets) > 0 {
|
|
newBuckets := make([]int64, len(it.pBuckets))
|
|
copy(newBuckets, it.pBuckets)
|
|
it.pBuckets = newBuckets
|
|
} else {
|
|
it.pBuckets = nil
|
|
}
|
|
if len(it.nBuckets) > 0 {
|
|
newBuckets := make([]int64, len(it.nBuckets))
|
|
copy(newBuckets, it.nBuckets)
|
|
it.nBuckets = newBuckets
|
|
} else {
|
|
it.nBuckets = nil
|
|
}
|
|
}
|
|
|
|
// FloatBuckets are set from scratch, so simply create empty ones.
|
|
if it.atFloatHistogramCalled {
|
|
it.atFloatHistogramCalled = false
|
|
if len(it.pFloatBuckets) > 0 {
|
|
it.pFloatBuckets = make([]float64, len(it.pFloatBuckets))
|
|
} else {
|
|
it.pFloatBuckets = nil
|
|
}
|
|
if len(it.nFloatBuckets) > 0 {
|
|
it.nFloatBuckets = make([]float64, len(it.nFloatBuckets))
|
|
} else {
|
|
it.nFloatBuckets = nil
|
|
}
|
|
}
|
|
|
|
tDod, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.tDelta += tDod
|
|
it.t += it.tDelta
|
|
|
|
cntDod, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.cntDelta += cntDod
|
|
it.cnt = uint64(int64(it.cnt) + it.cntDelta)
|
|
|
|
zcntDod, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.zCntDelta += zcntDod
|
|
it.zCnt = uint64(int64(it.zCnt) + it.zCntDelta)
|
|
|
|
ok := it.readSum()
|
|
if !ok {
|
|
return ValNone
|
|
}
|
|
|
|
if value.IsStaleNaN(it.sum) {
|
|
it.numRead++
|
|
return ValHistogram
|
|
}
|
|
|
|
var current int64
|
|
for i := range it.pBuckets {
|
|
dod, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.pBucketsDelta[i] += dod
|
|
it.pBuckets[i] += it.pBucketsDelta[i]
|
|
current += it.pBuckets[i]
|
|
it.pFloatBuckets[i] = float64(current)
|
|
}
|
|
|
|
current = 0
|
|
for i := range it.nBuckets {
|
|
dod, err := readVarbitInt(&it.br)
|
|
if err != nil {
|
|
it.err = err
|
|
return ValNone
|
|
}
|
|
it.nBucketsDelta[i] += dod
|
|
it.nBuckets[i] += it.nBucketsDelta[i]
|
|
current += it.nBuckets[i]
|
|
it.nFloatBuckets[i] = float64(current)
|
|
}
|
|
|
|
it.numRead++
|
|
return ValHistogram
|
|
}
|
|
|
|
func (it *histogramIterator) readSum() bool {
|
|
err := xorRead(&it.br, &it.sum, &it.leading, &it.trailing)
|
|
if err != nil {
|
|
it.err = err
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func resize[T any](items []T, n int) []T {
|
|
if cap(items) < n {
|
|
return make([]T, n)
|
|
}
|
|
return items[:n]
|
|
}
|