Merge pull request #277 from simonpasquier/delete-series-without-samples

Fix crash when a series has no block
This commit is contained in:
Goutham Veeramachaneni 2018-04-03 12:24:12 +05:30 committed by GitHub
commit 2f37e1eddc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 3 deletions

View file

@ -438,7 +438,7 @@ Outer:
for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
// Delete only until the current vlaues and not beyond.
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}}
continue Outer

15
head.go
View file

@ -587,8 +587,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
for p.Next() {
series := h.series.getByID(p.At())
t0, t1 := series.minTime(), series.maxTime()
if t0 == math.MinInt64 || t1 == math.MinInt64 {
continue
}
// Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
t0, t1 = clampInterval(mint, maxt, t0, t1)
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
}
@ -1106,11 +1110,18 @@ type memSeries struct {
}
func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 {
return math.MinInt64
}
return s.chunks[0].minTime
}
func (s *memSeries) maxTime() int64 {
return s.head().maxTime
c := s.head()
if c == nil {
return math.MinInt64
}
return c.maxTime
}
func (s *memSeries) cut(mint int64) *memChunk {

View file

@ -230,6 +230,31 @@ func TestMemSeries_truncateChunks(t *testing.T) {
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
}
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
entries := []interface{}{
[]RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
},
[]RefSample{},
[]RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "2")},
},
[]RefSample{
{Ref: 50, T: 80, V: 1},
{Ref: 50, T: 90, V: 1},
},
}
wal := &memoryWAL{entries: entries}
head, err := NewHead(nil, nil, wal, 1000)
testutil.Ok(t, err)
defer head.Close()
testutil.Ok(t, head.ReadWAL())
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}
func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10)