diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 5d9d980b2..efcafcf6c 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -304,12 +304,10 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { s.Unlock() return &safeChunk{ - Chunk: c.chunk, - s: s, - cid: cid, - isoState: h.isoState, - chunkDiskMapper: h.head.chunkDiskMapper, - memChunkPool: &h.head.memChunkPool, + Chunk: c.chunk, + s: s, + cid: cid, + isoState: h.isoState, }, nil } @@ -600,43 +598,24 @@ func (b boundedIterator) Seek(t int64) chunkenc.ValueType { // safeChunk makes sure that the chunk can be accessed without a race condition type safeChunk struct { chunkenc.Chunk - s *memSeries - cid chunks.HeadChunkID - isoState *isolationState - chunkDiskMapper *chunks.ChunkDiskMapper - memChunkPool *sync.Pool + s *memSeries + cid chunks.HeadChunkID + isoState *isolationState } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter) + it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter) c.s.Unlock() return it } // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator { - c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool) - // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a - // series's chunk, which got then garbage collected before it got - // accessed. We must ensure to not garbage collect as long as any - // readers still hold a reference. - if err != nil { - return chunkenc.NewNopIterator() - } - defer func() { - if garbageCollect { - // Set this to nil so that Go GC can collect it after it has been used. - // This should be done always at the end. - c.chunk = nil - memChunkPool.Put(c) - } - }() - +func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { ix := int(id) - int(s.firstChunkID) - numSamples := c.chunk.NumSamples() + numSamples := c.NumSamples() stopAfter := numSamples if isoState != nil && !isoState.IsolationDisabled() { @@ -681,9 +660,9 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch return chunkenc.NewNopIterator() } if stopAfter == numSamples { - return c.chunk.Iterator(it) + return c.Iterator(it) } - return makeStopIterator(c.chunk, it, stopAfter) + return makeStopIterator(c, it, stopAfter) } // stopIterator wraps an Iterator, but only returns the first diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 08842e4ce..4fe6c8f91 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -537,11 +537,18 @@ func TestHead_ReadWAL(t *testing.T) { require.NoError(t, c.Err()) return x } - require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil))) - require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil))) + + c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) + c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) // The samples before the new series record should be discarded since a duplicate record // is only possible when old samples were compacted. - require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil))) + c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool) + require.NoError(t, err) + require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) q, err := head.ExemplarQuerier(context.Background()) require.NoError(t, err) @@ -2566,7 +2573,13 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { require.True(t, ok, "sample append failed") } - it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil) + c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + }) + require.NoError(t, err) + it := c.chunk.Iterator(nil) // First point. require.Equal(t, chunkenc.ValFloat, it.Seek(0)) diff --git a/tsdb/test.txt b/tsdb/test.txt new file mode 100644 index 000000000..e69de29bb