From 82796db37b7864b7b7fa18a42701a4275cfb06d9 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 25 Oct 2017 09:32:06 +0200 Subject: [PATCH] Ensure near-empty chunks end at correct boundary We were waiting until a chunk was one quarter full before computing its end time, so that all chunks have a uniform number of samples. This accidentally skipped the case where series started near the end of a chunk range/block and never reached that threshold. As a result they got persisted but were continued across the range. This resulted in corrupted persisted data. --- head.go | 18 ++++++++++++------ head_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ querier.go | 1 - 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/head.go b/head.go index 6ab848b8a..92363659f 100644 --- a/head.go +++ b/head.go @@ -1162,6 +1162,10 @@ func (s *memSeries) cut(mint int64) *memChunk { } s.chunks = append(s.chunks, c) + // Set upper bound on when the next chunk must be started. An earlier timestamp + // may be chosen dynamically at a later point. + _, s.nextAt = rangeForTimestamp(mint, s.chunkRange) + app, err := c.chunk.Appender() if err != nil { panic(err) } @@ -1241,10 +1245,17 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { } numSamples := c.chunk.NumSamples() + // Out of order sample. if c.maxTime >= t { return false, chunkCreated } - if numSamples > samplesPerChunk/4 && t >= s.nextAt { + // If we reach 25% of a chunk's desired sample count, set a definitive time + // at which to start the next chunk. + // At latest it must happen at the timestamp set when the chunk was cut. 
+ if numSamples == samplesPerChunk/4 { + s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) + } + if t >= s.nextAt { c = s.cut(t) chunkCreated = true } @@ -1252,11 +1263,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c.maxTime = t - if numSamples == samplesPerChunk/4 { - _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) - s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) - } - s.lastValue = v s.sampleBuf[0] = s.sampleBuf[1] diff --git a/head_test.go b/head_test.go index 80c4a407c..24f14ec44 100644 --- a/head_test.go +++ b/head_test.go @@ -660,3 +660,43 @@ func TestComputeChunkEndTime(t *testing.T) { } } } + +func TestMemSeries_append(t *testing.T) { + s := newMemSeries(labels.Labels{}, 1, 500) + + // Add first two samples at the very end of a chunk range and the next two + // on and after it. + // New chunk must correctly be cut at 1000. + ok, chunkCreated := s.append(998, 1) + Assert(t, ok, "append failed") + Assert(t, chunkCreated, "first sample created chunk") + + ok, chunkCreated = s.append(999, 2) + Assert(t, ok, "append failed") + Assert(t, !chunkCreated, "second sample should use same chunk") + + ok, chunkCreated = s.append(1000, 3) + Assert(t, ok, "append failed") + Assert(t, chunkCreated, "expected new chunk on boundary") + + ok, chunkCreated = s.append(1001, 4) + Assert(t, ok, "append failed") + Assert(t, !chunkCreated, "fourth sample should use same chunk") + + Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range") + Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range") + + // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut + // at approximately 120 samples per chunk. 
+ for i := 1; i < 1000; i++ { + ok, _ := s.append(1001+int64(i), float64(i)) + Assert(t, ok, "append failed") + } + + Assert(t, len(s.chunks) > 7, "expected intermediate chunks") + + // All chunks but the first and last should now be moderately full. + for i, c := range s.chunks[1 : len(s.chunks)-1] { + Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) + } +} diff --git a/querier.go b/querier.go index 97b98ea77..ed8b64cea 100644 --- a/querier.go +++ b/querier.go @@ -535,7 +535,6 @@ func (s *populatedChunkSeries) Next() bool { return false } } - if len(chks) == 0 { continue }