mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-27 22:49:40 -08:00
tsdb: Add integer gauge histogram support
This follows what #11783 has done for float gauge histograms. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
3e5ad99c33
commit
6dcd03dbf3
|
@ -177,6 +177,7 @@ func newHistogramIterator(b []byte) *histogramIterator {
|
||||||
// The first 3 bytes contain chunk headers.
|
// The first 3 bytes contain chunk headers.
|
||||||
// We skip that for actual samples.
|
// We skip that for actual samples.
|
||||||
_, _ = it.br.readBits(24)
|
_, _ = it.br.readBits(24)
|
||||||
|
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
|
||||||
return it
|
return it
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,6 +223,14 @@ type HistogramAppender struct {
|
||||||
trailing uint8
|
trailing uint8
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
|
||||||
|
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *HistogramAppender) NumSamples() int {
|
||||||
|
return int(binary.BigEndian.Uint16(a.b.bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
// Append implements Appender. This implementation panics because normal float
|
// Append implements Appender. This implementation panics because normal float
|
||||||
// samples must never be appended to a histogram chunk.
|
// samples must never be appended to a histogram chunk.
|
||||||
func (a *HistogramAppender) Append(int64, float64) {
|
func (a *HistogramAppender) Append(int64, float64) {
|
||||||
|
@ -237,19 +246,16 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
|
||||||
// Appendable returns whether the chunk can be appended to, and if so
|
// Appendable returns whether the chunk can be appended to, and if so
|
||||||
// whether any recoding needs to happen using the provided interjections
|
// whether any recoding needs to happen using the provided interjections
|
||||||
// (in case of any new buckets, positive or negative range, respectively).
|
// (in case of any new buckets, positive or negative range, respectively).
|
||||||
|
// If the sample is a gauge histogram, AppendableGauge must be used instead.
|
||||||
//
|
//
|
||||||
// The chunk is not appendable in the following cases:
|
// The chunk is not appendable in the following cases:
|
||||||
//
|
//
|
||||||
// • The schema has changed.
|
// - The schema has changed.
|
||||||
//
|
// - The threshold for the zero bucket has changed.
|
||||||
// • The threshold for the zero bucket has changed.
|
// - Any buckets have disappeared.
|
||||||
//
|
// - There was a counter reset in the count of observations or in any bucket,
|
||||||
// • Any buckets have disappeared.
|
|
||||||
//
|
|
||||||
// • There was a counter reset in the count of observations or in any bucket,
|
|
||||||
// including the zero bucket.
|
// including the zero bucket.
|
||||||
//
|
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||||
// • The last sample in the chunk was stale while the current sample is not stale.
|
|
||||||
//
|
//
|
||||||
// The method returns an additional boolean set to true if it is not appendable
|
// The method returns an additional boolean set to true if it is not appendable
|
||||||
// because of a counter reset. If the given sample is stale, it is always ok to
|
// because of a counter reset. If the given sample is stale, it is always ok to
|
||||||
|
@ -258,6 +264,9 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
||||||
positiveInterjections, negativeInterjections []Interjection,
|
positiveInterjections, negativeInterjections []Interjection,
|
||||||
okToAppend, counterReset bool,
|
okToAppend, counterReset bool,
|
||||||
) {
|
) {
|
||||||
|
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
|
||||||
|
return
|
||||||
|
}
|
||||||
if value.IsStaleNaN(h.Sum) {
|
if value.IsStaleNaN(h.Sum) {
|
||||||
// This is a stale sample whose buckets and spans don't matter.
|
// This is a stale sample whose buckets and spans don't matter.
|
||||||
okToAppend = true
|
okToAppend = true
|
||||||
|
@ -307,8 +316,47 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type bucketValue interface {
|
// AppendableGauge returns whether the chunk can be appended to, and if so
|
||||||
int64 | float64
|
// whether:
|
||||||
|
// 1. Any recoding needs to happen to the chunk using the provided interjections
|
||||||
|
// (in case of any new buckets, positive or negative range, respectively).
|
||||||
|
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
|
||||||
|
// (in case of any missing buckets, positive or negative range, respectively).
|
||||||
|
//
|
||||||
|
// This method must be only used for gauge histograms.
|
||||||
|
//
|
||||||
|
// The chunk is not appendable in the following cases:
|
||||||
|
// - The schema has changed.
|
||||||
|
// - The threshold for the zero bucket has changed.
|
||||||
|
// - The last sample in the chunk was stale while the current sample is not stale.
|
||||||
|
func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
|
||||||
|
positiveInterjections, negativeInterjections []Interjection,
|
||||||
|
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
|
||||||
|
positiveSpans, negativeSpans []histogram.Span,
|
||||||
|
okToAppend bool,
|
||||||
|
) {
|
||||||
|
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value.IsStaleNaN(h.Sum) {
|
||||||
|
// This is a stale sample whose buckets and spans don't matter.
|
||||||
|
okToAppend = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if value.IsStaleNaN(a.sum) {
|
||||||
|
// If the last sample was stale, then we can only accept stale
|
||||||
|
// samples in this chunk.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
|
||||||
|
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
|
||||||
|
okToAppend = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// counterResetInAnyBucket returns true if there was a counter reset for any
|
// counterResetInAnyBucket returns true if there was a counter reset for any
|
||||||
|
@ -542,6 +590,22 @@ func (a *HistogramAppender) Recode(
|
||||||
return hc, app
|
return hc, app
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
|
||||||
|
// (positive and/or negative) buckets used.
|
||||||
|
func (a *HistogramAppender) RecodeHistogramm(
|
||||||
|
h *histogram.Histogram,
|
||||||
|
pBackwardInter, nBackwardInter []Interjection,
|
||||||
|
) {
|
||||||
|
if len(pBackwardInter) > 0 {
|
||||||
|
numPositiveBuckets := countSpans(h.PositiveSpans)
|
||||||
|
h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true)
|
||||||
|
}
|
||||||
|
if len(nBackwardInter) > 0 {
|
||||||
|
numNegativeBuckets := countSpans(h.NegativeSpans)
|
||||||
|
h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (a *HistogramAppender) writeSumDelta(v float64) {
|
func (a *HistogramAppender) writeSumDelta(v float64) {
|
||||||
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
|
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
|
||||||
}
|
}
|
||||||
|
@ -551,6 +615,8 @@ type histogramIterator struct {
|
||||||
numTotal uint16
|
numTotal uint16
|
||||||
numRead uint16
|
numRead uint16
|
||||||
|
|
||||||
|
counterResetHeader CounterResetHeader
|
||||||
|
|
||||||
// Layout:
|
// Layout:
|
||||||
schema int32
|
schema int32
|
||||||
zThreshold float64
|
zThreshold float64
|
||||||
|
@ -599,7 +665,12 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
|
||||||
return it.t, &histogram.Histogram{Sum: it.sum}
|
return it.t, &histogram.Histogram{Sum: it.sum}
|
||||||
}
|
}
|
||||||
it.atHistogramCalled = true
|
it.atHistogramCalled = true
|
||||||
|
crHint := histogram.UnknownCounterReset
|
||||||
|
if it.counterResetHeader == GaugeType {
|
||||||
|
crHint = histogram.GaugeType
|
||||||
|
}
|
||||||
return it.t, &histogram.Histogram{
|
return it.t, &histogram.Histogram{
|
||||||
|
CounterResetHint: crHint,
|
||||||
Count: it.cnt,
|
Count: it.cnt,
|
||||||
ZeroCount: it.zCnt,
|
ZeroCount: it.zCnt,
|
||||||
Sum: it.sum,
|
Sum: it.sum,
|
||||||
|
@ -617,7 +688,12 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra
|
||||||
return it.t, &histogram.FloatHistogram{Sum: it.sum}
|
return it.t, &histogram.FloatHistogram{Sum: it.sum}
|
||||||
}
|
}
|
||||||
it.atFloatHistogramCalled = true
|
it.atFloatHistogramCalled = true
|
||||||
|
crHint := histogram.UnknownCounterReset
|
||||||
|
if it.counterResetHeader == GaugeType {
|
||||||
|
crHint = histogram.GaugeType
|
||||||
|
}
|
||||||
return it.t, &histogram.FloatHistogram{
|
return it.t, &histogram.FloatHistogram{
|
||||||
|
CounterResetHint: crHint,
|
||||||
Count: float64(it.cnt),
|
Count: float64(it.cnt),
|
||||||
ZeroCount: float64(it.zCnt),
|
ZeroCount: float64(it.zCnt),
|
||||||
Sum: it.sum,
|
Sum: it.sum,
|
||||||
|
@ -645,6 +721,8 @@ func (it *histogramIterator) Reset(b []byte) {
|
||||||
it.numTotal = binary.BigEndian.Uint16(b)
|
it.numTotal = binary.BigEndian.Uint16(b)
|
||||||
it.numRead = 0
|
it.numRead = 0
|
||||||
|
|
||||||
|
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
|
||||||
|
|
||||||
it.t, it.cnt, it.zCnt = 0, 0, 0
|
it.t, it.cnt, it.zCnt = 0, 0, 0
|
||||||
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0
|
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0
|
||||||
|
|
||||||
|
|
|
@ -376,6 +376,10 @@ loop:
|
||||||
return interjections, bInterjections, mergedSpans
|
return interjections, bInterjections, mergedSpans
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bucketValue interface {
|
||||||
|
int64 | float64
|
||||||
|
}
|
||||||
|
|
||||||
// interject merges 'in' with the provided interjections and writes them into
|
// interject merges 'in' with the provided interjections and writes them into
|
||||||
// 'out', which must already have the appropriate length.
|
// 'out', which must already have the appropriate length.
|
||||||
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
|
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
|
||||||
|
|
|
@ -517,3 +517,171 @@ func TestAtFloatHistogram(t *testing.T) {
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHistogramChunkAppendableGauge(t *testing.T) {
|
||||||
|
c := Chunk(NewHistogramChunk())
|
||||||
|
|
||||||
|
// Create fresh appender and add the first histogram.
|
||||||
|
app, err := c.Appender()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, c.NumSamples())
|
||||||
|
|
||||||
|
ts := int64(1234567890)
|
||||||
|
h1 := &histogram.Histogram{
|
||||||
|
Count: 5,
|
||||||
|
ZeroCount: 2,
|
||||||
|
Sum: 18.4,
|
||||||
|
ZeroThreshold: 1e-125,
|
||||||
|
Schema: 1,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // {6, 3, 3, 2, 4, 5, 1}
|
||||||
|
}
|
||||||
|
|
||||||
|
app.AppendHistogram(ts, h1.Copy())
|
||||||
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
c.(*HistogramChunk).SetCounterResetHeader(GaugeType)
|
||||||
|
|
||||||
|
{ // Schema change.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.Schema++
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // Zero threshold change.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.ZeroThreshold += 0.1
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
_, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has more buckets.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 1, Length: 4},
|
||||||
|
{Offset: 3, Length: 3},
|
||||||
|
}
|
||||||
|
h2.Count += 9
|
||||||
|
h2.ZeroCount++
|
||||||
|
h2.Sum = 30
|
||||||
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Greater(t, len(pI), 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Len(t, pBackwardI, 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has buckets missing.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 4, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
}
|
||||||
|
h2.Count -= 4
|
||||||
|
h2.Sum--
|
||||||
|
h2.PositiveBuckets = []int64{6, -3, 0, -1, 3, -4} // {6, 3, 3, 2, 5, 1}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Len(t, pI, 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Greater(t, len(pBackwardI), 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a bucket missing and new buckets.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 5, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
}
|
||||||
|
h2.Sum = 21
|
||||||
|
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // {6, 3, 2, 4, 5, 1}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Greater(t, len(pI), 0)
|
||||||
|
require.Greater(t, len(pBackwardI), 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a counter reset while buckets are same.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.Sum = 23
|
||||||
|
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // {6, 2, 3, 2, 4, 5, 1}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Len(t, pI, 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Len(t, pBackwardI, 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has a counter reset while new buckets were added.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 3},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
{Offset: 1, Length: 4},
|
||||||
|
{Offset: 3, Length: 3},
|
||||||
|
}
|
||||||
|
h2.Sum = 29
|
||||||
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // {7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Greater(t, len(pI), 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Len(t, pBackwardI, 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// New histogram that has a counter reset while new buckets were
|
||||||
|
// added before the first bucket and reset on first bucket.
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: -3, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
{Offset: 2, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 1, Length: 1},
|
||||||
|
}
|
||||||
|
h2.Sum = 26
|
||||||
|
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // {1, 2, 5, 3, 3, 2, 4, 5, 1}
|
||||||
|
|
||||||
|
hApp, _ := app.(*HistogramAppender)
|
||||||
|
pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
|
||||||
|
require.Greater(t, len(pI), 0)
|
||||||
|
require.Len(t, nI, 0)
|
||||||
|
require.Len(t, pBackwardI, 0)
|
||||||
|
require.Len(t, nBackwardI, 0)
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
28
tsdb/head.go
28
tsdb/head.go
|
@ -2048,6 +2048,32 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) {
|
||||||
|
for x := 0; x < n; x++ {
|
||||||
|
i := rand.Intn(n)
|
||||||
|
r = append(r, &histogram.Histogram{
|
||||||
|
CounterResetHint: histogram.GaugeType,
|
||||||
|
Count: 10 + uint64(i*8),
|
||||||
|
ZeroCount: 2 + uint64(i),
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
Sum: 18.4 * float64(i+1),
|
||||||
|
Schema: 1,
|
||||||
|
PositiveSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||||
|
NegativeSpans: []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
},
|
||||||
|
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
r = append(r, &histogram.FloatHistogram{
|
r = append(r, &histogram.FloatHistogram{
|
||||||
|
@ -2072,7 +2098,7 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func GenerateTestGaugeHistograms(n int) (r []*histogram.FloatHistogram) {
|
func GenerateTestGaugeFloatHistograms(n int) (r []*histogram.FloatHistogram) {
|
||||||
for x := 0; x < n; x++ {
|
for x := 0; x < n; x++ {
|
||||||
i := rand.Intn(n)
|
i := rand.Intn(n)
|
||||||
r = append(r, &histogram.FloatHistogram{
|
r = append(r, &histogram.FloatHistogram{
|
||||||
|
|
|
@ -1148,18 +1148,29 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
||||||
app, _ := s.app.(*chunkenc.HistogramAppender)
|
app, _ := s.app.(*chunkenc.HistogramAppender)
|
||||||
var (
|
var (
|
||||||
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
positiveInterjections, negativeInterjections []chunkenc.Interjection
|
||||||
|
pBackwardInter, nBackwardInter []chunkenc.Interjection
|
||||||
|
pMergedSpans, nMergedSpans []histogram.Span
|
||||||
okToAppend, counterReset bool
|
okToAppend, counterReset bool
|
||||||
)
|
)
|
||||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
|
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
|
||||||
if !sampleInOrder {
|
if !sampleInOrder {
|
||||||
return sampleInOrder, chunkCreated
|
return sampleInOrder, chunkCreated
|
||||||
}
|
}
|
||||||
|
gauge := h.CounterResetHint == histogram.GaugeType
|
||||||
if app != nil {
|
if app != nil {
|
||||||
|
if gauge {
|
||||||
|
positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h)
|
||||||
|
} else {
|
||||||
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
|
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !chunkCreated {
|
if !chunkCreated {
|
||||||
|
if len(pBackwardInter)+len(nBackwardInter) > 0 {
|
||||||
|
h.PositiveSpans = pMergedSpans
|
||||||
|
h.NegativeSpans = nMergedSpans
|
||||||
|
app.RecodeHistogramm(h, pBackwardInter, nBackwardInter)
|
||||||
|
}
|
||||||
// We have 3 cases here
|
// We have 3 cases here
|
||||||
// - !okToAppend -> We need to cut a new chunk.
|
// - !okToAppend -> We need to cut a new chunk.
|
||||||
// - okToAppend but we have interjections → Existing chunk needs
|
// - okToAppend but we have interjections → Existing chunk needs
|
||||||
|
@ -1184,9 +1195,12 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
||||||
if chunkCreated {
|
if chunkCreated {
|
||||||
hc := s.headChunk.chunk.(*chunkenc.HistogramChunk)
|
hc := s.headChunk.chunk.(*chunkenc.HistogramChunk)
|
||||||
header := chunkenc.UnknownCounterReset
|
header := chunkenc.UnknownCounterReset
|
||||||
if counterReset {
|
switch {
|
||||||
|
case gauge:
|
||||||
|
header = chunkenc.GaugeType
|
||||||
|
case counterReset:
|
||||||
header = chunkenc.CounterReset
|
header = chunkenc.CounterReset
|
||||||
} else if okToAppend {
|
case okToAppend:
|
||||||
header = chunkenc.NotCounterReset
|
header = chunkenc.NotCounterReset
|
||||||
}
|
}
|
||||||
hc.SetCounterResetHeader(header)
|
hc.SetCounterResetHeader(header)
|
||||||
|
@ -1265,11 +1279,12 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
||||||
if chunkCreated {
|
if chunkCreated {
|
||||||
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
|
hc := s.headChunk.chunk.(*chunkenc.FloatHistogramChunk)
|
||||||
header := chunkenc.UnknownCounterReset
|
header := chunkenc.UnknownCounterReset
|
||||||
if gauge {
|
switch {
|
||||||
|
case gauge:
|
||||||
header = chunkenc.GaugeType
|
header = chunkenc.GaugeType
|
||||||
} else if counterReset {
|
case counterReset:
|
||||||
header = chunkenc.CounterReset
|
header = chunkenc.CounterReset
|
||||||
} else if okToAppend {
|
case okToAppend:
|
||||||
header = chunkenc.NotCounterReset
|
header = chunkenc.NotCounterReset
|
||||||
}
|
}
|
||||||
hc.SetCounterResetHeader(header)
|
hc.SetCounterResetHeader(header)
|
||||||
|
|
|
@ -2834,12 +2834,13 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
ingestTs := int64(0)
|
ingestTs := int64(0)
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
|
||||||
// Integer histograms.
|
|
||||||
type timedHistogram struct {
|
type timedHistogram struct {
|
||||||
t int64
|
t int64
|
||||||
h *histogram.Histogram
|
h *histogram.Histogram
|
||||||
}
|
}
|
||||||
expHistograms := make([]timedHistogram, 0, numHistograms)
|
expHistograms := make([]timedHistogram, 0, 2*numHistograms)
|
||||||
|
|
||||||
|
// Counter integer histograms.
|
||||||
for _, h := range GenerateTestHistograms(numHistograms) {
|
for _, h := range GenerateTestHistograms(numHistograms) {
|
||||||
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
|
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -2851,12 +2852,25 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gauge integer histograms.
|
||||||
|
for _, h := range GenerateTestGaugeHistograms(numHistograms) {
|
||||||
|
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
|
||||||
|
ingestTs++
|
||||||
|
if ingestTs%50 == 0 {
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
app = head.Appender(context.Background())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type timedFloatHistogram struct {
|
type timedFloatHistogram struct {
|
||||||
t int64
|
t int64
|
||||||
h *histogram.FloatHistogram
|
h *histogram.FloatHistogram
|
||||||
}
|
}
|
||||||
// Float counter histograms.
|
expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms)
|
||||||
expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms)
|
|
||||||
|
// Counter float histograms.
|
||||||
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
|
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
|
||||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -2868,8 +2882,8 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Float gauge histograms.
|
// Gauge float histograms.
|
||||||
for _, fh := range GenerateTestGaugeHistograms(numHistograms) {
|
for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) {
|
||||||
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
|
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
|
||||||
|
@ -2879,6 +2893,7 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
app = head.Appender(context.Background())
|
app = head.Appender(context.Background())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||||
|
@ -2913,7 +2928,7 @@ func TestAppendHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
head, _ := newTestHead(t, 2000, false, false)
|
head, _ := newTestHead(t, 3000, false, false)
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, head.Close())
|
require.NoError(t, head.Close())
|
||||||
})
|
})
|
||||||
|
@ -2924,9 +2939,17 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
k1 := s1.String()
|
k1 := s1.String()
|
||||||
numHistograms := 300
|
numHistograms := 300
|
||||||
exp := map[string][]tsdbutil.Sample{}
|
exp := map[string][]tsdbutil.Sample{}
|
||||||
app := head.Appender(context.Background())
|
|
||||||
ts := int64(0)
|
ts := int64(0)
|
||||||
for _, h := range GenerateTestHistograms(numHistograms) {
|
var app storage.Appender
|
||||||
|
for _, gauge := range []bool{true, false} {
|
||||||
|
app = head.Appender(context.Background())
|
||||||
|
var hists []*histogram.Histogram
|
||||||
|
if gauge {
|
||||||
|
hists = GenerateTestGaugeHistograms(numHistograms)
|
||||||
|
} else {
|
||||||
|
hists = GenerateTestHistograms(numHistograms)
|
||||||
|
}
|
||||||
|
for _, h := range hists {
|
||||||
h.Count = h.Count * 2
|
h.Count = h.Count * 2
|
||||||
h.NegativeSpans = h.PositiveSpans
|
h.NegativeSpans = h.PositiveSpans
|
||||||
h.NegativeBuckets = h.PositiveBuckets
|
h.NegativeBuckets = h.PositiveBuckets
|
||||||
|
@ -2940,11 +2963,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
for _, gauge := range []bool{true, false} {
|
for _, gauge := range []bool{true, false} {
|
||||||
app = head.Appender(context.Background())
|
app = head.Appender(context.Background())
|
||||||
var hists []*histogram.FloatHistogram
|
var hists []*histogram.FloatHistogram
|
||||||
if gauge {
|
if gauge {
|
||||||
hists = GenerateTestGaugeHistograms(numHistograms)
|
hists = GenerateTestGaugeFloatHistograms(numHistograms)
|
||||||
} else {
|
} else {
|
||||||
hists = GenerateTestFloatHistograms(numHistograms)
|
hists = GenerateTestFloatHistograms(numHistograms)
|
||||||
}
|
}
|
||||||
|
@ -2964,10 +2988,10 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
}
|
}
|
||||||
|
|
||||||
// There should be 7 mmap chunks in s1.
|
// There should be 11 mmap chunks in s1.
|
||||||
ms := head.series.getByHash(s1.Hash(), s1)
|
ms := head.series.getByHash(s1.Hash(), s1)
|
||||||
require.Len(t, ms.mmappedChunks, 8)
|
require.Len(t, ms.mmappedChunks, 11)
|
||||||
expMmapChunks := make([]*mmappedChunk, 0, 8)
|
expMmapChunks := make([]*mmappedChunk, 0, 11)
|
||||||
for _, mmap := range ms.mmappedChunks {
|
for _, mmap := range ms.mmappedChunks {
|
||||||
require.Greater(t, mmap.numSamples, uint16(0))
|
require.Greater(t, mmap.numSamples, uint16(0))
|
||||||
cpy := *mmap
|
cpy := *mmap
|
||||||
|
@ -2979,9 +3003,16 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
// Series with mix of histograms and float.
|
// Series with mix of histograms and float.
|
||||||
s2 := labels.FromStrings("a", "b2")
|
s2 := labels.FromStrings("a", "b2")
|
||||||
k2 := s2.String()
|
k2 := s2.String()
|
||||||
app = head.Appender(context.Background())
|
|
||||||
ts = 0
|
ts = 0
|
||||||
for _, h := range GenerateTestHistograms(100) {
|
for _, gauge := range []bool{true, false} {
|
||||||
|
app = head.Appender(context.Background())
|
||||||
|
var hists []*histogram.Histogram
|
||||||
|
if gauge {
|
||||||
|
hists = GenerateTestGaugeHistograms(100)
|
||||||
|
} else {
|
||||||
|
hists = GenerateTestHistograms(100)
|
||||||
|
}
|
||||||
|
for _, h := range hists {
|
||||||
ts++
|
ts++
|
||||||
h.Count = h.Count * 2
|
h.Count = h.Count * 2
|
||||||
h.NegativeSpans = h.PositiveSpans
|
h.NegativeSpans = h.PositiveSpans
|
||||||
|
@ -3004,11 +3035,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
for _, gauge := range []bool{true, false} {
|
for _, gauge := range []bool{true, false} {
|
||||||
app = head.Appender(context.Background())
|
app = head.Appender(context.Background())
|
||||||
var hists []*histogram.FloatHistogram
|
var hists []*histogram.FloatHistogram
|
||||||
if gauge {
|
if gauge {
|
||||||
hists = GenerateTestGaugeHistograms(100)
|
hists = GenerateTestGaugeFloatHistograms(100)
|
||||||
} else {
|
} else {
|
||||||
hists = GenerateTestFloatHistograms(100)
|
hists = GenerateTestFloatHistograms(100)
|
||||||
}
|
}
|
||||||
|
@ -4571,6 +4603,81 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
|
|
||||||
|
ts := int64(0)
|
||||||
|
appendHistogram := func(h *histogram.Histogram) {
|
||||||
|
ts++
|
||||||
|
app := head.Appender(context.Background())
|
||||||
|
_, err := app.AppendHistogram(0, l, ts, h.Copy(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
}
|
||||||
|
|
||||||
|
hists := GenerateTestGaugeHistograms(5)
|
||||||
|
hists[0].CounterResetHint = histogram.UnknownCounterReset
|
||||||
|
appendHistogram(hists[0])
|
||||||
|
appendHistogram(hists[1])
|
||||||
|
appendHistogram(hists[2])
|
||||||
|
hists[3].CounterResetHint = histogram.UnknownCounterReset
|
||||||
|
appendHistogram(hists[3])
|
||||||
|
appendHistogram(hists[3])
|
||||||
|
appendHistogram(hists[4])
|
||||||
|
|
||||||
|
checkHeaders := func() {
|
||||||
|
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, ms.mmappedChunks, 3)
|
||||||
|
expHeaders := []chunkenc.CounterResetHeader{
|
||||||
|
chunkenc.UnknownCounterReset,
|
||||||
|
chunkenc.GaugeType,
|
||||||
|
chunkenc.UnknownCounterReset,
|
||||||
|
chunkenc.GaugeType,
|
||||||
|
}
|
||||||
|
for i, mmapChunk := range ms.mmappedChunks {
|
||||||
|
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||||
|
}
|
||||||
|
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||||
|
}
|
||||||
|
checkHeaders()
|
||||||
|
|
||||||
|
recs := readTestWAL(t, head.wal.Dir())
|
||||||
|
require.Equal(t, []interface{}{
|
||||||
|
[]record.RefSeries{
|
||||||
|
{
|
||||||
|
Ref: 1,
|
||||||
|
Labels: labels.FromStrings("a", "b"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 1, H: hists[0]}},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 2, H: hists[1]}},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 3, H: hists[2]}},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 4, H: hists[3]}},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 5, H: hists[3]}},
|
||||||
|
[]record.RefHistogramSample{{Ref: 1, T: 6, H: hists[4]}},
|
||||||
|
}, recs)
|
||||||
|
|
||||||
|
// Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms.
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot)))
|
||||||
|
|
||||||
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, head.Init(0))
|
||||||
|
|
||||||
|
checkHeaders()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
|
l := labels.FromStrings("a", "b")
|
||||||
|
head, _ := newTestHead(t, 1000, false, false)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
})
|
||||||
|
require.NoError(t, head.Init(0))
|
||||||
|
|
||||||
ts := int64(0)
|
ts := int64(0)
|
||||||
appendHistogram := func(h *histogram.FloatHistogram) {
|
appendHistogram := func(h *histogram.FloatHistogram) {
|
||||||
ts++
|
ts++
|
||||||
|
@ -4580,7 +4687,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
}
|
}
|
||||||
|
|
||||||
hists := GenerateTestGaugeHistograms(5)
|
hists := GenerateTestGaugeFloatHistograms(5)
|
||||||
hists[0].CounterResetHint = histogram.UnknownCounterReset
|
hists[0].CounterResetHint = histogram.UnknownCounterReset
|
||||||
appendHistogram(hists[0])
|
appendHistogram(hists[0])
|
||||||
appendHistogram(hists[1])
|
appendHistogram(hists[1])
|
||||||
|
|
Loading…
Reference in a new issue