Make sure gc'ed chunks are handled properly

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
Goutham Veeramachaneni 2017-12-13 20:58:21 +00:00
parent 8b7b19714d
commit 05d62ca842
3 changed files with 110 additions and 4 deletions

11
head.go
View file

@ -776,9 +776,20 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
sid, cid := unpackChunkID(ref) sid, cid := unpackChunkID(ref)
s := h.head.series.getByID(sid) s := h.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, ErrNotFound
}
s.Lock() s.Lock()
c := s.chunk(int(cid)) c := s.chunk(int(cid))
// This means that the chunk has been garbage collected.
if c == nil {
s.Unlock()
return nil, ErrNotFound
}
mint, maxt := c.minTime, c.maxTime mint, maxt := c.minTime, c.maxTime
s.Unlock() s.Unlock()

View file

@ -193,7 +193,7 @@ func TestHead_Truncate(t *testing.T) {
// Truncation need not be aligned. // Truncation need not be aligned.
testutil.Ok(t, h.Truncate(1)) testutil.Ok(t, h.Truncate(1))
h.Truncate(2000) testutil.Ok(t, h.Truncate(2000))
testutil.Equals(t, []*memChunk{ testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999}, {minTime: 2000, maxTime: 2999},
@ -705,3 +705,85 @@ func TestMemSeries_append(t *testing.T) {
testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
} }
} }
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
h, err := NewHead(nil, nil, NopWAL(), 1000)
testutil.Ok(t, err)
defer h.Close()
h.initTime(0)
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
s.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
}
idx := h.indexRange(0, 1500)
var (
lset labels.Labels
chunks []ChunkMeta
)
testutil.Ok(t, idx.Series(1, &lset, &chunks))
testutil.Equals(t, labels.Labels{{
Name: "a", Value: "1",
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 1500)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Ok(t, err)
h.Truncate(1500) // Remove a chunk.
_, err = cr.Chunk(chunks[0].Ref)
testutil.Equals(t, ErrNotFound, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Ok(t, err)
}
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, NopWAL(), 1000)
testutil.Ok(t, err)
defer h.Close()
h.initTime(0)
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
s.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
}
idx := h.indexRange(0, 2000)
var (
lset labels.Labels
chunks []ChunkMeta
)
testutil.Ok(t, idx.Series(1, &lset, &chunks))
testutil.Equals(t, labels.Labels{{
Name: "a", Value: "1",
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 2000)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Ok(t, err)
h.Truncate(2000) // Remove the series.
testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))
_, err = cr.Chunk(chunks[0].Ref)
testutil.Equals(t, ErrNotFound, err)
_, err = cr.Chunk(chunks[1].Ref)
testutil.Equals(t, ErrNotFound, err)
}

View file

@ -531,6 +531,7 @@ type populatedChunkSeries struct {
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
return s.lset, s.chks, s.intervals return s.lset, s.chks, s.intervals
} }
func (s *populatedChunkSeries) Err() error { return s.err } func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) Next() bool { func (s *populatedChunkSeries) Next() bool {
@ -544,19 +545,31 @@ func (s *populatedChunkSeries) Next() bool {
chks = chks[1:] chks = chks[1:]
} }
for i := range chks { // This is to delete in place while iterating.
c := &chks[i] for i, rlen := 0, len(chks); i < rlen; i++ {
j := i - (rlen - len(chks))
c := &chks[j]
// Break out at the first chunk that has no overlap with mint, maxt. // Break out at the first chunk that has no overlap with mint, maxt.
if c.MinTime > s.maxt { if c.MinTime > s.maxt {
chks = chks[:i] chks = chks[:j]
break break
} }
c.Chunk, s.err = s.chunks.Chunk(c.Ref) c.Chunk, s.err = s.chunks.Chunk(c.Ref)
if s.err != nil { if s.err != nil {
// This means that the chunk has be garbage collected. Remove it from the list.
if s.err == ErrNotFound {
s.err = nil
// Delete in-place.
chks = append(chks[:j], chks[j+1:]...)
}
return false return false
} }
} }
if len(chks) == 0 { if len(chks) == 0 {
continue continue
} }