prometheus/tsdb/chunkenc/histogram_meta.go
beorn7 1cfc8f65a3 histograms: Return actually useful counter reset hints
This is a bit more conservative than we could be. As long as a chunk
isn't the first in a block, we can be pretty sure that the previous
chunk won't disappear. However, the incremental gain of returning
NotCounterReset in these cases is probably very small and might not be
worth the code complications.

With this, we now also pay attention to an explicitly set counter
reset during ingestion. While the case doesn't show up in practice
yet, there could be scenarios where the metric source knows there was
a counter reset even if it might not be visible from the values in the
histogram. It is also useful for testing.

Signed-off-by: beorn7 <beorn@grafana.com>
2023-01-25 16:57:21 +01:00

490 lines
14 KiB
Go

// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"math"
"github.com/prometheus/prometheus/model/histogram"
)
// writeHistogramChunkLayout appends the layout of a histogram chunk to b. The
// fields are written in a fixed order: the zero threshold, the schema (as a
// varbit int), the positive spans, and finally the negative spans.
// readHistogramChunkLayout reads the fields back in the same order.
func writeHistogramChunkLayout(
b *bstream, schema int32, zeroThreshold float64,
positiveSpans, negativeSpans []histogram.Span,
) {
putZeroThreshold(b, zeroThreshold)
putVarbitInt(b, int64(schema))
putHistogramChunkLayoutSpans(b, positiveSpans)
putHistogramChunkLayoutSpans(b, negativeSpans)
}
// readHistogramChunkLayout decodes a histogram chunk layout from b. The fields
// are read in this order: zero threshold, schema, positive spans, negative
// spans. Decoding stops at the first failing read and that error is returned
// in err; results decoded before the failure keep their values, the remaining
// ones are left at their zero value.
func readHistogramChunkLayout(b *bstreamReader) (
	schema int32, zeroThreshold float64,
	positiveSpans, negativeSpans []histogram.Span,
	err error,
) {
	if zeroThreshold, err = readZeroThreshold(b); err != nil {
		return
	}

	// The short variable declaration below re-uses the named result err
	// (same block), so a failure here is visible to the caller.
	rawSchema, err := readVarbitInt(b)
	if err != nil {
		return
	}
	schema = int32(rawSchema)

	if positiveSpans, err = readHistogramChunkLayoutSpans(b); err != nil {
		return
	}

	// Last field: whatever error (or nil) this read yields is the result.
	negativeSpans, err = readHistogramChunkLayoutSpans(b)
	return
}
// putHistogramChunkLayoutSpans encodes spans into b. It first writes the number
// of spans as a varbit uint and then, for each span in order, its length as a
// varbit uint followed by its offset as a varbit int.
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
	putVarbitUint(b, uint64(len(spans)))
	for i := range spans {
		span := &spans[i]
		putVarbitUint(b, uint64(span.Length))
		putVarbitInt(b, int64(span.Offset))
	}
}
// readHistogramChunkLayoutSpans decodes spans written by
// putHistogramChunkLayoutSpans: a span count followed by a (length, offset)
// pair per span. It returns a nil slice if the count is zero, and nil together
// with the error as soon as any read fails.
func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
	count, err := readVarbitUint(b)
	if err != nil {
		return nil, err
	}

	var decoded []histogram.Span
	for remaining := int(count); remaining > 0; remaining-- {
		length, err := readVarbitUint(b)
		if err != nil {
			return nil, err
		}
		offset, err := readVarbitInt(b)
		if err != nil {
			return nil, err
		}
		decoded = append(decoded, histogram.Span{
			Offset: int32(offset),
			Length: uint32(length),
		})
	}
	return decoded, nil
}
// putZeroThreshold writes the zero threshold to the bstream. Typical values
// take a single byte, everything else takes 9 bytes:
//   - A threshold of 0 is stored as a single zero byte.
//   - A threshold that is a power of 2 between (and including) 2^-243 and 2^10
//     is stored as a single byte holding its IEEE 754 exponent plus 243. In
//     the representation returned by math.Frexp, these values have a fraction
//     of exactly 0.5 and an exponent between (and including) -242 and 11
//     (2^-243 is 0.5*2^-242, and 2^10 is 0.5*2^11), so the stored byte is
//     between 1 and 254. Small powers of two are the preferred values for the
//     zero threshold; in particular, the default of 2^-128 (0.5*2^-127) is
//     encoded as the single byte 116.
//   - Any other threshold is stored as the marker byte 255 followed by the 8
//     bytes of the threshold as a float64.
func putZeroThreshold(b *bstream, threshold float64) {
	if threshold == 0 {
		b.writeByte(0)
		return
	}

	fraction, exponent := math.Frexp(threshold)
	// Only exact powers of two within the supported exponent range fit
	// into the one-byte form. NaN, negative values, and everything else
	// fail this test and fall through to the 9-byte form.
	compact := fraction == 0.5 && exponent >= -242 && exponent <= 11
	if compact {
		b.writeByte(byte(exponent + 243))
		return
	}

	b.writeByte(255)
	b.writeBits(math.Float64bits(threshold), 64)
}
// readZeroThreshold reads the zero threshold written with putZeroThreshold.
// The first byte selects the encoding: 0 means a threshold of 0, 255 means
// that the threshold follows as 64 bits of a float64, and any other value is
// the exponent of a power of two, offset by 243. A read error is returned
// together with a threshold of 0.
func readZeroThreshold(br *bstreamReader) (float64, error) {
	marker, err := br.ReadByte()
	if err != nil {
		return 0, err
	}
	if marker == 0 {
		return 0, nil
	}
	if marker != 255 {
		// One-byte form: marker is the Frexp exponent plus 243, the
		// fraction is always 0.5.
		return math.Ldexp(0.5, int(marker)-243), nil
	}

	// Nine-byte form: the raw float64 bits follow the marker.
	raw, err := br.readBits(64)
	if err != nil {
		return 0, err
	}
	return math.Float64frombits(raw), nil
}
// bucketIterator iterates over the buckets described by a slice of spans and
// yields the bucket index of each of them, one per call to Next. It keeps track
// of the position of the bucket yielded last, both within the spans and as a
// bucket index across all spans.
type bucketIterator struct {
spans []histogram.Span // The spans being iterated over.
span int // Span position of last yielded bucket.
bucket int // Bucket position within span of last yielded bucket.
idx int // Bucket index (globally across all spans) of last yielded bucket.
}
// newBucketIterator returns a bucketIterator positioned right before the first
// bucket of spans: the in-span position is -1, and the bucket index is one less
// than the offset of the first span (or -1 if there are no spans), so that the
// first call to Next yields the first bucket.
func newBucketIterator(spans []histogram.Span) *bucketIterator {
	startIdx := -1
	if len(spans) > 0 {
		startIdx += int(spans[0].Offset)
	}
	return &bucketIterator{
		spans:  spans,
		span:   0,
		bucket: -1,
		idx:    startIdx,
	}
}
// Next advances the iterator by one bucket and returns the index of that
// bucket and true. Once no bucket is left, it returns 0 and false. Spans with
// a length of zero contain no bucket and are skipped, but their offset still
// counts towards the index of the buckets that follow.
func (b *bucketIterator) Next() (int, bool) {
	numSpans := len(b.spans)
	if b.span >= numSpans {
		// Nothing to iterate over at all.
		return 0, false
	}

	// First choice: the next bucket of the current span.
	if lastInSpan := int(b.spans[b.span].Length) - 1; b.bucket < lastInSpan {
		b.bucket++
		b.idx++
		return b.idx, true
	}

	// Otherwise walk on to the first bucket of the next non-empty span.
	for b.span < numSpans-1 {
		b.span++
		next := b.spans[b.span]
		b.idx += int(next.Offset + 1)
		b.bucket = 0
		if next.Length != 0 {
			return b.idx, true
		}
		// An empty span has no first bucket: take back the step onto
		// it, keeping only the gap described by its offset.
		b.idx--
	}

	// All spans are exhausted.
	return 0, false
}
// An Insert describes how many new buckets have to be inserted before
// processing the pos'th bucket from the original slice. An Insert whose pos
// equals the length of the original slice describes buckets to be appended
// after the last original bucket.
type Insert struct {
pos int // Position in the original bucket slice before which to insert.
num int // Number of new buckets to insert at that position.
}
// expandSpansForward returns the inserts needed to expand the bucket spans 'a'
// so that they match the spans in 'b'. 'b' must cover the same or more buckets
// than 'a'. If it does not, the function returns false (together with the
// inserts collected up to the point where the mismatch was detected).
//
// Example:
//
// Let's say the old buckets look like this:
//
//	span syntax: [offset, length]
//	spans      : [ 0 , 2 ]               [2,1]                   [ 3 , 2 ]                     [3,1]       [1,1]
//	bucket idx : [0]   [1]    2     3    [4]    5     6     7    [8]   [9]    10    11    12   [13]   14   [15]
//	raw values    6     3                 3                       2     4                       5           1
//	deltas        6    -3                 0                      -1     2                       1          -4
//
// But now we introduce a new bucket layout. (Carefully chosen example where we
// have a span appended, one unchanged[*], one prepended, and two merged - in
// that order.)
//
// [*] Unchanged in terms of which bucket indices it represents. To achieve
// that, its offset needs to change if it is "disrupted" by spans changing
// ahead of it.
//
//	                                      \/ this one is "unchanged"
//	spans      : [  0  ,  3    ]         [1,1]       [    1    ,   4     ]                     [  3  ,   3    ]
//	bucket idx : [0]   [1]   [2]    3    [4]    5    [6]   [7]   [8]   [9]    10    11    12   [13]  [14]  [15]
//	raw values    6     3     0           3           0     0     2     4                       5     0     1
//	deltas        6    -3    -3           3          -3     0     2     2                       1    -5     1
//	delta mods:                          / \                     / \                                       / \
//
// Note for histograms with delta-encoded buckets: Whenever new buckets are
// introduced, the subsequent "old" bucket needs to readjust its delta to the
// new base of 0. Thus, a caller who wants to transform the original deltas
// into deltas matching a new span layout that adds buckets only needs the list
// of inserts generated here.
//
// Note: expandSpansForward does not have to deal with the changes to the spans
// themselves. Thanks to the iterators, it works on the more useful bucket
// indices, which directly correspond to the buckets that have to be adjusted.
func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) {
	itA := newBucketIterator(a)
	itB := newBucketIterator(b)

	// pending turns into a valid insert once pending.num becomes > 0. It is
	// emitted as soon as the current streak of new buckets ends.
	var pending Insert

	idxA, moreA := itA.Next()
	idxB, moreB := itB.Next()
	for moreA || moreB {
		// 'a' has a bucket that 'b' lacks, either because 'b' is
		// exhausted or because 'b' has already moved past it.
		if moreA && (!moreB || idxA < idxB) {
			return forward, false
		}

		// The same bucket is present in both: close the streak of new
		// buckets (if any) and advance both sides.
		if moreA && idxA == idxB {
			if pending.num > 0 {
				forward = append(forward, pending)
			}
			pending.num = 0
			idxA, moreA = itA.Next()
			idxB, moreB = itB.Next()
			pending.pos++
			continue
		}

		// 'b' has a bucket that 'a' lacks (also if 'a' is exhausted):
		// count it as a new bucket, forward 'b', and compare again.
		pending.num++
		idxB, moreB = itB.Next()
	}

	// Both iterators ran out. Emit a streak that is still open.
	if pending.num > 0 {
		forward = append(forward, pending)
	}
	return forward, true
}
// expandSpansBothWays is similar to expandSpansForward, but here 'b' may also
// cover an entirely different set of buckets than 'a'. The function returns the
// "forward" inserts to expand 'a' to also cover all the buckets exclusively
// covered by 'b', the "backward" inserts to expand 'b' to also cover all the
// buckets exclusively covered by 'a', and the merged spans describing the
// union of the buckets of 'a' and 'b'.
func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) {
	itA := newBucketIterator(a)
	itB := newBucketIterator(b)

	// prevIdx is the bucket index most recently added to mergedSpans.
	var prevIdx int
	// appendMerged adds the bucket with index idx to mergedSpans, either by
	// growing the last span (if idx directly follows the previous bucket)
	// or by starting a new span of length 1.
	appendMerged := func(idx int) {
		gap := idx - prevIdx - 1
		prevIdx = idx
		if n := len(mergedSpans); n > 0 {
			if gap == 0 {
				mergedSpans[n-1].Length++
				return
			}
		} else {
			// The offset of the very first span is the index of
			// its first bucket.
			gap++
		}
		mergedSpans = append(mergedSpans, histogram.Span{
			Offset: int32(gap),
			Length: 1,
		})
	}

	// pendingFwd (or pendingBwd, respectively) turns into a valid insert
	// once its num becomes > 0. It is emitted when the current streak of
	// new buckets ends.
	var pendingFwd, pendingBwd Insert
	flushForward := func() {
		if pendingFwd.num > 0 {
			forward = append(forward, pendingFwd)
			pendingFwd.num = 0
		}
	}
	flushBackward := func() {
		if pendingBwd.num > 0 {
			backward = append(backward, pendingBwd)
			pendingBwd.num = 0
		}
	}

	idxA, moreA := itA.Next()
	idxB, moreB := itB.Next()
	for moreA || moreB {
		switch {
		case moreA && moreB && idxA == idxB:
			// The same bucket is present in both. Close both
			// streaks and move on.
			flushForward()
			flushBackward()
			appendMerged(idxA)
			idxA, moreA = itA.Next()
			idxB, moreB = itB.Next()
			pendingFwd.pos++
			pendingBwd.pos++
		case moreA && moreB && idxA < idxB:
			// 'b' misses a bucket that is in 'a'.
			pendingBwd.num++
			// Emit the forward inserts before advancing the
			// position of 'a'.
			flushForward()
			appendMerged(idxA)
			pendingFwd.pos++
			idxA, moreA = itA.Next()
		case moreA && moreB:
			// idxA > idxB: 'a' misses a bucket that is in 'b'.
			pendingFwd.num++
			// Emit the backward inserts before advancing the
			// position of 'b'.
			flushBackward()
			appendMerged(idxB)
			pendingBwd.pos++
			idxB, moreB = itB.Next()
		case moreA:
			// 'b' is exhausted, so it misses all that is left in 'a'.
			pendingBwd.num++
			appendMerged(idxA)
			idxA, moreA = itA.Next()
		default:
			// 'a' is exhausted, so it misses all that is left in 'b'.
			pendingFwd.num++
			appendMerged(idxB)
			idxB, moreB = itB.Next()
		}
	}

	// Both iterators ran out. Emit the streaks that are still open.
	flushForward()
	flushBackward()
	return forward, backward, mergedSpans
}
// bucketValue is the type constraint for the bucket values that the generic
// insert function operates on: either int64 or float64.
type bucketValue interface {
int64 | float64
}
// insert merges 'in' with the provided inserts and writes the result into
// 'out', which must already have the appropriate length (the length of 'in'
// plus the number of all inserted buckets). 'out' is also returned for
// convenience.
//
// If deltas is true, the values are treated as deltas, each relative to the
// bucket before it, and the deltas around inserted buckets are adjusted so
// that every inserted bucket has a value of 0 while all original buckets keep
// their value. If deltas is false, the values are treated as absolute values,
// and the inserted buckets are simply set to 0.
//
// The inserts must be ordered by position. At most one insert may refer to the
// position right after the last element of 'in'. insert panics if more inserts
// than that are left once 'in' has been processed.
func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
	var (
		w    int // Next position to write in out.
		last BV  // Absolute value of the last bucket seen in 'in'.
		next int // Index of the next insert to process.
	)

	// writeZeroBuckets writes the new buckets of one insert. With deltas,
	// the first new bucket has to undo the value reached so far to end up
	// at 0. All following new buckets stay at 0, i.e. have a delta of 0.
	// One bucket is always written, even for n < 1.
	writeZeroBuckets := func(n int) {
		if deltas {
			out[w] = -last
		} else {
			out[w] = 0
		}
		w++
		for k := 1; k < n; k++ {
			out[w] = 0
			w++
		}
	}

	for i, d := range in {
		// Without an insert right before it, the original value
		// remains valid as it is.
		adjusted := d
		if next < len(inserts) && inserts[next].pos == i {
			writeZeroBuckets(inserts[next].num)
			next++
			if deltas {
				// The bucket before is now at 0, so the delta
				// has to be the absolute value of this bucket,
				// i.e. the original delta plus the value of the
				// bucket before the insert.
				adjusted = d + last
			}
		}
		out[w] = adjusted
		w++
		last += d
	}

	switch left := len(inserts) - next; left {
	case 0:
		// All inserts processed. Nothing more to do.
	case 1:
		// One more insert to process at the end.
		writeZeroBuckets(inserts[next].num)
	default:
		panic("unprocessed inserts left")
	}
	return out
}
// counterResetHint derives the CounterResetHint for a histogram read from a
// chunk, based on the CounterResetHeader of the chunk (crh) and on the position
// into the chunk (numRead).
func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint {
	// A gauge histogram chunk only contains gauge histograms.
	if crh == GaugeType {
		return histogram.GaugeType
	}

	// In a counter histogram chunk, there will not be any counter resets
	// after the first histogram.
	if numRead > 1 {
		return histogram.NotCounterReset
	}

	// If the chunk was started because of a counter reset, we can safely
	// return that hint. This histogram always has to be treated as a
	// counter reset.
	if crh == CounterReset {
		return histogram.CounterReset
	}

	// Sadly, we have to return "unknown" as the hint for all other cases,
	// even if we know that the chunk was started without a counter reset.
	// But we cannot be sure that the previous chunk still exists in the
	// TSDB, so we conservatively return "unknown". On the bright side,
	// this case should be relatively rare.
	//
	// TODO(beorn7): Nevertheless, if the current chunk is in the middle of
	// a block (not the first chunk in the block for this series), it's
	// probably safe to assume that the previous chunk will exist in the
	// TSDB for as long as the current chunk exists, and we could safely
	// return "histogram.NotCounterReset". This needs some more work and
	// might not be worth the effort and/or risk. To be vetted...
	return histogram.UnknownCounterReset
}