Merge pull request #187 from prometheus/cutchunk

Ensure near-empty chunks end at correct boundary
This commit is contained in:
Fabian Reinartz 2017-10-25 16:52:11 +02:00 committed by GitHub
commit 5d28c849c7
3 changed files with 52 additions and 7 deletions

18
head.go
View file

@ -1162,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
}
s.chunks = append(s.chunks, c)
// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
app, err := c.chunk.Appender()
if err != nil {
panic(err)
@ -1241,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
}
numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t {
return false, chunkCreated
}
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
// If we reach 25% of a chunk's desired sample count, set a definitive time
// at which to start the next chunk.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
if t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
@ -1252,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t
if numSamples == samplesPerChunk/4 {
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
}
s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1]

View file

@ -660,3 +660,43 @@ func TestComputeChunkEndTime(t *testing.T) {
}
}
}
func TestMemSeries_append(t *testing.T) {
s := newMemSeries(labels.Labels{}, 1, 500)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1)
Assert(t, ok, "append failed")
Assert(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2)
Assert(t, ok, "append failed")
Assert(t, !chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3)
Assert(t, ok, "append failed")
Assert(t, ok, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4)
Assert(t, ok, "append failed")
Assert(t, !chunkCreated, "second sample should use same chunk")
Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range")
Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range")
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i))
Assert(t, ok, "append failed")
}
Assert(t, len(s.chunks) > 7, "expected intermediate chunks")
// All chunks but the first and last should now be moderately full.
for i, c := range s.chunks[1 : len(s.chunks)-1] {
Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
}
}

View file

@ -535,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
return false
}
}
if len(chks) == 0 {
continue
}