Merge pull request #12930 from prometheus/superq/pick-12874

Release 2.47.1
This commit is contained in:
Bryan Boreham 2023-10-04 11:06:28 +01:00 committed by GitHub
commit c4d1a8beff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 4 deletions

View file

@ -1,5 +1,9 @@
# Changelog # Changelog
## 2.47.1 / 2023-10-04
* [BUGFIX] Fix duplicate sample detection at chunk size limit #12874
## 2.47.0 / 2023-09-06 ## 2.47.0 / 2023-09-06
This release adds an experimental OpenTelemetry (OTLP) Ingestion feature, This release adds an experimental OpenTelemetry (OTLP) Ingestion feature,

View file

@ -1 +1 @@
2.47.0 2.47.1

View file

@ -1282,9 +1282,6 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
// There is no head chunk in this series yet, create the first chunk for the sample. // There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, e, o.chunkRange) c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true chunkCreated = true
} else if len(c.chunk.Bytes()) > maxBytesPerXORChunk {
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
} }
// Out of order sample. // Out of order sample.
@ -1292,6 +1289,12 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
return c, false, chunkCreated return c, false, chunkCreated
} }
// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
if c.chunk.Encoding() != e { if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's // The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding. // encoding. So we cut a new chunk with the expected encoding.

View file

@ -5399,3 +5399,49 @@ func TestCuttingNewHeadChunks(t *testing.T) {
}) })
} }
} }
// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample
// is appended to the head, right when the head chunk is at the size limit.
// The test adds all samples as duplicate, thus expecting that the result has
// exactly half of the samples.
func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
numSamples := 1000
baseTS := int64(1695209650)
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
var err error
vals := []float64{math.MaxFloat64, 0x00} // Use the worst case scenario for the XOR encoding. Otherwise we hit the sample limit before the size limit.
for i := 0; i < numSamples; i++ {
ts := baseTS + int64(i/2)*10000
a.Append(0, labels.FromStrings("foo", "bar"), ts, vals[(i/2)%len(vals)])
err = a.Commit()
require.NoError(t, err)
a = h.Appender(context.Background())
}
indexReader, err := h.Index()
require.NoError(t, err)
var (
chunks []chunks.Meta
builder labels.ScratchBuilder
)
require.NoError(t, indexReader.Series(1, &builder, &chunks))
chunkReader, err := h.Chunks()
require.NoError(t, err)
storedSampleCount := 0
for _, chunkMeta := range chunks {
chunk, err := chunkReader.Chunk(chunkMeta)
require.NoError(t, err)
storedSampleCount += chunk.NumSamples()
}
require.Equal(t, numSamples/2, storedSampleCount)
}