From 05d62ca84235717f2ba5c90cd3b8f0772f77fdd5 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 13 Dec 2017 20:58:21 +0000 Subject: [PATCH] Make sure gc'ed chunks are handled properly Signed-off-by: Goutham Veeramachaneni --- head.go | 11 +++++++ head_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++- querier.go | 19 ++++++++++-- 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/head.go b/head.go index d149cb1d9..cfc63bd62 100644 --- a/head.go +++ b/head.go @@ -776,9 +776,20 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { sid, cid := unpackChunkID(ref) s := h.head.series.getByID(sid) + // This means that the series has been garbage collected. + if s == nil { + return nil, ErrNotFound + } s.Lock() c := s.chunk(int(cid)) + + // This means that the chunk has been garbage collected. + if c == nil { + s.Unlock() + return nil, ErrNotFound + } + mint, maxt := c.minTime, c.maxTime s.Unlock() diff --git a/head_test.go b/head_test.go index 457cd1003..258c381e7 100644 --- a/head_test.go +++ b/head_test.go @@ -193,7 +193,7 @@ func TestHead_Truncate(t *testing.T) { // Truncation need not be aligned. testutil.Ok(t, h.Truncate(1)) - h.Truncate(2000) + testutil.Ok(t, h.Truncate(2000)) testutil.Equals(t, []*memChunk{ {minTime: 2000, maxTime: 2999}, @@ -705,3 +705,85 @@ func TestMemSeries_append(t *testing.T) { testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) } } + +func TestGCChunkAccess(t *testing.T) { + // Put a chunk, select it. GC it and then access it. + h, err := NewHead(nil, nil, NopWAL(), 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) + s.chunks = []*memChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + } + + idx := h.indexRange(0, 1500) + var ( + lset labels.Labels + chunks []ChunkMeta + ) + testutil.Ok(t, idx.Series(1, &lset, &chunks)) + + testutil.Equals(t, labels.Labels{{ + Name: "a", Value: "1", + }}, lset) + testutil.Equals(t, 2, len(chunks)) + + cr := h.chunksRange(0, 1500) + _, err = cr.Chunk(chunks[0].Ref) + testutil.Ok(t, err) + _, err = cr.Chunk(chunks[1].Ref) + testutil.Ok(t, err) + + h.Truncate(1500) // Remove a chunk. + + _, err = cr.Chunk(chunks[0].Ref) + testutil.Equals(t, ErrNotFound, err) + _, err = cr.Chunk(chunks[1].Ref) + testutil.Ok(t, err) +} + +func TestGCSeriesAccess(t *testing.T) { + // Put a series, select it. GC it and then access it. + h, err := NewHead(nil, nil, NopWAL(), 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) + s.chunks = []*memChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + } + + idx := h.indexRange(0, 2000) + var ( + lset labels.Labels + chunks []ChunkMeta + ) + testutil.Ok(t, idx.Series(1, &lset, &chunks)) + + testutil.Equals(t, labels.Labels{{ + Name: "a", Value: "1", + }}, lset) + testutil.Equals(t, 2, len(chunks)) + + cr := h.chunksRange(0, 2000) + _, err = cr.Chunk(chunks[0].Ref) + testutil.Ok(t, err) + _, err = cr.Chunk(chunks[1].Ref) + testutil.Ok(t, err) + + h.Truncate(2000) // Remove the series. + + testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) + + _, err = cr.Chunk(chunks[0].Ref) + testutil.Equals(t, ErrNotFound, err) + _, err = cr.Chunk(chunks[1].Ref) + testutil.Equals(t, ErrNotFound, err) +} diff --git a/querier.go b/querier.go index 37672c715..b051adb45 100644 --- a/querier.go +++ b/querier.go @@ -531,6 +531,7 @@ type populatedChunkSeries struct { func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { return s.lset, s.chks, s.intervals } + func (s *populatedChunkSeries) Err() error { return s.err } func (s *populatedChunkSeries) Next() bool { @@ -544,19 +545,31 @@ func (s *populatedChunkSeries) Next() bool { chks = chks[1:] } - for i := range chks { - c := &chks[i] + // This is to delete in place while iterating. + for i, rlen := 0, len(chks); i < rlen; i++ { + j := i - (rlen - len(chks)) + c := &chks[j] // Break out at the first chunk that has no overlap with mint, maxt. if c.MinTime > s.maxt { - chks = chks[:i] + chks = chks[:j] break } + c.Chunk, s.err = s.chunks.Chunk(c.Ref) if s.err != nil { + // This means that the chunk has be garbage collected. Remove it from the list. + if s.err == ErrNotFound { + s.err = nil + + // Delete in-place. + chks = append(chks[:j], chks[j+1:]...) + } + return false } } + if len(chks) == 0 { continue }