From f5fcaa3872ce03808567fabc56afc9cf61c732cb Mon Sep 17 00:00:00 2001 From: George Krajcsovits Date: Fri, 5 May 2023 14:34:30 +0200 Subject: [PATCH] Fix setting reset header to gauge histogram in seriesToChunkEncoder (#12329) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- storage/series.go | 10 +++ storage/series_test.go | 177 +++++++++++++++++++++++++++++++---------- 2 files changed, 147 insertions(+), 40 deletions(-) diff --git a/storage/series.go b/storage/series.go index 5daa6255d..b73f1e35c 100644 --- a/storage/series.go +++ b/storage/series.go @@ -297,9 +297,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { + chunkCreated := false if typ != lastType || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. chks = appendChunk(chks, mint, maxt, chk) + chunkCreated = true chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) if err != nil { return errChunksIterator{err: err} @@ -330,6 +332,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { if ok, counterReset := app.AppendHistogram(t, h); !ok { chks = appendChunk(chks, mint, maxt, chk) histChunk := chunkenc.NewHistogramChunk() + chunkCreated = true if counterReset { histChunk.SetCounterResetHeader(chunkenc.CounterReset) } @@ -346,11 +349,15 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { panic("unexpected error while appending histogram") } } + if chunkCreated && h.CounterResetHint == histogram.GaugeType { + chk.(*chunkenc.HistogramChunk).SetCounterResetHeader(chunkenc.GaugeType) + } case chunkenc.ValFloatHistogram: t, fh = seriesIter.AtFloatHistogram() if ok, counterReset := app.AppendFloatHistogram(t, fh); !ok { chks = appendChunk(chks, mint, maxt, chk) floatHistChunk := chunkenc.NewFloatHistogramChunk() + chunkCreated = true if counterReset { floatHistChunk.SetCounterResetHeader(chunkenc.CounterReset) } @@ -366,6 +373,9 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { panic("unexpected error while float appending histogram") } } + if chunkCreated && fh.CounterResetHint == histogram.GaugeType { + chk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(chunkenc.GaugeType) + } default: return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())} } diff --git a/storage/series_test.go b/storage/series_test.go index 4c318f1a0..5c74fae09 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -126,14 +126,13 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) { } type histogramTest struct { - samples []tsdbutil.Sample - expectedChunks int - expectedCounterReset bool + samples []tsdbutil.Sample + expectedCounterResetHeaders []chunkenc.CounterResetHeader } func TestHistogramSeriesToChunks(t *testing.T) { h1 := &histogram.Histogram{ - Count: 3, + Count: 7, ZeroCount: 2, ZeroThreshold: 0.001, Sum: 100, @@ -158,7 +157,7 @@ func TestHistogramSeriesToChunks(t *testing.T) { } // Implicit counter reset by reduction in buckets, not appendable. h2down := &histogram.Histogram{ - Count: 8, + Count: 10, ZeroCount: 2, ZeroThreshold: 0.001, Sum: 100, @@ -171,7 +170,7 @@ func TestHistogramSeriesToChunks(t *testing.T) { } fh1 := &histogram.FloatHistogram{ - Count: 4, + Count: 6, ZeroCount: 2, ZeroThreshold: 0.001, Sum: 100, @@ -183,7 +182,7 @@ func TestHistogramSeriesToChunks(t *testing.T) { } // Appendable to fh1. fh2 := &histogram.FloatHistogram{ - Count: 15, + Count: 17, ZeroCount: 2, ZeroThreshold: 0.001, Sum: 100, @@ -196,7 +195,7 @@ func TestHistogramSeriesToChunks(t *testing.T) { } // Implicit counter reset by reduction in buckets, not appendable. fh2down := &histogram.FloatHistogram{ - Count: 13, + Count: 15, ZeroCount: 2, ZeroThreshold: 0.001, Sum: 100, @@ -208,6 +207,60 @@ func TestHistogramSeriesToChunks(t *testing.T) { PositiveBuckets: []float64{2, 2, 7, 2}, } + // Gauge histogram. + gh1 := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 7, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []int64{2, 1}, // Abs: 2, 3 + } + gh2 := &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 12, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{2, 1, -2, 3}, // Abs: 2, 3, 1, 4 + } + + // Float gauge histogram. + gfh1 := &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 6, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + }, + PositiveBuckets: []float64{3, 1}, + } + gfh2 := &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 17, + ZeroCount: 2, + ZeroThreshold: 0.001, + Sum: 100, + Schema: 0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{4, 2, 7, 2}, + } + staleHistogram := &histogram.Histogram{ Sum: math.Float64frombits(value.StaleNaN), } @@ -220,74 +273,70 @@ func TestHistogramSeriesToChunks(t *testing.T) { samples: []tsdbutil.Sample{ hSample{t: 1, h: h1}, }, - expectedChunks: 1, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two histograms encoded to a single chunk": { samples: []tsdbutil.Sample{ hSample{t: 1, h: h1}, hSample{t: 2, h: h2}, }, - expectedChunks: 1, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two histograms encoded to two chunks": { samples: []tsdbutil.Sample{ hSample{t: 1, h: h2}, hSample{t: 2, h: h1}, }, - expectedChunks: 2, - expectedCounterReset: true, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, "histogram and stale sample encoded to two chunks": { samples: []tsdbutil.Sample{ hSample{t: 1, h: staleHistogram}, hSample{t: 2, h: h1}, }, - expectedChunks: 2, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "histogram and reduction in bucket encoded to two chunks": { samples: []tsdbutil.Sample{ hSample{t: 1, h: h1}, hSample{t: 2, h: h2down}, }, - expectedChunks: 2, - expectedCounterReset: true, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, // Float histograms. "single float histogram to single chunk": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: fh1}, }, - expectedChunks: 1, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two float histograms encoded to a single chunk": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: fh1}, fhSample{t: 2, fh: fh2}, }, - expectedChunks: 1, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset}, }, "two float histograms encoded to two chunks": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: fh2}, fhSample{t: 2, fh: fh1}, }, - expectedChunks: 2, - expectedCounterReset: true, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, "float histogram and stale sample encoded to two chunks": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: staleFloatHistogram}, fhSample{t: 2, fh: fh1}, }, - expectedChunks: 2, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "float histogram and reduction in bucket encoded to two chunks": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: fh1}, fhSample{t: 2, fh: fh2down}, }, - expectedChunks: 2, - expectedCounterReset: true, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset}, }, // Mixed. "histogram and float histogram encoded to two chunks": { @@ -295,21 +344,61 @@ func TestHistogramSeriesToChunks(t *testing.T) { hSample{t: 1, h: h1}, fhSample{t: 2, fh: fh2}, }, - expectedChunks: 2, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "float histogram and histogram encoded to two chunks": { samples: []tsdbutil.Sample{ fhSample{t: 1, fh: fh1}, hSample{t: 2, h: h2}, }, - expectedChunks: 2, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, }, "histogram and stale float histogram encoded to two chunks": { samples: []tsdbutil.Sample{ hSample{t: 1, h: h1}, fhSample{t: 2, fh: staleFloatHistogram}, }, - expectedChunks: 2, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, + }, + "single gauge histogram encoded to one chunk": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: gh1}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, + }, + "two gauge histograms encoded to one chunk when counter increases": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: gh1}, + hSample{t: 2, h: gh2}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, + }, + "two gauge histograms encoded to one chunk when counter decreases": { + samples: []tsdbutil.Sample{ + hSample{t: 1, h: gh2}, + hSample{t: 2, h: gh1}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, + }, + "single gauge float histogram encoded to one chunk": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: gfh1}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, + }, + "two float gauge histograms encoded to one chunk when counter increases": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: gfh1}, + fhSample{t: 2, fh: gfh2}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, + }, + "two float gauge histograms encoded to one chunk when counter decreases": { + samples: []tsdbutil.Sample{ + fhSample{t: 1, fh: gfh2}, + fhSample{t: 2, fh: gfh1}, + }, + expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType}, }, } @@ -322,13 +411,24 @@ func TestHistogramSeriesToChunks(t *testing.T) { func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { lbs := labels.FromStrings("__name__", "up", "instance", "localhost:8080") - series := NewListSeries(lbs, test.samples) + copiedSamples := []tsdbutil.Sample{} + for _, s := range test.samples { + switch cs := s.(type) { + case hSample: + copiedSamples = append(copiedSamples, hSample{t: cs.t, h: cs.h.Copy()}) + case fhSample: + copiedSamples = append(copiedSamples, fhSample{t: cs.t, fh: cs.fh.Copy()}) + default: + t.Error("internal error, unexpected type") + } + } + series := NewListSeries(lbs, copiedSamples) encoder := NewSeriesToChunkEncoder(series) require.EqualValues(t, lbs, encoder.Labels()) chks, err := ExpandChunks(encoder.Iterator(nil)) require.NoError(t, err) - require.Equal(t, test.expectedChunks, len(chks)) + require.Equal(t, len(test.expectedCounterResetHeaders), len(chks)) // Decode all encoded samples and assert they are equal to the original ones. encodedSamples := expandHistogramSamples(chks) @@ -339,8 +439,10 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { case hSample: encodedSample, ok := encodedSamples[i].(hSample) require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i)) - // Ignore counter reset here, will check on chunk level. - encodedSample.h.CounterResetHint = histogram.UnknownCounterReset + // Ignore counter reset if not gauge here, will check on chunk level. + if expectedSample.h.CounterResetHint != histogram.GaugeType { + encodedSample.h.CounterResetHint = histogram.UnknownCounterReset + } if value.IsStaleNaN(expectedSample.h.Sum) { require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i)) continue @@ -349,8 +451,10 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { case fhSample: encodedSample, ok := encodedSamples[i].(fhSample) require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i)) - // Ignore counter reset here, will check on chunk level. - encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset + // Ignore counter reset if not gauge here, will check on chunk level. + if expectedSample.fh.CounterResetHint != histogram.GaugeType { + encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset + } if value.IsStaleNaN(expectedSample.fh.Sum) { require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i)) continue @@ -361,15 +465,8 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) { } } - // If a counter reset hint is expected, it can only be found in the second chunk. - // Otherwise, we assert an unknown counter reset hint in all chunks. - if test.expectedCounterReset { - require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chks[0])) - require.Equal(t, chunkenc.CounterReset, getCounterResetHint(chks[1])) - } else { - for _, chk := range chks { - require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chk)) - } + for i, expectedCounterResetHint := range test.expectedCounterResetHeaders { + require.Equal(t, expectedCounterResetHint, getCounterResetHint(chks[i]), fmt.Sprintf("chunk at index %d", i)) } }