diff --git a/block.go b/block.go index 7dc3af9d80..dd6a6def89 100644 --- a/block.go +++ b/block.go @@ -438,7 +438,7 @@ Outer: for _, chk := range chks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { - // Delete only until the current vlaues and not beyond. + // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) stones[p.At()] = Intervals{{tmin, tmax}} continue Outer diff --git a/head.go b/head.go index c76c139d5a..65bee54a8a 100644 --- a/head.go +++ b/head.go @@ -587,8 +587,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { for p.Next() { series := h.series.getByID(p.At()) + t0, t1 := series.minTime(), series.maxTime() + if t0 == math.MinInt64 || t1 == math.MinInt64 { + continue + } // Delete only until the current values and not beyond. - t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime()) + t0, t1 = clampInterval(mint, maxt, t0, t1) stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) } @@ -1106,11 +1110,18 @@ type memSeries struct { } func (s *memSeries) minTime() int64 { + if len(s.chunks) == 0 { + return math.MinInt64 + } return s.chunks[0].minTime } func (s *memSeries) maxTime() int64 { - return s.head().maxTime + c := s.head() + if c == nil { + return math.MinInt64 + } + return c.maxTime } func (s *memSeries) cut(mint int64) *memChunk { diff --git a/head_test.go b/head_test.go index 36a97542b0..512c7fe1be 100644 --- a/head_test.go +++ b/head_test.go @@ -230,6 +230,31 @@ func TestMemSeries_truncateChunks(t *testing.T) { testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } +func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + }, + []RefSample{}, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "2")}, + }, + []RefSample{ + {Ref: 50, T: 80, V: 1}, + {Ref: 50, T: 90, V: 1}, + }, + } + wal := &memoryWAL{entries: entries} + + head, err := NewHead(nil, nil, wal, 1000) + testutil.Ok(t, err) + defer head.Close() + + testutil.Ok(t, head.ReadWAL()) + + testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) +} + func TestHeadDeleteSimple(t *testing.T) { numSamples := int64(10)