mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Merge pull request #11783 from codesome/gauge-histogram
tsdb: Add gauge histogram support
This commit is contained in:
commit
57bcbf1888
|
@ -27,6 +27,8 @@ import (
|
|||
// used to represent a histogram with integer counts and thus serves as a more
|
||||
// generalized representation.
|
||||
type FloatHistogram struct {
|
||||
// Counter reset information.
|
||||
CounterResetHint CounterResetHint
|
||||
// Currently valid schema numbers are -4 <= n <= 8. They are all for
|
||||
// base-2 bucket schemas, where 1 is a bucket boundary in each case, and
|
||||
// then each power of two is divided into 2^n logarithmic buckets. Or
|
||||
|
|
|
@ -19,6 +19,17 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// CounterResetHint contains the known information about a counter reset,
|
||||
// or alternatively that we are dealing with a gauge histogram, where counter resets do not apply.
|
||||
type CounterResetHint byte
|
||||
|
||||
const (
|
||||
UnknownCounterReset CounterResetHint = iota // UnknownCounterReset means we cannot say if this histogram signals a counter reset or not.
|
||||
CounterReset // CounterReset means there was definitely a counter reset starting from this histogram.
|
||||
NotCounterReset // NotCounterReset means there was definitely no counter reset with this histogram.
|
||||
GaugeType // GaugeType means this is a gauge histogram, where counter resets do not happen.
|
||||
)
|
||||
|
||||
// Histogram encodes a sparse, high-resolution histogram. See the design
|
||||
// document for full details:
|
||||
// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit#
|
||||
|
@ -35,6 +46,8 @@ import (
|
|||
//
|
||||
// Which bucket indices are actually used is determined by the spans.
|
||||
type Histogram struct {
|
||||
// Counter reset information.
|
||||
CounterResetHint CounterResetHint
|
||||
// Currently valid schema numbers are -4 <= n <= 8. They are all for
|
||||
// base-2 bucket schemas, where 1 is a bucket boundary in each case, and
|
||||
// then each power of two is divided into 2^n logarithmic buckets. Or
|
||||
|
@ -295,15 +308,16 @@ func (h *Histogram) ToFloat() *FloatHistogram {
|
|||
}
|
||||
|
||||
return &FloatHistogram{
|
||||
Schema: h.Schema,
|
||||
ZeroThreshold: h.ZeroThreshold,
|
||||
ZeroCount: float64(h.ZeroCount),
|
||||
Count: float64(h.Count),
|
||||
Sum: h.Sum,
|
||||
PositiveSpans: positiveSpans,
|
||||
NegativeSpans: negativeSpans,
|
||||
PositiveBuckets: positiveBuckets,
|
||||
NegativeBuckets: negativeBuckets,
|
||||
CounterResetHint: h.CounterResetHint,
|
||||
Schema: h.Schema,
|
||||
ZeroThreshold: h.ZeroThreshold,
|
||||
ZeroCount: float64(h.ZeroCount),
|
||||
Count: float64(h.Count),
|
||||
Sum: h.Sum,
|
||||
PositiveSpans: positiveSpans,
|
||||
NegativeSpans: negativeSpans,
|
||||
PositiveBuckets: positiveBuckets,
|
||||
NegativeBuckets: negativeBuckets,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3158,10 +3158,12 @@ func TestSparseHistogramRate(t *testing.T) {
|
|||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 4. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
}
|
||||
require.Equal(t, expectedHistogram, actualHistogram)
|
||||
}
|
||||
|
@ -3199,10 +3201,12 @@ func TestSparseFloatHistogramRate(t *testing.T) {
|
|||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 4. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
}
|
||||
require.Equal(t, expectedHistogram, actualHistogram)
|
||||
}
|
||||
|
|
|
@ -174,6 +174,7 @@ func newFloatHistogramIterator(b []byte) *floatHistogramIterator {
|
|||
// The first 3 bytes contain chunk headers.
|
||||
// We skip that for actual samples.
|
||||
_, _ = it.br.readBits(24)
|
||||
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
|
||||
return it
|
||||
}
|
||||
|
||||
|
@ -196,6 +197,14 @@ type FloatHistogramAppender struct {
|
|||
pBuckets, nBuckets []xorValue
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
|
||||
}
|
||||
|
||||
func (a *FloatHistogramAppender) NumSamples() int {
|
||||
return int(binary.BigEndian.Uint16(a.b.bytes()))
|
||||
}
|
||||
|
||||
// Append implements Appender. This implementation panics because normal float
|
||||
// samples must never be appended to a histogram chunk.
|
||||
func (a *FloatHistogramAppender) Append(int64, float64) {
|
||||
|
@ -211,19 +220,14 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
|
|||
// Appendable returns whether the chunk can be appended to, and if so
|
||||
// whether any recoding needs to happen using the provided interjections
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
//
|
||||
// • The schema has changed.
|
||||
//
|
||||
// • The threshold for the zero bucket has changed.
|
||||
//
|
||||
// • Any buckets have disappeared.
|
||||
//
|
||||
// • There was a counter reset in the count of observations or in any bucket,
|
||||
// including the zero bucket.
|
||||
//
|
||||
// • The last sample in the chunk was stale while the current sample is not stale.
|
||||
// - The schema has changed.
|
||||
// - The threshold for the zero bucket has changed.
|
||||
// - Any buckets have disappeared.
|
||||
// - There was a counter reset in the count of observations or in any bucket, including the zero bucket.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
//
|
||||
// The method returns an additional boolean set to true if it is not appendable
|
||||
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||
|
@ -232,6 +236,9 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
positiveInterjections, negativeInterjections []Interjection,
|
||||
okToAppend, counterReset bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||
return
|
||||
}
|
||||
if value.IsStaleNaN(h.Sum) {
|
||||
// This is a stale sample whose buckets and spans don't matter.
|
||||
okToAppend = true
|
||||
|
@ -260,12 +267,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans)
|
||||
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans)
|
||||
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
|
@ -281,6 +288,49 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
return
|
||||
}
|
||||
|
||||
// AppendableGauge returns whether the chunk can be appended to, and if so
|
||||
// whether:
|
||||
// 1. Any recoding needs to happen to the chunk using the provided interjections
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
|
||||
// (in case of any missing buckets, positive or negative range, respectively).
|
||||
//
|
||||
// This method must be only used for gauge histograms.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
// - The schema has changed.
|
||||
// - The threshold for the zero bucket has changed.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
okToAppend bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
|
||||
return
|
||||
}
|
||||
if value.IsStaleNaN(h.Sum) {
|
||||
// This is a stale sample whose buckets and spans don't matter.
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
if value.IsStaleNaN(a.sum.value) {
|
||||
// If the last sample was stale, then we can only accept stale
|
||||
// samples in this chunk.
|
||||
return
|
||||
}
|
||||
|
||||
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
|
||||
// counterResetInAnyFloatBucket returns true if there was a counter reset for any
|
||||
// bucket. This should be called only when the bucket layout is the same or new
|
||||
// buckets were added. It does not handle the case of buckets missing.
|
||||
|
@ -502,11 +552,29 @@ func (a *FloatHistogramAppender) Recode(
|
|||
return hc, app
|
||||
}
|
||||
|
||||
// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used.
|
||||
func (a *FloatHistogramAppender) RecodeHistogramm(
|
||||
fh *histogram.FloatHistogram,
|
||||
pBackwardInter, nBackwardInter []Interjection,
|
||||
) {
|
||||
if len(pBackwardInter) > 0 {
|
||||
numPositiveBuckets := countSpans(fh.PositiveSpans)
|
||||
fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false)
|
||||
}
|
||||
if len(nBackwardInter) > 0 {
|
||||
numNegativeBuckets := countSpans(fh.NegativeSpans)
|
||||
fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false)
|
||||
}
|
||||
}
|
||||
|
||||
type floatHistogramIterator struct {
|
||||
br bstreamReader
|
||||
numTotal uint16
|
||||
numRead uint16
|
||||
|
||||
counterResetHeader CounterResetHeader
|
||||
|
||||
// Layout:
|
||||
schema int32
|
||||
zThreshold float64
|
||||
|
@ -559,16 +627,21 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis
|
|||
return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
|
||||
}
|
||||
it.atFloatHistogramCalled = true
|
||||
crHint := histogram.UnknownCounterReset
|
||||
if it.counterResetHeader == GaugeType {
|
||||
crHint = histogram.GaugeType
|
||||
}
|
||||
return it.t, &histogram.FloatHistogram{
|
||||
Count: it.cnt.value,
|
||||
ZeroCount: it.zCnt.value,
|
||||
Sum: it.sum.value,
|
||||
ZeroThreshold: it.zThreshold,
|
||||
Schema: it.schema,
|
||||
PositiveSpans: it.pSpans,
|
||||
NegativeSpans: it.nSpans,
|
||||
PositiveBuckets: it.pBuckets,
|
||||
NegativeBuckets: it.nBuckets,
|
||||
CounterResetHint: crHint,
|
||||
Count: it.cnt.value,
|
||||
ZeroCount: it.zCnt.value,
|
||||
Sum: it.sum.value,
|
||||
ZeroThreshold: it.zThreshold,
|
||||
Schema: it.schema,
|
||||
PositiveSpans: it.pSpans,
|
||||
NegativeSpans: it.nSpans,
|
||||
PositiveBuckets: it.pBuckets,
|
||||
NegativeBuckets: it.nBuckets,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -587,6 +660,8 @@ func (it *floatHistogramIterator) Reset(b []byte) {
|
|||
it.numTotal = binary.BigEndian.Uint16(b)
|
||||
it.numRead = 0
|
||||
|
||||
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
|
||||
|
||||
it.t, it.tDelta = 0, 0
|
||||
it.cnt, it.zCnt, it.sum = xorValue{}, xorValue{}, xorValue{}
|
||||
|
||||
|
|
|
@ -358,3 +358,171 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
require.True(t, cr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
|
||||
c := Chunk(NewFloatHistogramChunk())
|
||||
|
||||
// Create fresh appender and add the first histogram.
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, c.NumSamples())
|
||||
|
||||
ts := int64(1234567890)
|
||||
h1 := &histogram.FloatHistogram{
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
ZeroThreshold: 1e-125,
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
},
|
||||
PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1},
|
||||
}
|
||||
|
||||
app.AppendFloatHistogram(ts, h1.Copy())
|
||||
require.Equal(t, 1, c.NumSamples())
|
||||
c.(*FloatHistogramChunk).SetCounterResetHeader(GaugeType)
|
||||
|
||||
{ // Schema change.
|
||||
h2 := h1.Copy()
|
||||
h2.Schema++
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
{ // Zero threshold change.
|
||||
h2 := h1.Copy()
|
||||
h2.ZeroThreshold += 0.1
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
{ // New histogram that has more buckets.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
}
|
||||
h2.Count += 9
|
||||
h2.ZeroCount++
|
||||
h2.Sum = 30
|
||||
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Greater(t, len(pI), 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Len(t, pBackwardI, 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
{ // New histogram that has buckets missing.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.Count -= 4
|
||||
h2.Sum--
|
||||
h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Len(t, pI, 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Greater(t, len(pBackwardI), 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
{ // New histogram that has a bucket missing and new buckets.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 5, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.Sum = 21
|
||||
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Greater(t, len(pI), 0)
|
||||
require.Greater(t, len(pBackwardI), 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
h2 := h1.Copy()
|
||||
h2.Sum = 23
|
||||
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Len(t, pI, 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Len(t, pBackwardI, 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while new buckets were added.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
}
|
||||
h2.Sum = 29
|
||||
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Greater(t, len(pI), 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Len(t, pBackwardI, 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
{
|
||||
// New histogram that has a counter reset while new buckets were
|
||||
// added before the first bucket and reset on first bucket.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: -3, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.Sum = 26
|
||||
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
|
||||
|
||||
hApp, _ := app.(*FloatHistogramAppender)
|
||||
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||
require.Greater(t, len(pI), 0)
|
||||
require.Len(t, nI, 0)
|
||||
require.Len(t, pBackwardI, 0)
|
||||
require.Len(t, nBackwardI, 0)
|
||||
require.True(t, ok)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -286,12 +286,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans)
|
||||
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans)
|
||||
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
|
|
|
@ -165,21 +165,23 @@ func (b *bucketIterator) Next() (int, bool) {
|
|||
if b.span >= len(b.spans) {
|
||||
return 0, false
|
||||
}
|
||||
try:
|
||||
if b.bucket < int(b.spans[b.span].Length-1) { // Try to move within same span.
|
||||
if b.bucket < int(b.spans[b.span].Length)-1 { // Try to move within same span.
|
||||
b.bucket++
|
||||
b.idx++
|
||||
return b.idx, true
|
||||
} else if b.span < len(b.spans)-1 { // Try to move from one span to the next.
|
||||
}
|
||||
|
||||
for b.span < len(b.spans)-1 { // Try to move from one span to the next.
|
||||
b.span++
|
||||
b.idx += int(b.spans[b.span].Offset + 1)
|
||||
b.bucket = 0
|
||||
if b.spans[b.span].Length == 0 {
|
||||
// Pathological case that should never happen. We can't use this span, let's try again.
|
||||
goto try
|
||||
b.idx--
|
||||
continue
|
||||
}
|
||||
return b.idx, true
|
||||
}
|
||||
|
||||
// We're out of options.
|
||||
return 0, false
|
||||
}
|
||||
|
@ -191,7 +193,7 @@ type Interjection struct {
|
|||
num int
|
||||
}
|
||||
|
||||
// compareSpans returns the interjections to convert a slice of deltas to a new
|
||||
// forwardCompareSpans returns the interjections to convert a slice of deltas to a new
|
||||
// slice representing an expanded set of buckets, or false if incompatible
|
||||
// (e.g. if buckets were removed).
|
||||
//
|
||||
|
@ -226,11 +228,11 @@ type Interjection struct {
|
|||
// match a new span layout that adds buckets, we simply need to generate a list
|
||||
// of interjections.
|
||||
//
|
||||
// Note: Within compareSpans we don't have to worry about the changes to the
|
||||
// Note: Within forwardCompareSpans we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func compareSpans(a, b []histogram.Span) ([]Interjection, bool) {
|
||||
func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
|
@ -278,6 +280,102 @@ loop:
|
|||
return interjections, true
|
||||
}
|
||||
|
||||
// bidirectionalCompareSpans does everything that forwardCompareSpans does and
|
||||
// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a).
|
||||
func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var interjections, bInterjections []Interjection
|
||||
var lastBucket int
|
||||
addBucket := func(b int) {
|
||||
offset := b - lastBucket - 1
|
||||
if offset == 0 && len(mergedSpans) > 0 {
|
||||
mergedSpans[len(mergedSpans)-1].Length++
|
||||
} else {
|
||||
if len(mergedSpans) == 0 {
|
||||
offset++
|
||||
}
|
||||
mergedSpans = append(mergedSpans, histogram.Span{
|
||||
Offset: int32(offset),
|
||||
Length: 1,
|
||||
})
|
||||
}
|
||||
|
||||
lastBucket = b
|
||||
}
|
||||
|
||||
// When inter.num becomes > 0, this becomes a valid interjection that
|
||||
// should be yielded when we finish a streak of new buckets.
|
||||
var inter, bInter Interjection
|
||||
|
||||
av, aOK := ai.Next()
|
||||
bv, bOK := bi.Next()
|
||||
loop:
|
||||
for {
|
||||
switch {
|
||||
case aOK && bOK:
|
||||
switch {
|
||||
case av == bv: // Both have an identical value. move on!
|
||||
// Finish WIP interjection and reset.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inter.num = 0
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
addBucket(av)
|
||||
av, aOK = ai.Next()
|
||||
bv, bOK = bi.Next()
|
||||
inter.pos++
|
||||
bInter.pos++
|
||||
case av < bv: // b misses a value that is in a.
|
||||
bInter.num++
|
||||
// Collect the forward interjection before advancing the
|
||||
// position of 'a'.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inter.num = 0
|
||||
}
|
||||
addBucket(av)
|
||||
inter.pos++
|
||||
av, aOK = ai.Next()
|
||||
case av > bv: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
// Collect the backward interjection before advancing the
|
||||
// position of 'b'.
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
addBucket(bv)
|
||||
bInter.pos++
|
||||
bv, bOK = bi.Next()
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
bInter.num++
|
||||
addBucket(av)
|
||||
av, aOK = ai.Next()
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
addBucket(bv)
|
||||
bv, bOK = bi.Next()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return interjections, bInterjections, mergedSpans
|
||||
}
|
||||
|
||||
// interject merges 'in' with the provided interjections and writes them into
|
||||
// 'out', which must already have the appropriate length.
|
||||
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
|
||||
|
|
|
@ -111,13 +111,12 @@ func TestBucketIterator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestInterjection(t *testing.T) {
|
||||
func TestCompareSpansAndInterject(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
description string
|
||||
spansA, spansB []histogram.Span
|
||||
valid bool
|
||||
interjections []Interjection
|
||||
bucketsIn, bucketsOut []int64
|
||||
description string
|
||||
spansA, spansB []histogram.Span
|
||||
interjections, backwardInterjections []Interjection
|
||||
bucketsIn, bucketsOut []int64
|
||||
}{
|
||||
{
|
||||
description: "single prepend at the beginning",
|
||||
|
@ -127,7 +126,6 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -11, Length: 4},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 0,
|
||||
|
@ -145,7 +143,6 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 3,
|
||||
|
@ -163,7 +160,6 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -12, Length: 5},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 0,
|
||||
|
@ -181,7 +177,6 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 5},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 3,
|
||||
|
@ -199,7 +194,6 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -12, Length: 7},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 0,
|
||||
|
@ -221,7 +215,9 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -9, Length: 3},
|
||||
},
|
||||
valid: false,
|
||||
backwardInterjections: []Interjection{
|
||||
{pos: 0, num: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single removal of bucket in the middle",
|
||||
|
@ -232,7 +228,9 @@ func TestInterjection(t *testing.T) {
|
|||
{Offset: -10, Length: 2},
|
||||
{Offset: 1, Length: 1},
|
||||
},
|
||||
valid: false,
|
||||
backwardInterjections: []Interjection{
|
||||
{pos: 2, num: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "single removal of bucket at the end",
|
||||
|
@ -242,7 +240,9 @@ func TestInterjection(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 3},
|
||||
},
|
||||
valid: false,
|
||||
backwardInterjections: []Interjection{
|
||||
{pos: 3, num: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
description: "as described in doc comment",
|
||||
|
@ -259,7 +259,6 @@ func TestInterjection(t *testing.T) {
|
|||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
valid: true,
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 2,
|
||||
|
@ -277,12 +276,67 @@ func TestInterjection(t *testing.T) {
|
|||
bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4},
|
||||
bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1},
|
||||
},
|
||||
{
|
||||
description: "both forward and backward interjections, complex case",
|
||||
spansA: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
},
|
||||
spansB: []histogram.Span{
|
||||
{Offset: 1, Length: 2},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 2},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
{
|
||||
pos: 2,
|
||||
num: 1,
|
||||
},
|
||||
{
|
||||
pos: 3,
|
||||
num: 2,
|
||||
},
|
||||
{
|
||||
pos: 6,
|
||||
num: 1,
|
||||
},
|
||||
},
|
||||
backwardInterjections: []Interjection{
|
||||
{
|
||||
pos: 0,
|
||||
num: 1,
|
||||
},
|
||||
{
|
||||
pos: 5,
|
||||
num: 1,
|
||||
},
|
||||
{
|
||||
pos: 6,
|
||||
num: 1,
|
||||
},
|
||||
{
|
||||
pos: 7,
|
||||
num: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, s := range scenarios {
|
||||
t.Run(s.description, func(t *testing.T) {
|
||||
interjections, valid := compareSpans(s.spansA, s.spansB)
|
||||
if !s.valid {
|
||||
if len(s.backwardInterjections) > 0 {
|
||||
interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB)
|
||||
require.Equal(t, s.interjections, interjections)
|
||||
require.Equal(t, s.backwardInterjections, bInterjections)
|
||||
}
|
||||
|
||||
interjections, valid := forwardCompareSpans(s.spansA, s.spansB)
|
||||
if len(s.backwardInterjections) > 0 {
|
||||
require.False(t, valid, "compareScan unexpectedly returned true")
|
||||
return
|
||||
}
|
||||
|
@ -292,6 +346,24 @@ func TestInterjection(t *testing.T) {
|
|||
gotBuckets := make([]int64, len(s.bucketsOut))
|
||||
interject(s.bucketsIn, gotBuckets, interjections, true)
|
||||
require.Equal(t, s.bucketsOut, gotBuckets)
|
||||
|
||||
floatBucketsIn := make([]float64, len(s.bucketsIn))
|
||||
last := s.bucketsIn[0]
|
||||
floatBucketsIn[0] = float64(last)
|
||||
for i := 1; i < len(floatBucketsIn); i++ {
|
||||
last += s.bucketsIn[i]
|
||||
floatBucketsIn[i] = float64(last)
|
||||
}
|
||||
floatBucketsOut := make([]float64, len(s.bucketsOut))
|
||||
last = s.bucketsOut[0]
|
||||
floatBucketsOut[0] = float64(last)
|
||||
for i := 1; i < len(floatBucketsOut); i++ {
|
||||
last += s.bucketsOut[i]
|
||||
floatBucketsOut[i] = float64(last)
|
||||
}
|
||||
gotFloatBuckets := make([]float64, len(floatBucketsOut))
|
||||
interject(floatBucketsIn, gotFloatBuckets, interjections, false)
|
||||
require.Equal(t, floatBucketsOut, gotFloatBuckets)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -369,3 +441,135 @@ func TestWriteReadHistogramChunkLayout(t *testing.T) {
|
|||
require.Equal(t, want.negativeSpans, gotNegativeSpans)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpansFromBidirectionalCompareSpans(t *testing.T) {
|
||||
cases := []struct {
|
||||
s1, s2, exp []histogram.Span
|
||||
}{
|
||||
{ // All empty.
|
||||
s1: []histogram.Span{},
|
||||
s2: []histogram.Span{},
|
||||
},
|
||||
{ // Same spans.
|
||||
s1: []histogram.Span{},
|
||||
s2: []histogram.Span{},
|
||||
},
|
||||
{
|
||||
// Has the cases of
|
||||
// 1. |----| (partial overlap)
|
||||
// |----|
|
||||
//
|
||||
// 2. |-----| (no gap but no overlap as well)
|
||||
// |---|
|
||||
//
|
||||
// 3. |----| (complete overlap)
|
||||
// |----|
|
||||
s1: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 5, Length: 3},
|
||||
},
|
||||
s2: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 2},
|
||||
{Offset: 2, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
exp: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 7},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
// s1 is superset of s2.
|
||||
s1: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 5},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
s2: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 5, Length: 3},
|
||||
{Offset: 4, Length: 3},
|
||||
},
|
||||
exp: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 5},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
// No overlaps but one span is side by side.
|
||||
s1: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 5, Length: 3},
|
||||
},
|
||||
s2: []histogram.Span{
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 4, Length: 2},
|
||||
},
|
||||
exp: []histogram.Span{
|
||||
{Offset: 0, Length: 9},
|
||||
{Offset: 1, Length: 2},
|
||||
{Offset: 2, Length: 3},
|
||||
},
|
||||
},
|
||||
{
|
||||
// No buckets in one of them.
|
||||
s1: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 5, Length: 3},
|
||||
},
|
||||
s2: []histogram.Span{},
|
||||
exp: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 5, Length: 3},
|
||||
},
|
||||
},
|
||||
{ // Zero length spans.
|
||||
s1: []histogram.Span{
|
||||
{Offset: -5, Length: 0},
|
||||
{Offset: 2, Length: 0},
|
||||
{Offset: 3, Length: 3},
|
||||
{Offset: 1, Length: 0},
|
||||
{Offset: 2, Length: 3},
|
||||
{Offset: 2, Length: 0},
|
||||
{Offset: 2, Length: 0},
|
||||
{Offset: 1, Length: 3},
|
||||
{Offset: 4, Length: 0},
|
||||
{Offset: 5, Length: 0},
|
||||
},
|
||||
s2: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 2},
|
||||
{Offset: 1, Length: 0},
|
||||
{Offset: 1, Length: 3},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
exp: []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 7},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
s1c := make([]histogram.Span, len(c.s1))
|
||||
s2c := make([]histogram.Span, len(c.s2))
|
||||
copy(s1c, c.s1)
|
||||
copy(s2c, c.s2)
|
||||
|
||||
_, _, act := bidirectionalCompareSpans(c.s1, c.s2)
|
||||
require.Equal(t, c.exp, act)
|
||||
// Check that s1 and s2 are not modified.
|
||||
require.Equal(t, s1c, c.s1)
|
||||
require.Equal(t, s2c, c.s2)
|
||||
_, _, act = bidirectionalCompareSpans(c.s2, c.s1)
|
||||
require.Equal(t, c.exp, act)
|
||||
}
|
||||
}
|
||||
|
|
41
tsdb/head.go
41
tsdb/head.go
|
@ -17,6 +17,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -2026,7 +2027,7 @@ func (h *Head) updateWALReplayStatusRead(current int) {
|
|||
func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
||||
for i := 0; i < n; i++ {
|
||||
r = append(r, &histogram.Histogram{
|
||||
Count: 5 + uint64(i*4),
|
||||
Count: 10 + uint64(i*8),
|
||||
ZeroCount: 2 + uint64(i),
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
|
@ -2036,6 +2037,11 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
|||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2045,7 +2051,7 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
|||
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||
for i := 0; i < n; i++ {
|
||||
r = append(r, &histogram.FloatHistogram{
|
||||
Count: 5 + float64(i*4),
|
||||
Count: 10 + float64(i*8),
|
||||
ZeroCount: 2 + float64(i),
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
|
@ -2055,6 +2061,37 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
|||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
})
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||
for x := 0; x < n; x++ {
|
||||
i := rand.Intn(n)
|
||||
r = append(r, &histogram.FloatHistogram{
|
||||
CounterResetHint: histogram.GaugeType,
|
||||
Count: 10 + float64(i*8),
|
||||
ZeroCount: 2 + float64(i),
|
||||
ZeroThreshold: 0.001,
|
||||
Sum: 18.4 * float64(i+1),
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -1138,9 +1138,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
|||
|
||||
// appendHistogram adds the histogram.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// TODO(codesome): Support gauge histograms here.
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable before
|
||||
// chunk reference afterwards. We check for Appendable from appender before
|
||||
// appendPreprocessor because in case it ends up creating a new chunk,
|
||||
// we need to know if there was also a counter reset or not to set the
|
||||
// meta properly.
|
||||
|
@ -1209,24 +1210,37 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable before
|
||||
// chunk reference afterwards. We check for Appendable from appender before
|
||||
// appendPreprocessor because in case it ends up creating a new chunk,
|
||||
// we need to know if there was also a counter reset or not to set the
|
||||
// meta properly.
|
||||
app, _ := s.app.(*chunkenc.FloatHistogramAppender)
|
||||
var (
|
||||
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
||||
pBackwardInter, nBackwardInter []chunkenc.Interjection
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset bool
|
||||
)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
gauge := fh.CounterResetHint == histogram.GaugeType
|
||||
if app != nil {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh)
|
||||
if gauge {
|
||||
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter,
|
||||
pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh)
|
||||
} else {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh)
|
||||
}
|
||||
}
|
||||
|
||||
if !chunkCreated {
|
||||
if len(pBackwardInter)+len(nBackwardInter) > 0 {
|
||||
fh.PositiveSpans = pMergedSpans
|
||||
fh.NegativeSpans = nMergedSpans
|
||||
app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter)
|
||||
}
|
||||
// We have 3 cases here
|
||||
// - !okToAppend -> We need to cut a new chunk.
|
||||
// - okToAppend but we have interjections → Existing chunk needs
|
||||
|
@ -1251,7 +1265,9 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
if chunkCreated {
|
||||
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
|
||||
header := chunkenc.UnknownCounterReset
|
||||
if counterReset {
|
||||
if gauge {
|
||||
header = chunkenc.GaugeType
|
||||
} else if counterReset {
|
||||
header = chunkenc.CounterReset
|
||||
} else if okToAppend {
|
||||
header = chunkenc.NotCounterReset
|
||||
|
|
|
@ -110,7 +110,9 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
|
|||
func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
||||
sr, err := wlog.NewSegmentsReader(dir)
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
defer func() {
|
||||
require.NoError(t, sr.Close())
|
||||
}()
|
||||
|
||||
var dec record.Decoder
|
||||
r := wlog.NewReader(sr)
|
||||
|
@ -127,6 +129,14 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
|
|||
samples, err := dec.Samples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
case record.HistogramSamples:
|
||||
samples, err := dec.HistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
case record.FloatHistogramSamples:
|
||||
samples, err := dec.FloatHistogramSamples(rec, nil)
|
||||
require.NoError(t, err)
|
||||
recs = append(recs, samples)
|
||||
case record.Tombstones:
|
||||
tstones, err := dec.Tombstones(rec, nil)
|
||||
require.NoError(t, err)
|
||||
|
@ -2824,6 +2834,7 @@ func TestAppendHistogram(t *testing.T) {
|
|||
ingestTs := int64(0)
|
||||
app := head.Appender(context.Background())
|
||||
|
||||
// Integer histograms.
|
||||
type timedHistogram struct {
|
||||
t int64
|
||||
h *histogram.Histogram
|
||||
|
@ -2844,6 +2855,7 @@ func TestAppendHistogram(t *testing.T) {
|
|||
t int64
|
||||
h *histogram.FloatHistogram
|
||||
}
|
||||
// Float counter histograms.
|
||||
expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms)
|
||||
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||
|
@ -2855,6 +2867,18 @@ func TestAppendHistogram(t *testing.T) {
|
|||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
|
||||
// Float gauge histograms.
|
||||
for _, fh := range GenerateTestGaugeHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||
require.NoError(t, err)
|
||||
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
|
||||
ingestTs++
|
||||
if ingestTs%50 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
|
@ -2898,7 +2922,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
// Series with only histograms.
|
||||
s1 := labels.FromStrings("a", "b1")
|
||||
k1 := s1.String()
|
||||
numHistograms := 450
|
||||
numHistograms := 300
|
||||
exp := map[string][]tsdbutil.Sample{}
|
||||
app := head.Appender(context.Background())
|
||||
ts := int64(0)
|
||||
|
@ -2916,26 +2940,34 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
for _, h := range GenerateTestFloatHistograms(numHistograms) {
|
||||
h.Count = h.Count * 2
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s1, ts, nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()})
|
||||
ts++
|
||||
if ts%5 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
for _, gauge := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.FloatHistogram
|
||||
if gauge {
|
||||
hists = GenerateTestGaugeHistograms(numHistograms)
|
||||
} else {
|
||||
hists = GenerateTestFloatHistograms(numHistograms)
|
||||
}
|
||||
for _, h := range hists {
|
||||
h.Count = h.Count * 2
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s1, ts, nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()})
|
||||
ts++
|
||||
if ts%5 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// There should be 7 mmap chunks in s1.
|
||||
ms := head.series.getByHash(s1.Hash(), s1)
|
||||
require.Len(t, ms.mmappedChunks, 7)
|
||||
expMmapChunks := make([]*mmappedChunk, 0, 7)
|
||||
require.Len(t, ms.mmappedChunks, 8)
|
||||
expMmapChunks := make([]*mmappedChunk, 0, 8)
|
||||
for _, mmap := range ms.mmappedChunks {
|
||||
require.Greater(t, mmap.numSamples, uint16(0))
|
||||
cpy := *mmap
|
||||
|
@ -2972,51 +3004,68 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
for _, h := range GenerateTestFloatHistograms(100) {
|
||||
ts++
|
||||
h.Count = h.Count * 2
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
// Add some float.
|
||||
for i := 0; i < 10; i++ {
|
||||
ts++
|
||||
_, err := app.Append(0, s2, int64(ts), float64(ts))
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
for _, gauge := range []bool{true, false} {
|
||||
app = head.Appender(context.Background())
|
||||
var hists []*histogram.FloatHistogram
|
||||
if gauge {
|
||||
hists = GenerateTestGaugeHistograms(100)
|
||||
} else {
|
||||
hists = GenerateTestFloatHistograms(100)
|
||||
}
|
||||
for _, h := range hists {
|
||||
ts++
|
||||
h.Count = h.Count * 2
|
||||
h.NegativeSpans = h.PositiveSpans
|
||||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
// Add some float.
|
||||
for i := 0; i < 10; i++ {
|
||||
ts++
|
||||
_, err := app.Append(0, s2, int64(ts), float64(ts))
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Restart head.
|
||||
require.NoError(t, head.Close())
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0))
|
||||
startHead := func() {
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0))
|
||||
}
|
||||
startHead()
|
||||
|
||||
// Checking contents of s1.
|
||||
ms = head.series.getByHash(s1.Hash(), s1)
|
||||
require.Equal(t, expMmapChunks, ms.mmappedChunks)
|
||||
for _, mmap := range ms.mmappedChunks {
|
||||
require.Greater(t, mmap.numSamples, uint16(0))
|
||||
}
|
||||
require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples())
|
||||
|
||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
|
||||
require.Equal(t, exp, act)
|
||||
testQuery := func() {
|
||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
|
||||
require.Equal(t, exp, act)
|
||||
}
|
||||
testQuery()
|
||||
|
||||
// Restart with no mmap chunks to test WAL replay.
|
||||
require.NoError(t, head.Close())
|
||||
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||
startHead()
|
||||
testQuery()
|
||||
}
|
||||
|
||||
func TestChunkSnapshot(t *testing.T) {
|
||||
|
@ -3522,7 +3571,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
|||
if floatHisto {
|
||||
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat())
|
||||
} else {
|
||||
_, err = app.AppendHistogram(0, l, ts, h, nil)
|
||||
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -3553,10 +3602,6 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
|||
}
|
||||
|
||||
h := GenerateTestHistograms(1)[0]
|
||||
if len(h.NegativeBuckets) == 0 {
|
||||
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...)
|
||||
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...)
|
||||
}
|
||||
h.PositiveBuckets = []int64{100, 1, 1, 1}
|
||||
h.NegativeBuckets = []int64{100, 1, 1, 1}
|
||||
h.Count = 1000
|
||||
|
@ -4517,3 +4562,78 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
|
|||
require.NoError(t, h.truncateOOO(0, 2))
|
||||
require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime())
|
||||
}
|
||||
|
||||
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||
l := labels.FromStrings("a", "b")
|
||||
head, _ := newTestHead(t, 1000, false, false)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, head.Close())
|
||||
})
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
ts := int64(0)
|
||||
appendHistogram := func(h *histogram.FloatHistogram) {
|
||||
ts++
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.AppendHistogram(0, l, ts, nil, h.Copy())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
hists := GenerateTestGaugeHistograms(5)
|
||||
hists[0].CounterResetHint = histogram.UnknownCounterReset
|
||||
appendHistogram(hists[0])
|
||||
appendHistogram(hists[1])
|
||||
appendHistogram(hists[2])
|
||||
hists[3].CounterResetHint = histogram.UnknownCounterReset
|
||||
appendHistogram(hists[3])
|
||||
appendHistogram(hists[3])
|
||||
appendHistogram(hists[4])
|
||||
|
||||
checkHeaders := func() {
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ms.mmappedChunks, 3)
|
||||
expHeaders := []chunkenc.CounterResetHeader{
|
||||
chunkenc.UnknownCounterReset,
|
||||
chunkenc.GaugeType,
|
||||
chunkenc.UnknownCounterReset,
|
||||
chunkenc.GaugeType,
|
||||
}
|
||||
for i, mmapChunk := range ms.mmappedChunks {
|
||||
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
checkHeaders()
|
||||
|
||||
recs := readTestWAL(t, head.wal.Dir())
|
||||
require.Equal(t, []interface{}{
|
||||
[]record.RefSeries{
|
||||
{
|
||||
Ref: 1,
|
||||
Labels: labels.FromStrings("a", "b"),
|
||||
},
|
||||
},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 1, FH: hists[0]}},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 2, FH: hists[1]}},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 3, FH: hists[2]}},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 4, FH: hists[3]}},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 5, FH: hists[3]}},
|
||||
[]record.RefFloatHistogramSample{{Ref: 1, T: 6, FH: hists[4]}},
|
||||
}, recs)
|
||||
|
||||
// Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms.
|
||||
require.NoError(t, head.Close())
|
||||
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||
|
||||
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||
require.NoError(t, err)
|
||||
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, head.Init(0))
|
||||
|
||||
checkHeaders()
|
||||
}
|
||||
|
|
|
@ -441,6 +441,8 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample)
|
|||
H: &histogram.Histogram{},
|
||||
}
|
||||
|
||||
rh.H.CounterResetHint = histogram.CounterResetHint(dec.Byte())
|
||||
|
||||
rh.H.Schema = int32(dec.Varint64())
|
||||
rh.H.ZeroThreshold = math.Float64frombits(dec.Be64())
|
||||
|
||||
|
@ -517,6 +519,8 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr
|
|||
FH: &histogram.FloatHistogram{},
|
||||
}
|
||||
|
||||
rh.FH.CounterResetHint = histogram.CounterResetHint(dec.Byte())
|
||||
|
||||
rh.FH.Schema = int32(dec.Varint64())
|
||||
rh.FH.ZeroThreshold = dec.Be64Float64()
|
||||
|
||||
|
@ -715,6 +719,8 @@ func (e *Encoder) HistogramSamples(histograms []RefHistogramSample, b []byte) []
|
|||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
buf.PutByte(byte(h.H.CounterResetHint))
|
||||
|
||||
buf.PutVarint64(int64(h.H.Schema))
|
||||
buf.PutBE64(math.Float64bits(h.H.ZeroThreshold))
|
||||
|
||||
|
@ -766,6 +772,8 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
|
|||
buf.PutVarint64(int64(h.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(h.T - first.T)
|
||||
|
||||
buf.PutByte(byte(h.FH.CounterResetHint))
|
||||
|
||||
buf.PutVarint64(int64(h.FH.Schema))
|
||||
buf.PutBEFloat64(h.FH.ZeroThreshold)
|
||||
|
||||
|
|
|
@ -165,6 +165,22 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
|||
decFloatHistograms, err := dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||
|
||||
// Gauge ingeger histograms.
|
||||
for i := range histograms {
|
||||
histograms[i].H.CounterResetHint = histogram.GaugeType
|
||||
}
|
||||
decHistograms, err = dec.HistogramSamples(enc.HistogramSamples(histograms, nil), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, histograms, decHistograms)
|
||||
|
||||
// Gauge float histograms.
|
||||
for i := range floatHistograms {
|
||||
floatHistograms[i].FH.CounterResetHint = histogram.GaugeType
|
||||
}
|
||||
decFloatHistograms, err = dec.FloatHistogramSamples(enc.FloatHistogramSamples(floatHistograms, nil), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, floatHistograms, decFloatHistograms)
|
||||
}
|
||||
|
||||
// TestRecord_Corrupted ensures that corrupted records return the correct error.
|
||||
|
|
Loading…
Reference in a new issue