From 56b3a015b6864af3d9b4063f0807ddbc06fa46fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Wed, 20 Sep 2023 14:32:20 +0200 Subject: [PATCH 1/3] Add regression test for duplicate detection at chunk size limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample,is appended to the head, right when the head chunk is at the size limit. The test adds all samples as duplicate, thus expecting that the result has exactly half of the samples. Signed-off-by: György Krajcsovits --- tsdb/head_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2aa97cd440..3d271e93bb 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5499,3 +5499,49 @@ func TestCuttingNewHeadChunks(t *testing.T) { }) } } + +// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample +// is appended to the head, right when the head chunk is at the size limit. +// The test adds all samples as duplicate, thus expecting that the result has +// exactly half of the samples. +func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) { + numSamples := 1000 + baseTS := int64(1695209650) + + h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, h.Close()) + }() + + a := h.Appender(context.Background()) + var err error + vals := []float64{math.MaxFloat64, 0x00} // Use the worst case scenario for the XOR encoding. Otherwise we hit the sample limit before the size limit. + for i := 0; i < numSamples; i++ { + ts := baseTS + int64(i/2)*10000 + a.Append(0, labels.FromStrings("foo", "bar"), ts, vals[(i/2)%len(vals)]) + err = a.Commit() + require.NoError(t, err) + a = h.Appender(context.Background()) + } + + indexReader, err := h.Index() + require.NoError(t, err) + + var ( + chunks []chunks.Meta + builder labels.ScratchBuilder + ) + require.NoError(t, indexReader.Series(1, &builder, &chunks)) + + chunkReader, err := h.Chunks() + require.NoError(t, err) + + storedSampleCount := 0 + for _, chunkMeta := range chunks { + chunk, err := chunkReader.Chunk(chunkMeta) + require.NoError(t, err) + storedSampleCount += chunk.NumSamples() + } + + require.Equal(t, numSamples/2, storedSampleCount) +} From 96d03b6f46d98fb9b586026bd973aec603954853 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Wed, 20 Sep 2023 14:49:56 +0200 Subject: [PATCH 2/3] Fix duplicate sample detection at chunks size limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before cutting a new XOR chunk in case the chunk goes over the size limit, check that the timestamp is in order and not equal or older than the latest sample in the old chunk. Signed-off-by: György Krajcsovits --- tsdb/head_append.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 9016943756..e271ff6c5e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1283,6 +1283,9 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } else if len(c.chunk.Bytes()) > maxBytesPerXORChunk { + if c.maxTime >= t { + return c, false, false + } c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } From 9dbd100a5e8d6b8da8858147f8b92675e5876a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Wed, 20 Sep 2023 15:54:00 +0200 Subject: [PATCH 3/3] Refactor solution to not repeat code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- tsdb/head_append.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index e271ff6c5e..d1f4d3035e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1282,12 +1282,6 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // There is no head chunk in this series yet, create the first chunk for the sample. c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true - } else if len(c.chunk.Bytes()) > maxBytesPerXORChunk { - if c.maxTime >= t { - return c, false, false - } - c = s.cutNewHeadChunk(t, e, o.chunkRange) - chunkCreated = true } // Out of order sample. @@ -1295,6 +1289,12 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts return c, false, chunkCreated } + // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. + if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { + c = s.cutNewHeadChunk(t, e, o.chunkRange) + chunkCreated = true + } + if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding.