diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 95d4035071..97a2155f5b 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -144,9 +144,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + cmint = s.t } chunk = newChunk - cmint = s.t } case chunkenc.EncFloatHistogram: // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. @@ -159,9 +159,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + cmint = s.t } chunk = newChunk - cmint = s.t } } cmaxt = s.t diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index 44d093094d..b03a462bd8 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -14,8 +14,12 @@ package tsdb import ( + "math" "testing" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/require" @@ -145,3 +149,100 @@ func testOOOInsertDuplicate(t *testing.T, } } } + +type chunkVerify struct { + encoding chunkenc.Encoding + minTime int64 + maxTime int64 +} + +func TestOOOChunks_ToEncodedChunks(t *testing.T) { + h1 := tsdbutil.GenerateTestHistogram(1) + // Make h2 appendible but with more buckets, to trigger recoding. + h2 := h1.Copy() + h2.PositiveSpans = append(h2.PositiveSpans, histogram.Span{Offset: 1, Length: 1}) + h2.PositiveBuckets = append(h2.PositiveBuckets, 12) + + testCases := map[string]struct { + samples []sample + expectedCounterResets []histogram.CounterResetHint + expectedChunks []chunkVerify + }{ + "empty": { + samples: []sample{}, + }, + "has a recoded histogram": { // Regression test for wrong minT, maxT in histogram recoding. + samples: []sample{ + {t: 0, h: h1}, + {t: 1, h: h2}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncHistogram, minTime: 0, maxTime: 1}, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + // Sanity check. + require.Equal(t, len(tc.samples), len(tc.expectedCounterResets), "number of samples and counter resets") + + oooChunk := OOOChunk{} + for _, s := range tc.samples { + switch s.Type() { + case chunkenc.ValFloat: + oooChunk.Insert(s.t, s.f, nil, nil) + case chunkenc.ValHistogram: + oooChunk.Insert(s.t, 0, s.h.Copy(), nil) + case chunkenc.ValFloatHistogram: + oooChunk.Insert(s.t, 0, nil, s.fh.Copy()) + default: + t.Fatalf("unexpected sample type %d", s.Type()) + } + } + + chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + require.Equal(t, len(tc.expectedChunks), len(chunks), "number of chunks") + sampleIndex := 0 + for i, c := range chunks { + require.Equal(t, tc.expectedChunks[i].encoding, c.chunk.Encoding(), "chunk %d encoding", i) + require.Equal(t, tc.expectedChunks[i].minTime, c.minTime, "chunk %d minTime", i) + require.Equal(t, tc.expectedChunks[i].maxTime, c.maxTime, "chunk %d maxTime", i) + samples, err := storage.ExpandSamples(c.chunk.Iterator(nil), newSample) + require.GreaterOrEqual(t, len(tc.samples)-sampleIndex, len(samples), "too many samples in chunk %d expected less than %d", i, len(tc.samples)-sampleIndex) + require.NoError(t, err) + if len(samples) == 0 { + // Ignore empty chunks. + continue + } + switch c.chunk.Encoding() { + case chunkenc.EncXOR: + for j, s := range samples { + require.Equal(t, chunkenc.ValFloat, s.Type()) + require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j) + } + case chunkenc.EncHistogram: + for j, s := range samples { + require.Equal(t, chunkenc.ValHistogram, s.Type()) + require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.H().CounterResetHint, "sample reset hint %d", sampleIndex+j) + compareTo := tc.samples[sampleIndex+j].h.Copy() + compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j] + require.Equal(t, compareTo, s.H().Compact(0), "sample %d", sampleIndex+j) + } + case chunkenc.EncFloatHistogram: + for j, s := range samples { + require.Equal(t, chunkenc.ValFloatHistogram, s.Type()) + require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.FH().CounterResetHint, "sample reset hint %d", sampleIndex+j) + compareTo := tc.samples[sampleIndex+j].fh.Copy() + compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j] + require.Equal(t, compareTo, s.FH().Compact(0), "sample %d", sampleIndex+j) + } + } + sampleIndex += len(samples) + } + require.Equal(t, len(tc.samples), sampleIndex, "number of samples") + }) + } +}