mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Ensure near-empty chunks end at correct boundary
We were determining a chunk's end time once it was one quarter full to compute it so all chunks have uniform number of samples. This accidentally skipped the case where series started near the end of a chunk range/block and never reached that threshold. As a result they got persisted but were continued across the range. This resulted in corrupted persisted data.
This commit is contained in:
parent
d109149d17
commit
82796db37b
18
head.go
18
head.go
|
@ -1162,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
|
|||
}
|
||||
s.chunks = append(s.chunks, c)
|
||||
|
||||
// Set upper bound on when the next chunk must be started. An earlier timestamp
|
||||
// may be chosen dynamically at a later point.
|
||||
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
|
||||
|
||||
app, err := c.chunk.Appender()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -1241,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
|||
}
|
||||
numSamples := c.chunk.NumSamples()
|
||||
|
||||
// Out of order sample.
|
||||
if c.maxTime >= t {
|
||||
return false, chunkCreated
|
||||
}
|
||||
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
|
||||
// If we reach 25% of a chunk's desired sample count, set a definitive time
|
||||
// at which to start the next chunk.
|
||||
// At latest it must happen at the timestamp set when the chunk was cut.
|
||||
if numSamples == samplesPerChunk/4 {
|
||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
|
||||
}
|
||||
if t >= s.nextAt {
|
||||
c = s.cut(t)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
@ -1252,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
|
|||
|
||||
c.maxTime = t
|
||||
|
||||
if numSamples == samplesPerChunk/4 {
|
||||
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
|
||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
|
||||
}
|
||||
|
||||
s.lastValue = v
|
||||
|
||||
s.sampleBuf[0] = s.sampleBuf[1]
|
||||
|
|
40
head_test.go
40
head_test.go
|
@ -660,3 +660,43 @@ func TestComputeChunkEndTime(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemSeries_append(t *testing.T) {
|
||||
s := newMemSeries(labels.Labels{}, 1, 500)
|
||||
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
// New chunk must correctly be cut at 1000.
|
||||
ok, chunkCreated := s.append(998, 1)
|
||||
Assert(t, ok, "append failed")
|
||||
Assert(t, chunkCreated, "first sample created chunk")
|
||||
|
||||
ok, chunkCreated = s.append(999, 2)
|
||||
Assert(t, ok, "append failed")
|
||||
Assert(t, !chunkCreated, "second sample should use same chunk")
|
||||
|
||||
ok, chunkCreated = s.append(1000, 3)
|
||||
Assert(t, ok, "append failed")
|
||||
Assert(t, ok, "expected new chunk on boundary")
|
||||
|
||||
ok, chunkCreated = s.append(1001, 4)
|
||||
Assert(t, ok, "append failed")
|
||||
Assert(t, !chunkCreated, "second sample should use same chunk")
|
||||
|
||||
Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range")
|
||||
Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range")
|
||||
|
||||
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
|
||||
// at approximately 120 samples per chunk.
|
||||
for i := 1; i < 1000; i++ {
|
||||
ok, _ := s.append(1001+int64(i), float64(i))
|
||||
Assert(t, ok, "append failed")
|
||||
}
|
||||
|
||||
Assert(t, len(s.chunks) > 7, "expected intermediate chunks")
|
||||
|
||||
// All chunks but the first and last should now be moderately full.
|
||||
for i, c := range s.chunks[1 : len(s.chunks)-1] {
|
||||
Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -535,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if len(chks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue