diff --git a/head.go b/head.go
index 6ab848b8a..92363659f 100644
--- a/head.go
+++ b/head.go
@@ -1162,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk {
 	}
 	s.chunks = append(s.chunks, c)
 
+	// Set upper bound on when the next chunk must be started. An earlier timestamp
+	// may be chosen dynamically at a later point.
+	_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
+
 	app, err := c.chunk.Appender()
 	if err != nil {
 		panic(err)
@@ -1241,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	}
 	numSamples := c.chunk.NumSamples()
 
+	// Out of order sample.
 	if c.maxTime >= t {
 		return false, chunkCreated
 	}
-	if numSamples > samplesPerChunk/4 && t >= s.nextAt {
+	// If we reach 25% of a chunk's desired sample count, set a definitive time
+	// at which to start the next chunk.
+	// At latest it must happen at the timestamp set when the chunk was cut.
+	if numSamples == samplesPerChunk/4 {
+		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
+	}
+	if t >= s.nextAt {
 		c = s.cut(t)
 		chunkCreated = true
 	}
@@ -1252,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 
 	c.maxTime = t
 
-	if numSamples == samplesPerChunk/4 {
-		_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
-		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
-	}
-
 	s.lastValue = v
 
 	s.sampleBuf[0] = s.sampleBuf[1]
diff --git a/head_test.go b/head_test.go
index 80c4a407c..24f14ec44 100644
--- a/head_test.go
+++ b/head_test.go
@@ -660,3 +660,43 @@ func TestComputeChunkEndTime(t *testing.T) {
 		}
 	}
 }
+
+func TestMemSeries_append(t *testing.T) {
+	s := newMemSeries(labels.Labels{}, 1, 500)
+
+	// Add first two samples at the very end of a chunk range and the next two
+	// on and after it.
+	// New chunk must correctly be cut at 1000.
+	ok, chunkCreated := s.append(998, 1)
+	Assert(t, ok, "append failed")
+	Assert(t, chunkCreated, "first sample created chunk")
+
+	ok, chunkCreated = s.append(999, 2)
+	Assert(t, ok, "append failed")
+	Assert(t, !chunkCreated, "second sample should use same chunk")
+
+	ok, chunkCreated = s.append(1000, 3)
+	Assert(t, ok, "append failed")
+	Assert(t, chunkCreated, "expected new chunk on boundary")
+
+	ok, chunkCreated = s.append(1001, 4)
+	Assert(t, ok, "append failed")
+	Assert(t, !chunkCreated, "fourth sample should use same chunk")
+
+	Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range")
+	Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range")
+
+	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
+	// at approximately 120 samples per chunk.
+	for i := 1; i < 1000; i++ {
+		ok, _ := s.append(1001+int64(i), float64(i))
+		Assert(t, ok, "append failed")
+	}
+
+	Assert(t, len(s.chunks) > 7, "expected intermediate chunks")
+
+	// All chunks but the first and last should now be moderately full.
+	for i, c := range s.chunks[1 : len(s.chunks)-1] {
+		Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
+	}
+}
diff --git a/querier.go b/querier.go
index 97b98ea77..ed8b64cea 100644
--- a/querier.go
+++ b/querier.go
@@ -535,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool {
 				return false
 			}
 		}
-
 		if len(chks) == 0 {
 			continue
 		}