mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 21:54:10 -08:00
Merge pull request #224 from Gouthamve/fix-214
Make sure gc'ed chunks are handled properly
This commit is contained in:
commit
9f2935c5cf
11
head.go
11
head.go
|
@ -776,9 +776,20 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
sid, cid := unpackChunkID(ref)
|
sid, cid := unpackChunkID(ref)
|
||||||
|
|
||||||
s := h.head.series.getByID(sid)
|
s := h.head.series.getByID(sid)
|
||||||
|
// This means that the series has been garbage collected.
|
||||||
|
if s == nil {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
c := s.chunk(int(cid))
|
c := s.chunk(int(cid))
|
||||||
|
|
||||||
|
// This means that the chunk has been garbage collected.
|
||||||
|
if c == nil {
|
||||||
|
s.Unlock()
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
mint, maxt := c.minTime, c.maxTime
|
mint, maxt := c.minTime, c.maxTime
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
|
|
84
head_test.go
84
head_test.go
|
@ -198,7 +198,7 @@ func TestHead_Truncate(t *testing.T) {
|
||||||
// Truncation need not be aligned.
|
// Truncation need not be aligned.
|
||||||
testutil.Ok(t, h.Truncate(1))
|
testutil.Ok(t, h.Truncate(1))
|
||||||
|
|
||||||
h.Truncate(2000)
|
testutil.Ok(t, h.Truncate(2000))
|
||||||
|
|
||||||
testutil.Equals(t, []*memChunk{
|
testutil.Equals(t, []*memChunk{
|
||||||
{minTime: 2000, maxTime: 2999},
|
{minTime: 2000, maxTime: 2999},
|
||||||
|
@ -710,3 +710,85 @@ func TestMemSeries_append(t *testing.T) {
|
||||||
testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
|
testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGCChunkAccess(t *testing.T) {
|
||||||
|
// Put a chunk, select it. GC it and then access it.
|
||||||
|
h, err := NewHead(nil, nil, NopWAL(), 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
|
h.initTime(0)
|
||||||
|
|
||||||
|
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||||
|
s.chunks = []*memChunk{
|
||||||
|
{minTime: 0, maxTime: 999},
|
||||||
|
{minTime: 1000, maxTime: 1999},
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := h.indexRange(0, 1500)
|
||||||
|
var (
|
||||||
|
lset labels.Labels
|
||||||
|
chunks []ChunkMeta
|
||||||
|
)
|
||||||
|
testutil.Ok(t, idx.Series(1, &lset, &chunks))
|
||||||
|
|
||||||
|
testutil.Equals(t, labels.Labels{{
|
||||||
|
Name: "a", Value: "1",
|
||||||
|
}}, lset)
|
||||||
|
testutil.Equals(t, 2, len(chunks))
|
||||||
|
|
||||||
|
cr := h.chunksRange(0, 1500)
|
||||||
|
_, err = cr.Chunk(chunks[0].Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
_, err = cr.Chunk(chunks[1].Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
h.Truncate(1500) // Remove a chunk.
|
||||||
|
|
||||||
|
_, err = cr.Chunk(chunks[0].Ref)
|
||||||
|
testutil.Equals(t, ErrNotFound, err)
|
||||||
|
_, err = cr.Chunk(chunks[1].Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGCSeriesAccess(t *testing.T) {
|
||||||
|
// Put a series, select it. GC it and then access it.
|
||||||
|
h, err := NewHead(nil, nil, NopWAL(), 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
|
h.initTime(0)
|
||||||
|
|
||||||
|
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||||
|
s.chunks = []*memChunk{
|
||||||
|
{minTime: 0, maxTime: 999},
|
||||||
|
{minTime: 1000, maxTime: 1999},
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := h.indexRange(0, 2000)
|
||||||
|
var (
|
||||||
|
lset labels.Labels
|
||||||
|
chunks []ChunkMeta
|
||||||
|
)
|
||||||
|
testutil.Ok(t, idx.Series(1, &lset, &chunks))
|
||||||
|
|
||||||
|
testutil.Equals(t, labels.Labels{{
|
||||||
|
Name: "a", Value: "1",
|
||||||
|
}}, lset)
|
||||||
|
testutil.Equals(t, 2, len(chunks))
|
||||||
|
|
||||||
|
cr := h.chunksRange(0, 2000)
|
||||||
|
_, err = cr.Chunk(chunks[0].Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
_, err = cr.Chunk(chunks[1].Ref)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
h.Truncate(2000) // Remove the series.
|
||||||
|
|
||||||
|
testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))
|
||||||
|
|
||||||
|
_, err = cr.Chunk(chunks[0].Ref)
|
||||||
|
testutil.Equals(t, ErrNotFound, err)
|
||||||
|
_, err = cr.Chunk(chunks[1].Ref)
|
||||||
|
testutil.Equals(t, ErrNotFound, err)
|
||||||
|
}
|
||||||
|
|
19
querier.go
19
querier.go
|
@ -531,6 +531,7 @@ type populatedChunkSeries struct {
|
||||||
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *populatedChunkSeries) Err() error { return s.err }
|
func (s *populatedChunkSeries) Err() error { return s.err }
|
||||||
|
|
||||||
func (s *populatedChunkSeries) Next() bool {
|
func (s *populatedChunkSeries) Next() bool {
|
||||||
|
@ -544,19 +545,31 @@ func (s *populatedChunkSeries) Next() bool {
|
||||||
chks = chks[1:]
|
chks = chks[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range chks {
|
// This is to delete in place while iterating.
|
||||||
c := &chks[i]
|
for i, rlen := 0, len(chks); i < rlen; i++ {
|
||||||
|
j := i - (rlen - len(chks))
|
||||||
|
c := &chks[j]
|
||||||
|
|
||||||
// Break out at the first chunk that has no overlap with mint, maxt.
|
// Break out at the first chunk that has no overlap with mint, maxt.
|
||||||
if c.MinTime > s.maxt {
|
if c.MinTime > s.maxt {
|
||||||
chks = chks[:i]
|
chks = chks[:j]
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
|
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
|
||||||
if s.err != nil {
|
if s.err != nil {
|
||||||
|
// This means that the chunk has be garbage collected. Remove it from the list.
|
||||||
|
if s.err == ErrNotFound {
|
||||||
|
s.err = nil
|
||||||
|
|
||||||
|
// Delete in-place.
|
||||||
|
chks = append(chks[:j], chks[j+1:]...)
|
||||||
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(chks) == 0 {
|
if len(chks) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue