mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Merge pull request #11864 from prometheus/beorn7/histogram2
histograms: Return actually useful counter reset hints
This commit is contained in:
commit
60d763282e
|
@ -3125,7 +3125,7 @@ func TestRangeQuery(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSparseHistogramRate(t *testing.T) {
|
||||
func TestNativeHistogramRate(t *testing.T) {
|
||||
// TODO(beorn7): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
test, err := NewTest(t, "")
|
||||
|
@ -3155,20 +3155,22 @@ func TestSparseHistogramRate(t *testing.T) {
|
|||
require.Len(t, vector, 1)
|
||||
actualHistogram := vector[0].H
|
||||
expectedHistogram := &histogram.FloatHistogram{
|
||||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
// TODO(beorn7): This should be GaugeType. Change it once supported by PromQL.
|
||||
CounterResetHint: histogram.NotCounterReset,
|
||||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
}
|
||||
require.Equal(t, expectedHistogram, actualHistogram)
|
||||
}
|
||||
|
||||
func TestSparseFloatHistogramRate(t *testing.T) {
|
||||
func TestNativeFloatHistogramRate(t *testing.T) {
|
||||
// TODO(beorn7): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
test, err := NewTest(t, "")
|
||||
|
@ -3198,20 +3200,22 @@ func TestSparseFloatHistogramRate(t *testing.T) {
|
|||
require.Len(t, vector, 1)
|
||||
actualHistogram := vector[0].H
|
||||
expectedHistogram := &histogram.FloatHistogram{
|
||||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
// TODO(beorn7): This should be GaugeType. Change it once supported by PromQL.
|
||||
CounterResetHint: histogram.NotCounterReset,
|
||||
Schema: 1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 1. / 15.,
|
||||
Count: 8. / 15.,
|
||||
Sum: 1.226666666666667,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
|
||||
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
|
||||
}
|
||||
require.Equal(t, expectedHistogram, actualHistogram)
|
||||
}
|
||||
|
||||
func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
|
||||
func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
|
||||
// TODO(codesome): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
h := &histogram.Histogram{
|
||||
|
@ -3290,7 +3294,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSparseHistogram_HistogramQuantile(t *testing.T) {
|
||||
func TestNativeHistogram_HistogramQuantile(t *testing.T) {
|
||||
// TODO(codesome): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
type subCase struct {
|
||||
|
@ -3526,7 +3530,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSparseHistogram_HistogramFraction(t *testing.T) {
|
||||
func TestNativeHistogram_HistogramFraction(t *testing.T) {
|
||||
// TODO(codesome): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
type subCase struct {
|
||||
|
@ -3961,7 +3965,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) {
|
||||
func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
|
||||
// TODO(codesome): Integrate histograms into the PromQL testing framework
|
||||
// and write more tests there.
|
||||
cases := []struct {
|
||||
|
|
|
@ -440,6 +440,10 @@ type chainSampleIterator struct {
|
|||
|
||||
curr chunkenc.Iterator
|
||||
lastT int64
|
||||
|
||||
// Whether the previous and the current sample are direct neighbors
|
||||
// within the same base iterator.
|
||||
consecutive bool
|
||||
}
|
||||
|
||||
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
|
||||
|
@ -485,6 +489,9 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
|||
if c.curr != nil && c.lastT >= t {
|
||||
return c.curr.Seek(c.lastT)
|
||||
}
|
||||
// Don't bother to find out if the next sample is consecutive. Callers
|
||||
// of Seek usually aren't interested anyway.
|
||||
c.consecutive = false
|
||||
c.h = samplesIteratorHeap{}
|
||||
for _, iter := range c.iterators {
|
||||
if iter.Seek(t) != chunkenc.ValNone {
|
||||
|
@ -511,14 +518,30 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
|
|||
if c.curr == nil {
|
||||
panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
|
||||
}
|
||||
return c.curr.AtHistogram()
|
||||
t, h := c.curr.AtHistogram()
|
||||
// If the current sample is not consecutive with the previous one, we
|
||||
// cannot be sure anymore that there was no counter reset.
|
||||
if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset {
|
||||
h.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, h
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
|
||||
if c.curr == nil {
|
||||
panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
|
||||
}
|
||||
return c.curr.AtFloatHistogram()
|
||||
t, fh := c.curr.AtFloatHistogram()
|
||||
// If the current sample is not consecutive with the previous one, we
|
||||
// cannot be sure anymore about counter resets for counter histograms.
|
||||
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
|
||||
// non-consecutive `CounterReset` sample, we could keep the hint as
|
||||
// `CounterReset`. But then we needed to track the previous sample
|
||||
// in more detail, which might not be worth it.
|
||||
if !c.consecutive && fh.CounterResetHint != histogram.GaugeType {
|
||||
fh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, fh
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) AtT() int64 {
|
||||
|
@ -529,7 +552,13 @@ func (c *chainSampleIterator) AtT() int64 {
|
|||
}
|
||||
|
||||
func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
||||
var (
|
||||
currT int64
|
||||
currValueType chunkenc.ValueType
|
||||
iteratorChanged bool
|
||||
)
|
||||
if c.h == nil {
|
||||
iteratorChanged = true
|
||||
c.h = samplesIteratorHeap{}
|
||||
// We call c.curr.Next() as the first thing below.
|
||||
// So, we don't call Next() on it here.
|
||||
|
@ -545,8 +574,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
|||
return chunkenc.ValNone
|
||||
}
|
||||
|
||||
var currT int64
|
||||
var currValueType chunkenc.ValueType
|
||||
for {
|
||||
currValueType = c.curr.Next()
|
||||
if currValueType != chunkenc.ValNone {
|
||||
|
@ -576,6 +603,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
|||
}
|
||||
|
||||
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
||||
iteratorChanged = true
|
||||
currT = c.curr.AtT()
|
||||
currValueType = c.curr.Seek(currT)
|
||||
if currT != c.lastT {
|
||||
|
@ -583,6 +611,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
|||
}
|
||||
}
|
||||
|
||||
c.consecutive = !iteratorChanged
|
||||
c.lastT = currT
|
||||
return currValueType
|
||||
}
|
||||
|
|
|
@ -632,6 +632,16 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, flo
|
|||
},
|
||||
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
|
||||
}
|
||||
if ts != mint {
|
||||
// By setting the counter reset hint to "no counter
|
||||
// reset" for all histograms but the first, we cover the
|
||||
// most common cases. If the series is manipulated later
|
||||
// or spans more than one block when ingested into the
|
||||
// storage, the hint has to be adjusted. Note that the
|
||||
// storage itself treats this particular hint the same
|
||||
// as "unknown".
|
||||
h.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
if floatHistogram {
|
||||
return sample{t: ts, fh: h.ToFloat()}
|
||||
}
|
||||
|
@ -661,6 +671,13 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in
|
|||
},
|
||||
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
|
||||
}
|
||||
if count > 1 && count%5 != 1 {
|
||||
// Same rationale for this as above in
|
||||
// genHistogramSeries, just that we have to be
|
||||
// smarter to find out if the previous sample
|
||||
// was a histogram, too.
|
||||
h.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
if floatHistogram {
|
||||
s = sample{t: ts, fh: h.ToFloat()}
|
||||
} else {
|
||||
|
|
|
@ -217,10 +217,10 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
|
|||
panic("appended an integer histogram to a float histogram chunk")
|
||||
}
|
||||
|
||||
// Appendable returns whether the chunk can be appended to, and if so
|
||||
// whether any recoding needs to happen using the provided interjections
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||
// Appendable returns whether the chunk can be appended to, and if so whether
|
||||
// any recoding needs to happen using the provided inserts (in case of any new
|
||||
// buckets, positive or negative range, respectively). If the sample is a gauge
|
||||
// histogram, AppendableGauge must be used instead.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
// - The schema has changed.
|
||||
|
@ -233,7 +233,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) {
|
|||
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||
// append. If counterReset is true, okToAppend is always false.
|
||||
func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
okToAppend, counterReset bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||
|
@ -267,12 +267,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
|
@ -280,7 +280,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
|
||||
if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
|
||||
counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
|
||||
counterReset, positiveInterjections, negativeInterjections = true, nil, nil
|
||||
counterReset, positiveInserts, negativeInserts = true, nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -290,10 +290,11 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
|
||||
// AppendableGauge returns whether the chunk can be appended to, and if so
|
||||
// whether:
|
||||
// 1. Any recoding needs to happen to the chunk using the provided interjections
|
||||
// 1. Any recoding needs to happen to the chunk using the provided inserts
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
|
||||
// (in case of any missing buckets, positive or negative range, respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the
|
||||
// backward inserts (in case of any missing buckets, positive or negative
|
||||
// range, respectively).
|
||||
//
|
||||
// This method must be only used for gauge histograms.
|
||||
//
|
||||
|
@ -302,8 +303,8 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
|
|||
// - The threshold for the zero bucket has changed.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
okToAppend bool,
|
||||
) {
|
||||
|
@ -325,8 +326,8 @@ func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
|
|||
return
|
||||
}
|
||||
|
||||
positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans)
|
||||
negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans)
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
|
@ -501,12 +502,12 @@ func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) {
|
|||
}
|
||||
|
||||
// Recode converts the current chunk to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used, according to the provided
|
||||
// interjections, resulting in the honoring of the provided new positive and
|
||||
// negative spans. To continue appending, use the returned Appender rather than
|
||||
// the receiver of this method.
|
||||
// (positive and/or negative) buckets used, according to the provided inserts,
|
||||
// resulting in the honoring of the provided new positive and negative spans. To
|
||||
// continue appending, use the returned Appender rather than the receiver of
|
||||
// this method.
|
||||
func (a *FloatHistogramAppender) Recode(
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
) (Chunk, Appender) {
|
||||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
|
@ -539,11 +540,11 @@ func (a *FloatHistogramAppender) Recode(
|
|||
|
||||
// Save the modified histogram to the new chunk.
|
||||
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
||||
if len(positiveInterjections) > 0 {
|
||||
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, false)
|
||||
if len(positiveInserts) > 0 {
|
||||
hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, false)
|
||||
}
|
||||
if len(negativeInterjections) > 0 {
|
||||
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, false)
|
||||
if len(negativeInserts) > 0 {
|
||||
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, false)
|
||||
}
|
||||
app.AppendFloatHistogram(tOld, hOld)
|
||||
}
|
||||
|
@ -556,15 +557,15 @@ func (a *FloatHistogramAppender) Recode(
|
|||
// (positive and/or negative) buckets used.
|
||||
func (a *FloatHistogramAppender) RecodeHistogramm(
|
||||
fh *histogram.FloatHistogram,
|
||||
pBackwardInter, nBackwardInter []Interjection,
|
||||
pBackwardInter, nBackwardInter []Insert,
|
||||
) {
|
||||
if len(pBackwardInter) > 0 {
|
||||
numPositiveBuckets := countSpans(fh.PositiveSpans)
|
||||
fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false)
|
||||
fh.PositiveBuckets = insert(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false)
|
||||
}
|
||||
if len(nBackwardInter) > 0 {
|
||||
numNegativeBuckets := countSpans(fh.NegativeSpans)
|
||||
fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false)
|
||||
fh.NegativeBuckets = insert(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -627,12 +628,8 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis
|
|||
return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
|
||||
}
|
||||
it.atFloatHistogramCalled = true
|
||||
crHint := histogram.UnknownCounterReset
|
||||
if it.counterResetHeader == GaugeType {
|
||||
crHint = histogram.GaugeType
|
||||
}
|
||||
return it.t, &histogram.FloatHistogram{
|
||||
CounterResetHint: crHint,
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: it.cnt.value,
|
||||
ZeroCount: it.zCnt.value,
|
||||
Sum: it.sum.value,
|
||||
|
|
|
@ -66,7 +66,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
|
|||
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
|
||||
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
|
||||
app.AppendFloatHistogram(ts, h.ToFloat())
|
||||
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
|
||||
expH := h.ToFloat()
|
||||
expH.CounterResetHint = histogram.NotCounterReset
|
||||
exp = append(exp, floatResult{t: ts, h: expH})
|
||||
require.Equal(t, 2, c.NumSamples())
|
||||
|
||||
// Add update with new appender.
|
||||
|
@ -81,7 +83,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
|
|||
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
|
||||
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
|
||||
app.AppendFloatHistogram(ts, h.ToFloat())
|
||||
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
|
||||
expH = h.ToFloat()
|
||||
expH.CounterResetHint = histogram.NotCounterReset
|
||||
exp = append(exp, floatResult{t: ts, h: expH})
|
||||
require.Equal(t, 3, c.NumSamples())
|
||||
|
||||
// 1. Expand iterator in simple case.
|
||||
|
@ -138,7 +142,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
|
|||
require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
|
||||
}
|
||||
|
||||
// Mimics the scenario described for compareSpans().
|
||||
// Mimics the scenario described for expandSpansForward.
|
||||
func TestFloatHistogramChunkBucketChanges(t *testing.T) {
|
||||
c := Chunk(NewFloatHistogramChunk())
|
||||
|
||||
|
@ -209,9 +213,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
|
|||
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
|
||||
h1.NegativeSpans = h2.NegativeSpans
|
||||
h1.NegativeBuckets = []int64{0, 1}
|
||||
expH2 := h2.ToFloat()
|
||||
expH2.CounterResetHint = histogram.NotCounterReset
|
||||
exp := []floatResult{
|
||||
{t: ts1, h: h1.ToFloat()},
|
||||
{t: ts2, h: h2.ToFloat()},
|
||||
{t: ts2, h: expH2},
|
||||
}
|
||||
it := c.Iterator(nil)
|
||||
var act []floatResult
|
||||
|
|
|
@ -243,10 +243,10 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
|
|||
panic("appended a float histogram to a histogram chunk")
|
||||
}
|
||||
|
||||
// Appendable returns whether the chunk can be appended to, and if so
|
||||
// whether any recoding needs to happen using the provided interjections
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||
// Appendable returns whether the chunk can be appended to, and if so whether
|
||||
// any recoding needs to happen using the provided inserts (in case of any new
|
||||
// buckets, positive or negative range, respectively). If the sample is a gauge
|
||||
// histogram, AppendableGauge must be used instead.
|
||||
//
|
||||
// The chunk is not appendable in the following cases:
|
||||
//
|
||||
|
@ -261,7 +261,7 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
|
|||
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||
// append. If counterReset is true, okToAppend is always false.
|
||||
func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
okToAppend, counterReset bool,
|
||||
) {
|
||||
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||
|
@ -295,12 +295,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
|||
}
|
||||
|
||||
var ok bool
|
||||
positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
}
|
||||
negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans)
|
||||
if !ok {
|
||||
counterReset = true
|
||||
return
|
||||
|
@ -308,7 +308,7 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
|||
|
||||
if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) ||
|
||||
counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) {
|
||||
counterReset, positiveInterjections, negativeInterjections = true, nil, nil
|
||||
counterReset, positiveInserts, negativeInserts = true, nil, nil
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -318,10 +318,11 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
|||
|
||||
// AppendableGauge returns whether the chunk can be appended to, and if so
|
||||
// whether:
|
||||
// 1. Any recoding needs to happen to the chunk using the provided interjections
|
||||
// 1. Any recoding needs to happen to the chunk using the provided inserts
|
||||
// (in case of any new buckets, positive or negative range, respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
|
||||
// (in case of any missing buckets, positive or negative range, respectively).
|
||||
// 2. Any recoding needs to happen for the histogram being appended, using the
|
||||
// backward inserts (in case of any missing buckets, positive or negative
|
||||
// range, respectively).
|
||||
//
|
||||
// This method must be only used for gauge histograms.
|
||||
//
|
||||
|
@ -330,8 +331,8 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
|||
// - The threshold for the zero bucket has changed.
|
||||
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||
func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
backwardPositiveInserts, backwardNegativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
okToAppend bool,
|
||||
) {
|
||||
|
@ -353,8 +354,8 @@ func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
|
|||
return
|
||||
}
|
||||
|
||||
positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
|
||||
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
|
||||
positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans)
|
||||
negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans)
|
||||
okToAppend = true
|
||||
return
|
||||
}
|
||||
|
@ -488,8 +489,9 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
|
|||
putVarbitInt(a.b, b)
|
||||
}
|
||||
} else {
|
||||
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
|
||||
// so we don't need a separate single delta logic for the 2nd sample.
|
||||
// The case for the 2nd sample with single deltas is implicitly
|
||||
// handled correctly with the double delta code, so we don't
|
||||
// need a separate single delta logic for the 2nd sample.
|
||||
|
||||
tDelta = t - a.t
|
||||
cntDelta = int64(h.Count) - int64(a.cnt)
|
||||
|
@ -539,12 +541,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) {
|
|||
}
|
||||
|
||||
// Recode converts the current chunk to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used, according to the provided
|
||||
// interjections, resulting in the honoring of the provided new positive and
|
||||
// negative spans. To continue appending, use the returned Appender rather than
|
||||
// the receiver of this method.
|
||||
// (positive and/or negative) buckets used, according to the provided inserts,
|
||||
// resulting in the honoring of the provided new positive and negative spans. To
|
||||
// continue appending, use the returned Appender rather than the receiver of
|
||||
// this method.
|
||||
func (a *HistogramAppender) Recode(
|
||||
positiveInterjections, negativeInterjections []Interjection,
|
||||
positiveInserts, negativeInserts []Insert,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
) (Chunk, Appender) {
|
||||
// TODO(beorn7): This currently just decodes everything and then encodes
|
||||
|
@ -577,11 +579,11 @@ func (a *HistogramAppender) Recode(
|
|||
|
||||
// Save the modified histogram to the new chunk.
|
||||
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans
|
||||
if len(positiveInterjections) > 0 {
|
||||
hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, true)
|
||||
if len(positiveInserts) > 0 {
|
||||
hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true)
|
||||
}
|
||||
if len(negativeInterjections) > 0 {
|
||||
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, true)
|
||||
if len(negativeInserts) > 0 {
|
||||
hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true)
|
||||
}
|
||||
app.AppendHistogram(tOld, hOld)
|
||||
}
|
||||
|
@ -590,19 +592,19 @@ func (a *HistogramAppender) Recode(
|
|||
return hc, app
|
||||
}
|
||||
|
||||
// RecodeHistogram converts the current histogram (in-place) to accommodate an expansion of the set of
|
||||
// (positive and/or negative) buckets used.
|
||||
// RecodeHistogram converts the current histogram (in-place) to accommodate an
|
||||
// expansion of the set of (positive and/or negative) buckets used.
|
||||
func (a *HistogramAppender) RecodeHistogram(
|
||||
h *histogram.Histogram,
|
||||
pBackwardInter, nBackwardInter []Interjection,
|
||||
pBackwardInserts, nBackwardInserts []Insert,
|
||||
) {
|
||||
if len(pBackwardInter) > 0 {
|
||||
if len(pBackwardInserts) > 0 {
|
||||
numPositiveBuckets := countSpans(h.PositiveSpans)
|
||||
h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true)
|
||||
h.PositiveBuckets = insert(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInserts, true)
|
||||
}
|
||||
if len(nBackwardInter) > 0 {
|
||||
if len(nBackwardInserts) > 0 {
|
||||
numNegativeBuckets := countSpans(h.NegativeSpans)
|
||||
h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true)
|
||||
h.NegativeBuckets = insert(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInserts, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -665,12 +667,8 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
|
|||
return it.t, &histogram.Histogram{Sum: it.sum}
|
||||
}
|
||||
it.atHistogramCalled = true
|
||||
crHint := histogram.UnknownCounterReset
|
||||
if it.counterResetHeader == GaugeType {
|
||||
crHint = histogram.GaugeType
|
||||
}
|
||||
return it.t, &histogram.Histogram{
|
||||
CounterResetHint: crHint,
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: it.cnt,
|
||||
ZeroCount: it.zCnt,
|
||||
Sum: it.sum,
|
||||
|
@ -688,12 +686,8 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra
|
|||
return it.t, &histogram.FloatHistogram{Sum: it.sum}
|
||||
}
|
||||
it.atFloatHistogramCalled = true
|
||||
crHint := histogram.UnknownCounterReset
|
||||
if it.counterResetHeader == GaugeType {
|
||||
crHint = histogram.GaugeType
|
||||
}
|
||||
return it.t, &histogram.FloatHistogram{
|
||||
CounterResetHint: crHint,
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: float64(it.cnt),
|
||||
ZeroCount: float64(it.zCnt),
|
||||
Sum: it.sum,
|
||||
|
|
|
@ -19,7 +19,10 @@ import (
|
|||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) {
|
||||
func writeHistogramChunkLayout(
|
||||
b *bstream, schema int32, zeroThreshold float64,
|
||||
positiveSpans, negativeSpans []histogram.Span,
|
||||
) {
|
||||
putZeroThreshold(b, zeroThreshold)
|
||||
putVarbitInt(b, int64(schema))
|
||||
putHistogramChunkLayoutSpans(b, positiveSpans)
|
||||
|
@ -91,9 +94,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
|
|||
|
||||
// putZeroThreshold writes the zero threshold to the bstream. It stores typical
|
||||
// values in just one byte, but needs 9 bytes for other values. In detail:
|
||||
//
|
||||
// * If the threshold is 0, store a single zero byte.
|
||||
//
|
||||
// - If the threshold is 0, store a single zero byte.
|
||||
// - If the threshold is a power of 2 between (and including) 2^-243 and 2^10,
|
||||
// take the exponent from the IEEE 754 representation of the threshold, which
|
||||
// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242
|
||||
|
@ -103,7 +104,6 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
|
|||
// threshold. The default value for the zero threshold is 2^-128 (or
|
||||
// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a
|
||||
// single byte (with value 116).
|
||||
//
|
||||
// - In all other cases, store 255 as a single byte, followed by the 8 bytes of
|
||||
// the threshold as a float64, i.e. taking 9 bytes in total.
|
||||
func putZeroThreshold(b *bstream, threshold float64) {
|
||||
|
@ -186,16 +186,16 @@ func (b *bucketIterator) Next() (int, bool) {
|
|||
return 0, false
|
||||
}
|
||||
|
||||
// An Interjection describes how many new buckets have to be introduced before
|
||||
// processing the pos'th delta from the original slice.
|
||||
type Interjection struct {
|
||||
// An Insert describes how many new buckets have to be inserted before
|
||||
// processing the pos'th bucket from the original slice.
|
||||
type Insert struct {
|
||||
pos int
|
||||
num int
|
||||
}
|
||||
|
||||
// forwardCompareSpans returns the interjections to convert a slice of deltas to a new
|
||||
// slice representing an expanded set of buckets, or false if incompatible
|
||||
// (e.g. if buckets were removed).
|
||||
// expandSpansForward returns the inserts to expand the bucket spans 'a' so that
|
||||
// they match the spans in 'b'. 'b' must cover the same or more buckets than
|
||||
// 'a', otherwise the function will return false.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
|
@ -222,25 +222,25 @@ type Interjection struct {
|
|||
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
|
||||
// delta mods: / \ / \ / \
|
||||
//
|
||||
// Note that whenever any new buckets are introduced, the subsequent "old"
|
||||
// bucket needs to readjust its delta to the new base of 0. Thus, for the caller
|
||||
// who wants to transform the set of original deltas to a new set of deltas to
|
||||
// match a new span layout that adds buckets, we simply need to generate a list
|
||||
// of interjections.
|
||||
// Note for histograms with delta-encoded buckets: Whenever any new buckets are
|
||||
// introduced, the subsequent "old" bucket needs to readjust its delta to the
|
||||
// new base of 0. Thus, for the caller who wants to transform the set of
|
||||
// original deltas to a new set of deltas to match a new span layout that adds
|
||||
// buckets, we simply need to generate a list of inserts.
|
||||
//
|
||||
// Note: Within forwardCompareSpans we don't have to worry about the changes to the
|
||||
// Note: Within expandSpansForward we don't have to worry about the changes to the
|
||||
// spans themselves, thanks to the iterators we get to work with the more useful
|
||||
// bucket indices (which of course directly correspond to the buckets we have to
|
||||
// adjust).
|
||||
func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) {
|
||||
func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var interjections []Interjection
|
||||
var inserts []Insert
|
||||
|
||||
// When inter.num becomes > 0, this becomes a valid interjection that
|
||||
// should be yielded when we finish a streak of new buckets.
|
||||
var inter Interjection
|
||||
// When inter.num becomes > 0, this becomes a valid insert that should
|
||||
// be yielded when we finish a streak of new buckets.
|
||||
var inter Insert
|
||||
|
||||
av, aOK := ai.Next()
|
||||
bv, bOK := bi.Next()
|
||||
|
@ -250,43 +250,46 @@ loop:
|
|||
case aOK && bOK:
|
||||
switch {
|
||||
case av == bv: // Both have an identical value. move on!
|
||||
// Finish WIP interjection and reset.
|
||||
// Finish WIP insert and reset.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inserts = append(inserts, inter)
|
||||
}
|
||||
inter.num = 0
|
||||
av, aOK = ai.Next()
|
||||
bv, bOK = bi.Next()
|
||||
inter.pos++
|
||||
case av < bv: // b misses a value that is in a.
|
||||
return interjections, false
|
||||
return inserts, false
|
||||
case av > bv: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
bv, bOK = bi.Next()
|
||||
}
|
||||
case aOK && !bOK: // b misses a value that is in a.
|
||||
return interjections, false
|
||||
return inserts, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
bv, bOK = bi.Next()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inserts = append(inserts, inter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return interjections, true
|
||||
return inserts, true
|
||||
}
|
||||
|
||||
// bidirectionalCompareSpans does everything that forwardCompareSpans does and
|
||||
// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a).
|
||||
func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) {
|
||||
// expandSpansBothWays is similar to expandSpansForward, but now b may also
|
||||
// cover an entirely different set of buckets. The function returns the
|
||||
// “forward” inserts to expand 'a' to also cover all the buckets exclusively
|
||||
// covered by 'b', and it returns the “backward” inserts to expand 'b' to also
|
||||
// cover all the buckets exclusively covered by 'a'
|
||||
func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) {
|
||||
ai := newBucketIterator(a)
|
||||
bi := newBucketIterator(b)
|
||||
|
||||
var interjections, bInterjections []Interjection
|
||||
var fInserts, bInserts []Insert
|
||||
var lastBucket int
|
||||
addBucket := func(b int) {
|
||||
offset := b - lastBucket - 1
|
||||
|
@ -305,9 +308,10 @@ func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Inter
|
|||
lastBucket = b
|
||||
}
|
||||
|
||||
// When inter.num becomes > 0, this becomes a valid interjection that
|
||||
// should be yielded when we finish a streak of new buckets.
|
||||
var inter, bInter Interjection
|
||||
// When fInter.num (or bInter.num, respectively) becomes > 0, this
|
||||
// becomes a valid insert that should be yielded when we finish a streak
|
||||
// of new buckets.
|
||||
var fInter, bInter Insert
|
||||
|
||||
av, aOK := ai.Next()
|
||||
bv, bOK := bi.Next()
|
||||
|
@ -317,37 +321,37 @@ loop:
|
|||
case aOK && bOK:
|
||||
switch {
|
||||
case av == bv: // Both have an identical value. move on!
|
||||
// Finish WIP interjection and reset.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inter.num = 0
|
||||
// Finish WIP insert and reset.
|
||||
if fInter.num > 0 {
|
||||
fInserts = append(fInserts, fInter)
|
||||
fInter.num = 0
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
addBucket(av)
|
||||
av, aOK = ai.Next()
|
||||
bv, bOK = bi.Next()
|
||||
inter.pos++
|
||||
fInter.pos++
|
||||
bInter.pos++
|
||||
case av < bv: // b misses a value that is in a.
|
||||
bInter.num++
|
||||
// Collect the forward interjection before advancing the
|
||||
// position of 'a'.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
inter.num = 0
|
||||
// Collect the forward inserts before advancing
|
||||
// the position of 'a'.
|
||||
if fInter.num > 0 {
|
||||
fInserts = append(fInserts, fInter)
|
||||
fInter.num = 0
|
||||
}
|
||||
addBucket(av)
|
||||
inter.pos++
|
||||
fInter.pos++
|
||||
av, aOK = ai.Next()
|
||||
case av > bv: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
// Collect the backward interjection before advancing the
|
||||
fInter.num++
|
||||
// Collect the backward inserts before advancing the
|
||||
// position of 'b'.
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
bInserts = append(bInserts, bInter)
|
||||
bInter.num = 0
|
||||
}
|
||||
addBucket(bv)
|
||||
|
@ -359,92 +363,127 @@ loop:
|
|||
addBucket(av)
|
||||
av, aOK = ai.Next()
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
inter.num++
|
||||
fInter.num++
|
||||
addBucket(bv)
|
||||
bv, bOK = bi.Next()
|
||||
default: // Both iterators ran out. We're done.
|
||||
if inter.num > 0 {
|
||||
interjections = append(interjections, inter)
|
||||
if fInter.num > 0 {
|
||||
fInserts = append(fInserts, fInter)
|
||||
}
|
||||
if bInter.num > 0 {
|
||||
bInterjections = append(bInterjections, bInter)
|
||||
bInserts = append(bInserts, bInter)
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
return interjections, bInterjections, mergedSpans
|
||||
return fInserts, bInserts, mergedSpans
|
||||
}
|
||||
|
||||
type bucketValue interface {
|
||||
int64 | float64
|
||||
}
|
||||
|
||||
// interject merges 'in' with the provided interjections and writes them into
|
||||
// 'out', which must already have the appropriate length.
|
||||
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
|
||||
// insert merges 'in' with the provided inserts and writes them into 'out',
|
||||
// which must already have the appropriate length. 'out' is also returned for
|
||||
// convenience.
|
||||
func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
|
||||
var (
|
||||
j int // Position in out.
|
||||
v BV // The last value seen.
|
||||
interj int // The next interjection to process.
|
||||
oi int // Position in out.
|
||||
v BV // The last value seen.
|
||||
ii int // The next insert to process.
|
||||
)
|
||||
for i, d := range in {
|
||||
if interj < len(interjections) && i == interjections[interj].pos {
|
||||
|
||||
// We have an interjection!
|
||||
// Add interjection.num new delta values such that their bucket values equate 0.
|
||||
// When deltas==false, it means that it is an absolute value. So we set it to 0 directly.
|
||||
if ii < len(inserts) && i == inserts[ii].pos {
|
||||
// We have an insert!
|
||||
// Add insert.num new delta values such that their
|
||||
// bucket values equate 0. When deltas==false, it means
|
||||
// that it is an absolute value. So we set it to 0
|
||||
// directly.
|
||||
if deltas {
|
||||
out[j] = -v
|
||||
out[oi] = -v
|
||||
} else {
|
||||
out[j] = 0
|
||||
out[oi] = 0
|
||||
}
|
||||
j++
|
||||
for x := 1; x < interjections[interj].num; x++ {
|
||||
out[j] = 0
|
||||
j++
|
||||
oi++
|
||||
for x := 1; x < inserts[ii].num; x++ {
|
||||
out[oi] = 0
|
||||
oi++
|
||||
}
|
||||
interj++
|
||||
ii++
|
||||
|
||||
// Now save the value from the input. The delta value we
|
||||
// should save is the original delta value + the last
|
||||
// value of the point before the interjection (to undo
|
||||
// the delta that was introduced by the interjection).
|
||||
// When deltas==false, it means that it is an absolute value,
|
||||
// value of the point before the insert (to undo the
|
||||
// delta that was introduced by the insert). When
|
||||
// deltas==false, it means that it is an absolute value,
|
||||
// so we set it directly to the value in the 'in' slice.
|
||||
if deltas {
|
||||
out[j] = d + v
|
||||
out[oi] = d + v
|
||||
} else {
|
||||
out[j] = d
|
||||
out[oi] = d
|
||||
}
|
||||
j++
|
||||
oi++
|
||||
v = d + v
|
||||
continue
|
||||
}
|
||||
|
||||
// If there was no interjection, the original delta is still
|
||||
// valid.
|
||||
out[j] = d
|
||||
j++
|
||||
// If there was no insert, the original delta is still valid.
|
||||
out[oi] = d
|
||||
oi++
|
||||
v += d
|
||||
}
|
||||
switch interj {
|
||||
case len(interjections):
|
||||
// All interjections processed. Nothing more to do.
|
||||
case len(interjections) - 1:
|
||||
// One more interjection to process at the end.
|
||||
switch ii {
|
||||
case len(inserts):
|
||||
// All inserts processed. Nothing more to do.
|
||||
case len(inserts) - 1:
|
||||
// One more insert to process at the end.
|
||||
if deltas {
|
||||
out[j] = -v
|
||||
out[oi] = -v
|
||||
} else {
|
||||
out[j] = 0
|
||||
out[oi] = 0
|
||||
}
|
||||
j++
|
||||
for x := 1; x < interjections[interj].num; x++ {
|
||||
out[j] = 0
|
||||
j++
|
||||
oi++
|
||||
for x := 1; x < inserts[ii].num; x++ {
|
||||
out[oi] = 0
|
||||
oi++
|
||||
}
|
||||
default:
|
||||
panic("unprocessed interjections left")
|
||||
panic("unprocessed inserts left")
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// counterResetHint returns a CounterResetHint based on the CounterResetHeader
|
||||
// and on the position into the chunk.
|
||||
func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint {
|
||||
switch {
|
||||
case crh == GaugeType:
|
||||
// A gauge histogram chunk only contains gauge histograms.
|
||||
return histogram.GaugeType
|
||||
case numRead > 1:
|
||||
// In a counter histogram chunk, there will not be any counter
|
||||
// resets after the first histogram.
|
||||
return histogram.NotCounterReset
|
||||
case crh == CounterReset:
|
||||
// If the chunk was started because of a counter reset, we can
|
||||
// safely return that hint. This histogram always has to be
|
||||
// treated as a counter reset.
|
||||
return histogram.CounterReset
|
||||
default:
|
||||
// Sadly, we have to return "unknown" as the hint for all other
|
||||
// cases, even if we know that the chunk was started without a
|
||||
// counter reset. But we cannot be sure that the previous chunk
|
||||
// still exists in the TSDB, so we conservatively return
|
||||
// "unknown". On the bright side, this case should be relatively
|
||||
// rare.
|
||||
//
|
||||
// TODO(beorn7): Nevertheless, if the current chunk is in the
|
||||
// middle of a block (not the first chunk in the block for this
|
||||
// series), it's probably safe to assume that the previous chunk
|
||||
// will exist in the TSDB for as long as the current chunk
|
||||
// exist, and we could safely return
|
||||
// "histogram.NotCounterReset". This needs some more work and
|
||||
// might not be worth the effort and/or risk. To be vetted...
|
||||
return histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ func TestBucketIterator(t *testing.T) {
|
|||
},
|
||||
idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119},
|
||||
},
|
||||
// The below 2 sets ore the ones described in compareSpans's comments.
|
||||
// The below 2 sets ore the ones described in expandSpansForward's comments.
|
||||
{
|
||||
spans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
|
@ -111,12 +111,12 @@ func TestBucketIterator(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCompareSpansAndInterject(t *testing.T) {
|
||||
func TestCompareSpansAndInsert(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
description string
|
||||
spansA, spansB []histogram.Span
|
||||
interjections, backwardInterjections []Interjection
|
||||
bucketsIn, bucketsOut []int64
|
||||
description string
|
||||
spansA, spansB []histogram.Span
|
||||
fInserts, bInserts []Insert
|
||||
bucketsIn, bucketsOut []int64
|
||||
}{
|
||||
{
|
||||
description: "single prepend at the beginning",
|
||||
|
@ -126,7 +126,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -11, Length: 4},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 0,
|
||||
num: 1,
|
||||
|
@ -143,7 +143,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 4},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 3,
|
||||
num: 1,
|
||||
|
@ -160,7 +160,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -12, Length: 5},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 0,
|
||||
num: 2,
|
||||
|
@ -177,7 +177,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 5},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 3,
|
||||
num: 2,
|
||||
|
@ -194,7 +194,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -12, Length: 7},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 0,
|
||||
num: 2,
|
||||
|
@ -215,7 +215,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -9, Length: 3},
|
||||
},
|
||||
backwardInterjections: []Interjection{
|
||||
bInserts: []Insert{
|
||||
{pos: 0, num: 1},
|
||||
},
|
||||
},
|
||||
|
@ -228,7 +228,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
{Offset: -10, Length: 2},
|
||||
{Offset: 1, Length: 1},
|
||||
},
|
||||
backwardInterjections: []Interjection{
|
||||
bInserts: []Insert{
|
||||
{pos: 2, num: 1},
|
||||
},
|
||||
},
|
||||
|
@ -240,7 +240,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
spansB: []histogram.Span{
|
||||
{Offset: -10, Length: 3},
|
||||
},
|
||||
backwardInterjections: []Interjection{
|
||||
bInserts: []Insert{
|
||||
{pos: 3, num: 1},
|
||||
},
|
||||
},
|
||||
|
@ -259,7 +259,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 2,
|
||||
num: 1,
|
||||
|
@ -277,7 +277,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1},
|
||||
},
|
||||
{
|
||||
description: "both forward and backward interjections, complex case",
|
||||
description: "both forward and backward inserts, complex case",
|
||||
spansA: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
|
@ -292,7 +292,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
{Offset: 1, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
},
|
||||
interjections: []Interjection{
|
||||
fInserts: []Insert{
|
||||
{
|
||||
pos: 2,
|
||||
num: 1,
|
||||
|
@ -306,7 +306,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
num: 1,
|
||||
},
|
||||
},
|
||||
backwardInterjections: []Interjection{
|
||||
bInserts: []Insert{
|
||||
{
|
||||
pos: 0,
|
||||
num: 1,
|
||||
|
@ -329,22 +329,22 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
|
||||
for _, s := range scenarios {
|
||||
t.Run(s.description, func(t *testing.T) {
|
||||
if len(s.backwardInterjections) > 0 {
|
||||
interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB)
|
||||
require.Equal(t, s.interjections, interjections)
|
||||
require.Equal(t, s.backwardInterjections, bInterjections)
|
||||
if len(s.bInserts) > 0 {
|
||||
fInserts, bInserts, _ := expandSpansBothWays(s.spansA, s.spansB)
|
||||
require.Equal(t, s.fInserts, fInserts)
|
||||
require.Equal(t, s.bInserts, bInserts)
|
||||
}
|
||||
|
||||
interjections, valid := forwardCompareSpans(s.spansA, s.spansB)
|
||||
if len(s.backwardInterjections) > 0 {
|
||||
inserts, valid := expandSpansForward(s.spansA, s.spansB)
|
||||
if len(s.bInserts) > 0 {
|
||||
require.False(t, valid, "compareScan unexpectedly returned true")
|
||||
return
|
||||
}
|
||||
require.True(t, valid, "compareScan unexpectedly returned false")
|
||||
require.Equal(t, s.interjections, interjections)
|
||||
require.Equal(t, s.fInserts, inserts)
|
||||
|
||||
gotBuckets := make([]int64, len(s.bucketsOut))
|
||||
interject(s.bucketsIn, gotBuckets, interjections, true)
|
||||
insert(s.bucketsIn, gotBuckets, inserts, true)
|
||||
require.Equal(t, s.bucketsOut, gotBuckets)
|
||||
|
||||
floatBucketsIn := make([]float64, len(s.bucketsIn))
|
||||
|
@ -362,7 +362,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
|
|||
floatBucketsOut[i] = float64(last)
|
||||
}
|
||||
gotFloatBuckets := make([]float64, len(floatBucketsOut))
|
||||
interject(floatBucketsIn, gotFloatBuckets, interjections, false)
|
||||
insert(floatBucketsIn, gotFloatBuckets, inserts, false)
|
||||
require.Equal(t, floatBucketsOut, gotFloatBuckets)
|
||||
})
|
||||
}
|
||||
|
@ -564,12 +564,12 @@ func TestSpansFromBidirectionalCompareSpans(t *testing.T) {
|
|||
copy(s1c, c.s1)
|
||||
copy(s2c, c.s2)
|
||||
|
||||
_, _, act := bidirectionalCompareSpans(c.s1, c.s2)
|
||||
_, _, act := expandSpansBothWays(c.s1, c.s2)
|
||||
require.Equal(t, c.exp, act)
|
||||
// Check that s1 and s2 are not modified.
|
||||
require.Equal(t, s1c, c.s1)
|
||||
require.Equal(t, s2c, c.s2)
|
||||
_, _, act = bidirectionalCompareSpans(c.s2, c.s1)
|
||||
_, _, act = expandSpansBothWays(c.s2, c.s1)
|
||||
require.Equal(t, c.exp, act)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
|
|||
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
|
||||
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
|
||||
app.AppendHistogram(ts, h)
|
||||
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
|
||||
hExp := h.Copy()
|
||||
hExp.CounterResetHint = histogram.NotCounterReset
|
||||
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
|
||||
require.Equal(t, 2, c.NumSamples())
|
||||
|
||||
// Add update with new appender.
|
||||
|
@ -82,7 +84,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
|
|||
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
|
||||
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
|
||||
app.AppendHistogram(ts, h)
|
||||
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
|
||||
hExp = h.Copy()
|
||||
hExp.CounterResetHint = histogram.NotCounterReset
|
||||
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
|
||||
require.Equal(t, 3, c.NumSamples())
|
||||
|
||||
// 1. Expand iterator in simple case.
|
||||
|
@ -149,7 +153,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
|
|||
require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
|
||||
}
|
||||
|
||||
// Mimics the scenario described for compareSpans().
|
||||
// Mimics the scenario described for expandSpansForward.
|
||||
func TestHistogramChunkBucketChanges(t *testing.T) {
|
||||
c := Chunk(NewHistogramChunk())
|
||||
|
||||
|
@ -220,9 +224,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
|
|||
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
|
||||
h1.NegativeSpans = h2.NegativeSpans
|
||||
h1.NegativeBuckets = []int64{0, 1}
|
||||
hExp := h2.Copy()
|
||||
hExp.CounterResetHint = histogram.NotCounterReset
|
||||
exp := []result{
|
||||
{t: ts1, h: h1, fh: h1.ToFloat()},
|
||||
{t: ts2, h: h2, fh: h2.ToFloat()},
|
||||
{t: ts2, h: hExp, fh: hExp.ToFloat()},
|
||||
}
|
||||
it := c.Iterator(nil)
|
||||
var act []result
|
||||
|
@ -463,11 +469,12 @@ func TestAtFloatHistogram(t *testing.T) {
|
|||
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
|
||||
},
|
||||
{
|
||||
Schema: 0,
|
||||
Count: 36,
|
||||
Sum: 2345.6,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
CounterResetHint: histogram.NotCounterReset,
|
||||
Schema: 0,
|
||||
Count: 36,
|
||||
Sum: 2345.6,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
|
@ -482,11 +489,12 @@ func TestAtFloatHistogram(t *testing.T) {
|
|||
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
|
||||
},
|
||||
{
|
||||
Schema: 0,
|
||||
Count: 36,
|
||||
Sum: 1111.1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
CounterResetHint: histogram.NotCounterReset,
|
||||
Schema: 0,
|
||||
Count: 36,
|
||||
Sum: 1111.1,
|
||||
ZeroThreshold: 0.001,
|
||||
ZeroCount: 5,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 4},
|
||||
{Offset: 0, Length: 0},
|
||||
|
|
|
@ -1308,17 +1308,31 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
|
|||
|
||||
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
|
||||
ctx := context.Background()
|
||||
appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
|
||||
appendHistogram := func(
|
||||
lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample,
|
||||
) {
|
||||
t.Helper()
|
||||
app := head.Appender(ctx)
|
||||
for tsMinute := from; tsMinute <= to; tsMinute++ {
|
||||
var err error
|
||||
if floatTest {
|
||||
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
|
||||
efh := h.ToFloat()
|
||||
if tsMinute == from {
|
||||
efh.CounterResetHint = histogram.UnknownCounterReset
|
||||
} else {
|
||||
efh.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), fh: efh})
|
||||
} else {
|
||||
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil)
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
|
||||
eh := h.Copy()
|
||||
if tsMinute == from {
|
||||
eh.CounterResetHint = histogram.UnknownCounterReset
|
||||
} else {
|
||||
eh.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), h: eh})
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
118
tsdb/db_test.go
118
tsdb/db_test.go
|
@ -5836,16 +5836,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
})
|
||||
|
||||
ctx := context.Background()
|
||||
appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
|
||||
appendHistogram := func(
|
||||
lbls labels.Labels, tsMinute int, h *histogram.Histogram,
|
||||
exp *[]tsdbutil.Sample, expCRH histogram.CounterResetHint,
|
||||
) {
|
||||
t.Helper()
|
||||
var err error
|
||||
app := db.Appender(ctx)
|
||||
if floatHistogram {
|
||||
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
|
||||
efh := h.ToFloat()
|
||||
efh.CounterResetHint = expCRH
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), fh: efh})
|
||||
} else {
|
||||
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil)
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
|
||||
eh := h.Copy()
|
||||
eh.CounterResetHint = expCRH
|
||||
*exp = append(*exp, sample{t: minute(tsMinute), h: eh})
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -5897,23 +5904,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
t.Run("series with only histograms", func(t *testing.T) {
|
||||
h := baseH.Copy() // This is shared across all sub tests.
|
||||
|
||||
appendHistogram(series1, 100, h, &exp1)
|
||||
appendHistogram(series1, 100, h, &exp1, histogram.UnknownCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
|
||||
h.PositiveBuckets[0]++
|
||||
h.NegativeBuckets[0] += 2
|
||||
h.Count += 10
|
||||
appendHistogram(series1, 101, h, &exp1)
|
||||
appendHistogram(series1, 101, h, &exp1, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
|
||||
t.Run("changing schema", func(t *testing.T) {
|
||||
h.Schema = 2
|
||||
appendHistogram(series1, 102, h, &exp1)
|
||||
appendHistogram(series1, 102, h, &exp1, histogram.UnknownCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
|
||||
// Schema back to old.
|
||||
h.Schema = 1
|
||||
appendHistogram(series1, 103, h, &exp1)
|
||||
appendHistogram(series1, 103, h, &exp1, histogram.UnknownCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
})
|
||||
|
||||
|
@ -5942,7 +5949,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
h.PositiveSpans[1].Length++
|
||||
h.PositiveBuckets = append(h.PositiveBuckets, 1)
|
||||
h.Count += 3
|
||||
appendHistogram(series1, 104, h, &exp1)
|
||||
appendHistogram(series1, 104, h, &exp1, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
|
||||
// Because of the previous two histograms being on the active chunk,
|
||||
|
@ -5980,14 +5987,14 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
h.Count += 3
|
||||
// {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1}
|
||||
h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...)
|
||||
appendHistogram(series1, 105, h, &exp1)
|
||||
appendHistogram(series1, 105, h, &exp1, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
|
||||
// We add 4 more histograms to clear out the buffer and see the re-encoded histograms.
|
||||
appendHistogram(series1, 106, h, &exp1)
|
||||
appendHistogram(series1, 107, h, &exp1)
|
||||
appendHistogram(series1, 108, h, &exp1)
|
||||
appendHistogram(series1, 109, h, &exp1)
|
||||
appendHistogram(series1, 106, h, &exp1, histogram.NotCounterReset)
|
||||
appendHistogram(series1, 107, h, &exp1, histogram.NotCounterReset)
|
||||
appendHistogram(series1, 108, h, &exp1, histogram.NotCounterReset)
|
||||
appendHistogram(series1, 109, h, &exp1, histogram.NotCounterReset)
|
||||
|
||||
// Update the expected histograms to reflect the re-encoding.
|
||||
if floatHistogram {
|
||||
|
@ -6020,7 +6027,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
t.Run("buckets disappearing", func(t *testing.T) {
|
||||
h.PositiveSpans[1].Length--
|
||||
h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1]
|
||||
appendHistogram(series1, 110, h, &exp1)
|
||||
appendHistogram(series1, 110, h, &exp1, histogram.CounterReset)
|
||||
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
|
||||
})
|
||||
})
|
||||
|
@ -6032,9 +6039,9 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
|
||||
|
||||
h := baseH.Copy()
|
||||
appendHistogram(series2, 103, h, &exp2)
|
||||
appendHistogram(series2, 104, h, &exp2)
|
||||
appendHistogram(series2, 105, h, &exp2)
|
||||
appendHistogram(series2, 103, h, &exp2, histogram.UnknownCounterReset)
|
||||
appendHistogram(series2, 104, h, &exp2, histogram.NotCounterReset)
|
||||
appendHistogram(series2, 105, h, &exp2, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
|
||||
|
||||
// Switching between float and histograms again.
|
||||
|
@ -6042,16 +6049,16 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
appendFloat(series2, 107, 107, &exp2)
|
||||
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
|
||||
|
||||
appendHistogram(series2, 108, h, &exp2)
|
||||
appendHistogram(series2, 109, h, &exp2)
|
||||
appendHistogram(series2, 108, h, &exp2, histogram.UnknownCounterReset)
|
||||
appendHistogram(series2, 109, h, &exp2, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
|
||||
})
|
||||
|
||||
t.Run("series starting with histogram and then getting float", func(t *testing.T) {
|
||||
h := baseH.Copy()
|
||||
appendHistogram(series3, 101, h, &exp3)
|
||||
appendHistogram(series3, 102, h, &exp3)
|
||||
appendHistogram(series3, 103, h, &exp3)
|
||||
appendHistogram(series3, 101, h, &exp3, histogram.UnknownCounterReset)
|
||||
appendHistogram(series3, 102, h, &exp3, histogram.NotCounterReset)
|
||||
appendHistogram(series3, 103, h, &exp3, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
|
||||
|
||||
appendFloat(series3, 104, 100, &exp3)
|
||||
|
@ -6060,8 +6067,8 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
|
||||
|
||||
// Switching between histogram and float again.
|
||||
appendHistogram(series3, 107, h, &exp3)
|
||||
appendHistogram(series3, 108, h, &exp3)
|
||||
appendHistogram(series3, 107, h, &exp3, histogram.UnknownCounterReset)
|
||||
appendHistogram(series3, 108, h, &exp3, histogram.NotCounterReset)
|
||||
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
|
||||
|
||||
appendFloat(series3, 109, 106, &exp3)
|
||||
|
@ -6091,7 +6098,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
t.Helper()
|
||||
|
||||
opts := DefaultOptions()
|
||||
opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works
|
||||
opts.AllowOverlappingCompaction = true // TODO(jesusvazquez): This replaced AllowOverlappingBlocks, make sure that works.
|
||||
db := openTestDB(t, opts, nil)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, db.Close())
|
||||
|
@ -6137,7 +6144,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
require.Equal(t, exp, res)
|
||||
compareSeries(t, exp, res)
|
||||
|
||||
// Compact all the blocks together and query again.
|
||||
blocks := db.Blocks()
|
||||
|
@ -6154,10 +6161,23 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
require.Equal(t, exp, res)
|
||||
|
||||
// After compaction, we do not require "unknown" counter resets
|
||||
// due to origin from different overlapping chunks anymore.
|
||||
for _, ss := range exp {
|
||||
for i, s := range ss[1:] {
|
||||
if s.H() != nil && ss[i].H() != nil && s.H().CounterResetHint == histogram.UnknownCounterReset {
|
||||
s.H().CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
if s.FH() != nil && ss[i].FH() != nil && s.FH().CounterResetHint == histogram.UnknownCounterReset {
|
||||
s.FH().CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
}
|
||||
}
|
||||
compareSeries(t, exp, res)
|
||||
}
|
||||
|
||||
for _, floatHistogram := range []bool{true} {
|
||||
for _, floatHistogram := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) {
|
||||
t.Run("serial blocks with only histograms", func(t *testing.T) {
|
||||
testBlockQuerying(t,
|
||||
|
@ -6272,3 +6292,45 @@ func TestNativeHistogramFlag(t *testing.T) {
|
|||
l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}},
|
||||
}, act)
|
||||
}
|
||||
|
||||
// compareSeries essentially replaces `require.Equal(t, expected, actual) in
|
||||
// situations where the actual series might contain more counter reset hints
|
||||
// "unknown" than the expected series. This can easily happen for long series
|
||||
// that trigger new chunks. This function therefore tolerates counter reset
|
||||
// hints "CounterReset" and "NotCounterReset" in an expected series where the
|
||||
// actual series contains a counter reset hint "UnknownCounterReset".
|
||||
// "GaugeType" hints are still strictly checked, and any "UnknownCounterReset"
|
||||
// in an expected series has to be matched precisely by the actual series.
|
||||
func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sample) {
|
||||
if len(expected) != len(actual) {
|
||||
// The reason for the difference is not the counter reset hints
|
||||
// (alone), so let's use the pretty diffing by the require
|
||||
// package.
|
||||
require.Equal(t, expected, actual, "number of series differs")
|
||||
}
|
||||
for key, eSamples := range expected {
|
||||
aSamples, ok := actual[key]
|
||||
if !ok {
|
||||
require.Equal(t, expected, actual, "expected series %q not found", key)
|
||||
}
|
||||
if len(eSamples) != len(aSamples) {
|
||||
require.Equal(t, eSamples, aSamples, "number of samples for series %q differs", key)
|
||||
}
|
||||
for i, eS := range eSamples {
|
||||
aS := aSamples[i]
|
||||
aH, eH := aS.H(), eS.H()
|
||||
aFH, eFH := aS.FH(), eS.FH()
|
||||
switch {
|
||||
case aH != nil && eH != nil && aH.CounterResetHint == histogram.UnknownCounterReset && eH.CounterResetHint != histogram.GaugeType:
|
||||
eH = eH.Copy()
|
||||
eH.CounterResetHint = histogram.UnknownCounterReset
|
||||
eS = sample{t: eS.T(), h: eH}
|
||||
case aFH != nil && eFH != nil && aFH.CounterResetHint == histogram.UnknownCounterReset && eFH.CounterResetHint != histogram.GaugeType:
|
||||
eFH = eFH.Copy()
|
||||
eFH.CounterResetHint = histogram.UnknownCounterReset
|
||||
eS = sample{t: eS.T(), fh: eFH}
|
||||
}
|
||||
require.Equal(t, eS, aS, "sample %d in series %q differs", i, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
18
tsdb/head.go
18
tsdb/head.go
|
@ -2040,7 +2040,7 @@ func (h *Head) updateWALReplayStatusRead(current int) {
|
|||
|
||||
func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
||||
for i := 0; i < n; i++ {
|
||||
r = append(r, &histogram.Histogram{
|
||||
h := histogram.Histogram{
|
||||
Count: 10 + uint64(i*8),
|
||||
ZeroCount: 2 + uint64(i),
|
||||
ZeroThreshold: 0.001,
|
||||
|
@ -2056,9 +2056,12 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
|||
{Offset: 1, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||
})
|
||||
}
|
||||
if i > 0 {
|
||||
h.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
r = append(r, &h)
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
|
@ -2084,13 +2087,12 @@ func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) {
|
|||
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||
})
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||
for i := 0; i < n; i++ {
|
||||
r = append(r, &histogram.FloatHistogram{
|
||||
h := histogram.FloatHistogram{
|
||||
Count: 10 + float64(i*8),
|
||||
ZeroCount: 2 + float64(i),
|
||||
ZeroThreshold: 0.001,
|
||||
|
@ -2106,7 +2108,11 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
|||
{Offset: 1, Length: 2},
|
||||
},
|
||||
NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
|
||||
})
|
||||
}
|
||||
if i > 0 {
|
||||
h.CounterResetHint = histogram.NotCounterReset
|
||||
}
|
||||
r = append(r, &h)
|
||||
}
|
||||
|
||||
return r
|
||||
|
|
|
@ -1141,7 +1141,6 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
|||
|
||||
// appendHistogram adds the histogram.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// TODO(codesome): Support gauge histograms here.
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable from appender before
|
||||
|
@ -1150,44 +1149,54 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
// meta properly.
|
||||
app, _ := s.app.(*chunkenc.HistogramAppender)
|
||||
var (
|
||||
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
||||
pBackwardInter, nBackwardInter []chunkenc.Interjection
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset bool
|
||||
pForwardInserts, nForwardInserts []chunkenc.Insert
|
||||
pBackwardInserts, nBackwardInserts []chunkenc.Insert
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset, gauge bool
|
||||
)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
gauge := h.CounterResetHint == histogram.GaugeType
|
||||
if app != nil {
|
||||
if gauge {
|
||||
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h)
|
||||
} else {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
|
||||
switch h.CounterResetHint {
|
||||
case histogram.GaugeType:
|
||||
gauge = true
|
||||
if app != nil {
|
||||
pForwardInserts, nForwardInserts,
|
||||
pBackwardInserts, nBackwardInserts,
|
||||
pMergedSpans, nMergedSpans,
|
||||
okToAppend = app.AppendableGauge(h)
|
||||
}
|
||||
case histogram.CounterReset:
|
||||
// The caller tells us this is a counter reset, even if it
|
||||
// doesn't look like one.
|
||||
counterReset = true
|
||||
default:
|
||||
if app != nil {
|
||||
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h)
|
||||
}
|
||||
}
|
||||
|
||||
if !chunkCreated {
|
||||
if len(pBackwardInter)+len(nBackwardInter) > 0 {
|
||||
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
|
||||
h.PositiveSpans = pMergedSpans
|
||||
h.NegativeSpans = nMergedSpans
|
||||
app.RecodeHistogram(h, pBackwardInter, nBackwardInter)
|
||||
app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
// We have 3 cases here
|
||||
// - !okToAppend -> We need to cut a new chunk.
|
||||
// - okToAppend but we have interjections → Existing chunk needs
|
||||
// - okToAppend but we have inserts → Existing chunk needs
|
||||
// recoding before we can append our histogram.
|
||||
// - okToAppend and no interjections → Chunk is ready to support our histogram.
|
||||
// - okToAppend and no inserts → Chunk is ready to support our histogram.
|
||||
if !okToAppend || counterReset {
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
|
||||
chunkCreated = true
|
||||
} else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 {
|
||||
} else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||
// New buckets have appeared. We need to recode all
|
||||
// prior histogram samples within the chunk before we
|
||||
// can process this one.
|
||||
chunk, app := app.Recode(
|
||||
positiveInterjections, negativeInterjections,
|
||||
pForwardInserts, nForwardInserts,
|
||||
h.PositiveSpans, h.NegativeSpans,
|
||||
)
|
||||
c.chunk = chunk
|
||||
|
@ -1233,45 +1242,54 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
// meta properly.
|
||||
app, _ := s.app.(*chunkenc.FloatHistogramAppender)
|
||||
var (
|
||||
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
||||
pBackwardInter, nBackwardInter []chunkenc.Interjection
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset bool
|
||||
pForwardInserts, nForwardInserts []chunkenc.Insert
|
||||
pBackwardInserts, nBackwardInserts []chunkenc.Insert
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset, gauge bool
|
||||
)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
gauge := fh.CounterResetHint == histogram.GaugeType
|
||||
if app != nil {
|
||||
if gauge {
|
||||
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter,
|
||||
pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh)
|
||||
} else {
|
||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh)
|
||||
switch fh.CounterResetHint {
|
||||
case histogram.GaugeType:
|
||||
gauge = true
|
||||
if app != nil {
|
||||
pForwardInserts, nForwardInserts,
|
||||
pBackwardInserts, nBackwardInserts,
|
||||
pMergedSpans, nMergedSpans,
|
||||
okToAppend = app.AppendableGauge(fh)
|
||||
}
|
||||
case histogram.CounterReset:
|
||||
// The caller tells us this is a counter reset, even if it
|
||||
// doesn't look like one.
|
||||
counterReset = true
|
||||
default:
|
||||
if app != nil {
|
||||
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh)
|
||||
}
|
||||
}
|
||||
|
||||
if !chunkCreated {
|
||||
if len(pBackwardInter)+len(nBackwardInter) > 0 {
|
||||
if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
|
||||
fh.PositiveSpans = pMergedSpans
|
||||
fh.NegativeSpans = nMergedSpans
|
||||
app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter)
|
||||
app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
// We have 3 cases here
|
||||
// - !okToAppend -> We need to cut a new chunk.
|
||||
// - okToAppend but we have interjections → Existing chunk needs
|
||||
// - okToAppend but we have inserts → Existing chunk needs
|
||||
// recoding before we can append our histogram.
|
||||
// - okToAppend and no interjections → Chunk is ready to support our histogram.
|
||||
// - okToAppend and no inserts → Chunk is ready to support our histogram.
|
||||
if !okToAppend || counterReset {
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
|
||||
chunkCreated = true
|
||||
} else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 {
|
||||
} else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||
// New buckets have appeared. We need to recode all
|
||||
// prior histogram samples within the chunk before we
|
||||
// can process this one.
|
||||
chunk, app := app.Recode(
|
||||
positiveInterjections, negativeInterjections,
|
||||
pForwardInserts, nForwardInserts,
|
||||
fh.PositiveSpans, fh.NegativeSpans,
|
||||
)
|
||||
c.chunk = chunk
|
||||
|
|
|
@ -197,7 +197,6 @@ func BenchmarkLoadWAL(b *testing.B) {
|
|||
continue
|
||||
}
|
||||
lastExemplarsPerSeries = exemplarsPerSeries
|
||||
// fmt.Println("exemplars per series: ", exemplarsPerSeries)
|
||||
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT),
|
||||
func(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
|
@ -2834,17 +2833,13 @@ func TestAppendHistogram(t *testing.T) {
|
|||
ingestTs := int64(0)
|
||||
app := head.Appender(context.Background())
|
||||
|
||||
type timedHistogram struct {
|
||||
t int64
|
||||
h *histogram.Histogram
|
||||
}
|
||||
expHistograms := make([]timedHistogram, 0, 2*numHistograms)
|
||||
expHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms)
|
||||
|
||||
// Counter integer histograms.
|
||||
for _, h := range GenerateTestHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
|
||||
require.NoError(t, err)
|
||||
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
|
||||
expHistograms = append(expHistograms, sample{t: ingestTs, h: h})
|
||||
ingestTs++
|
||||
if ingestTs%50 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2856,7 +2851,7 @@ func TestAppendHistogram(t *testing.T) {
|
|||
for _, h := range GenerateTestGaugeHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
|
||||
require.NoError(t, err)
|
||||
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
|
||||
expHistograms = append(expHistograms, sample{t: ingestTs, h: h})
|
||||
ingestTs++
|
||||
if ingestTs%50 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2864,17 +2859,13 @@ func TestAppendHistogram(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type timedFloatHistogram struct {
|
||||
t int64
|
||||
h *histogram.FloatHistogram
|
||||
}
|
||||
expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms)
|
||||
expFloatHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms)
|
||||
|
||||
// Counter float histograms.
|
||||
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||
require.NoError(t, err)
|
||||
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
|
||||
expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh})
|
||||
ingestTs++
|
||||
if ingestTs%50 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2886,7 +2877,7 @@ func TestAppendHistogram(t *testing.T) {
|
|||
for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) {
|
||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||
require.NoError(t, err)
|
||||
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
|
||||
expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh})
|
||||
ingestTs++
|
||||
if ingestTs%50 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
|
@ -2909,20 +2900,28 @@ func TestAppendHistogram(t *testing.T) {
|
|||
require.False(t, ss.Next())
|
||||
|
||||
it := s.Iterator(nil)
|
||||
actHistograms := make([]timedHistogram, 0, len(expHistograms))
|
||||
actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms))
|
||||
actHistograms := make([]tsdbutil.Sample, 0, len(expHistograms))
|
||||
actFloatHistograms := make([]tsdbutil.Sample, 0, len(expFloatHistograms))
|
||||
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
|
||||
if typ == chunkenc.ValHistogram {
|
||||
ts, h := it.AtHistogram()
|
||||
actHistograms = append(actHistograms, timedHistogram{ts, h})
|
||||
actHistograms = append(actHistograms, sample{t: ts, h: h})
|
||||
} else if typ == chunkenc.ValFloatHistogram {
|
||||
ts, fh := it.AtFloatHistogram()
|
||||
actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh})
|
||||
actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh})
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, expHistograms, actHistograms)
|
||||
require.Equal(t, expFloatHistograms, actFloatHistograms)
|
||||
compareSeries(
|
||||
t,
|
||||
map[string][]tsdbutil.Sample{"dummy": expHistograms},
|
||||
map[string][]tsdbutil.Sample{"dummy": actHistograms},
|
||||
)
|
||||
compareSeries(
|
||||
t,
|
||||
map[string][]tsdbutil.Sample{"dummy": expFloatHistograms},
|
||||
map[string][]tsdbutil.Sample{"dummy": actFloatHistograms},
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -3019,7 +3018,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s2, int64(ts), h, nil)
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()})
|
||||
eh := h.Copy()
|
||||
if !gauge && ts > 30 && (ts-10)%20 == 1 {
|
||||
// Need "unknown" hint after float sample.
|
||||
eh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), h: eh})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
|
@ -3051,7 +3055,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
h.NegativeBuckets = h.PositiveBuckets
|
||||
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h)
|
||||
require.NoError(t, err)
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()})
|
||||
eh := h.Copy()
|
||||
if !gauge && ts > 30 && (ts-10)%20 == 1 {
|
||||
// Need "unknown" hint after float sample.
|
||||
eh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: eh})
|
||||
if ts%20 == 0 {
|
||||
require.NoError(t, app.Commit())
|
||||
app = head.Appender(context.Background())
|
||||
|
@ -3089,7 +3098,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
|
||||
require.Equal(t, exp, act)
|
||||
compareSeries(t, exp, act)
|
||||
}
|
||||
testQuery()
|
||||
|
||||
|
@ -3506,6 +3515,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
|||
ah.fh.Sum = 0
|
||||
eh.fh = eh.fh.Copy()
|
||||
eh.fh.Sum = 0
|
||||
} else if i > 0 {
|
||||
prev := expHistograms[i-1]
|
||||
if prev.fh == nil || value.IsStaleNaN(prev.fh.Sum) {
|
||||
eh.fh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
require.Equal(t, eh, ah)
|
||||
} else {
|
||||
|
@ -3516,6 +3530,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
|||
ah.h.Sum = 0
|
||||
eh.h = eh.h.Copy()
|
||||
eh.h.Sum = 0
|
||||
} else if i > 0 {
|
||||
prev := expHistograms[i-1]
|
||||
if prev.h == nil || value.IsStaleNaN(prev.h.Sum) {
|
||||
eh.h.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
require.Equal(t, eh, ah)
|
||||
}
|
||||
|
@ -3730,8 +3749,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
// If this is empty, samples above will be taken instead of this.
|
||||
addToExp []tsdbutil.Sample
|
||||
}{
|
||||
// Histograms that end up in the expected samples are copied here so that we
|
||||
// can independently set the CounterResetHint later.
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 100, h: hists[0].Copy()}},
|
||||
expChunks: 1,
|
||||
},
|
||||
{
|
||||
|
@ -3739,23 +3760,23 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
expChunks: 2,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[0].Copy()}},
|
||||
expChunks: 3,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 220, h: hists[1].Copy()}},
|
||||
expChunks: 4,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3].Copy()}},
|
||||
expChunks: 5,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 100, h: hists[2].Copy()}},
|
||||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 300, h: hists[3].Copy()}},
|
||||
expChunks: 6,
|
||||
},
|
||||
{
|
||||
|
@ -3763,7 +3784,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
{
|
||||
samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}},
|
||||
samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4].Copy()}},
|
||||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
{
|
||||
|
@ -3789,7 +3810,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
},
|
||||
addToExp: []tsdbutil.Sample{
|
||||
sample{t: 800, v: 8},
|
||||
sample{t: 900, h: hists[9]},
|
||||
sample{t: 900, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
|
||||
},
|
||||
|
@ -3800,7 +3821,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
sample{t: 1100, h: hists[9]},
|
||||
},
|
||||
addToExp: []tsdbutil.Sample{
|
||||
sample{t: 1100, h: hists[9]},
|
||||
sample{t: 1100, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8,
|
||||
},
|
||||
|
@ -3830,6 +3851,14 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
require.NoError(t, app.Rollback())
|
||||
}
|
||||
}
|
||||
for i, s := range expResult[1:] {
|
||||
switch {
|
||||
case s.H() != nil && expResult[i].H() == nil:
|
||||
s.(sample).h.CounterResetHint = histogram.UnknownCounterReset
|
||||
case s.FH() != nil && expResult[i].FH() == nil:
|
||||
s.(sample).fh.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
|
||||
// Query back and expect same order of samples.
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
|
|
Loading…
Reference in a new issue