Fix crash when a series has no block

This commit is contained in:
Simon Pasquier 2018-02-07 14:43:21 +01:00
parent bc97c6c6be
commit 79defa54df
3 changed files with 39 additions and 3 deletions

View file

@ -438,7 +438,7 @@ Outer:
for _, chk := range chks { for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
// Delete only until the current vlaues and not beyond. // Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}} stones[p.At()] = Intervals{{tmin, tmax}}
continue Outer continue Outer

15
head.go
View file

@ -587,8 +587,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
for p.Next() { for p.Next() {
series := h.series.getByID(p.At()) series := h.series.getByID(p.At())
t0, t1 := series.minTime(), series.maxTime()
if t0 == math.MinInt64 || t1 == math.MinInt64 {
continue
}
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) t0, t1 = clampInterval(mint, maxt, t0, t1)
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
} }
@ -1106,11 +1110,18 @@ type memSeries struct {
} }
func (s *memSeries) minTime() int64 { func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 {
return math.MinInt64
}
return s.chunks[0].minTime return s.chunks[0].minTime
} }
func (s *memSeries) maxTime() int64 { func (s *memSeries) maxTime() int64 {
return s.head().maxTime c := s.head()
if c == nil {
return math.MinInt64
}
return c.maxTime
} }
func (s *memSeries) cut(mint int64) *memChunk { func (s *memSeries) cut(mint int64) *memChunk {

View file

@ -230,6 +230,31 @@ func TestMemSeries_truncateChunks(t *testing.T) {
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
} }
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
entries := []interface{}{
[]RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
},
[]RefSample{},
[]RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "2")},
},
[]RefSample{
{Ref: 50, T: 80, V: 1},
{Ref: 50, T: 90, V: 1},
},
}
wal := &memoryWAL{entries: entries}
head, err := NewHead(nil, nil, wal, 1000)
testutil.Ok(t, err)
defer head.Close()
testutil.Ok(t, head.ReadWAL())
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}
func TestHeadDeleteSimple(t *testing.T) { func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)