From ec9170b8bf6dd8595c42b57d211f3567aa3bd0c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?= Date: Wed, 20 Sep 2023 18:01:21 +0200 Subject: [PATCH] Merge pull request #12874 from krajorama/outof-order-chunks Fix duplicate sample detection at chunk size limit Signed-off-by: SuperQ --- tsdb/head_append.go | 9 ++++++--- tsdb/head_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 9016943756..d1f4d3035e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1282,9 +1282,6 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // There is no head chunk in this series yet, create the first chunk for the sample. c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true - } else if len(c.chunk.Bytes()) > maxBytesPerXORChunk { - c = s.cutNewHeadChunk(t, e, o.chunkRange) - chunkCreated = true } // Out of order sample. @@ -1292,6 +1289,12 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts return c, false, chunkCreated } + // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. + if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { + c = s.cutNewHeadChunk(t, e, o.chunkRange) + chunkCreated = true + } + if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b501f5f3f3..cd9ab65108 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5399,3 +5399,49 @@ func TestCuttingNewHeadChunks(t *testing.T) { }) } } + +// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample +// is appended to the head, right when the head chunk is at the size limit. +// The test adds all samples as duplicate, thus expecting that the result has +// exactly half of the samples. +func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { + numSamples := 1000 + baseTS := int64(1695209650) + + h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, h.Close()) + }() + + a := h.Appender(context.Background()) + var err error + vals := []float64{math.MaxFloat64, 0x00} // Use the worst case scenario for the XOR encoding. Otherwise we hit the sample limit before the size limit. + for i := 0; i < numSamples; i++ { + ts := baseTS + int64(i/2)*10000 + a.Append(0, labels.FromStrings("foo", "bar"), ts, vals[(i/2)%len(vals)]) + err = a.Commit() + require.NoError(t, err) + a = h.Appender(context.Background()) + } + + indexReader, err := h.Index() + require.NoError(t, err) + + var ( + chunks []chunks.Meta + builder labels.ScratchBuilder + ) + require.NoError(t, indexReader.Series(1, &builder, &chunks)) + + chunkReader, err := h.Chunks() + require.NoError(t, err) + + storedSampleCount := 0 + for _, chunkMeta := range chunks { + chunk, err := chunkReader.Chunk(chunkMeta) + require.NoError(t, err) + storedSampleCount += chunk.NumSamples() + } + + require.Equal(t, numSamples/2, storedSampleCount) +}