Bring back tombstones to Head block (#6542)

* Bring back tombstones to Head block

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add test cases

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Cleanup

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-01-20 21:08:00 +05:30 committed by GitHub
parent 2b2eb79e8b
commit 21a5cf5d1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 165 deletions

View file

@ -316,22 +316,6 @@ Outer:
newSeries(map[string]string{"a": "b"}, expSamples), newSeries(map[string]string{"a": "b"}, expSamples),
}) })
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues("a")
testutil.Ok(t, err)
if len(expSamples) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, res.Next() == false, "")
continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, "a", lns[0])
testutil.Equals(t, "b", lvs[0])
}
for { for {
eok, rok := expss.Next(), res.Next() eok, rok := expss.Next(), res.Next()
testutil.Equals(t, eok, rok) testutil.Equals(t, eok, rok)

View file

@ -56,10 +56,6 @@ var (
// ErrInvalidSample is returned if an appended sample is not valid and can't // ErrInvalidSample is returned if an appended sample is not valid and can't
// be ingested. // be ingested.
ErrInvalidSample = errors.New("invalid sample") ErrInvalidSample = errors.New("invalid sample")
// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = tombstones.NewMemTombstones()
) )
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
@ -91,6 +87,8 @@ type Head struct {
postings *index.MemPostings // postings lists for terms postings *index.MemPostings // postings lists for terms
tombstones *tombstones.MemTombstones
cardinalityMutex sync.Mutex cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec
lastPostingsStatsCall time.Duration // last posting stats call (PostingsCardinalityStats()) time for caching lastPostingsStatsCall time.Duration // last posting stats call (PostingsCardinalityStats()) time for caching
@ -276,6 +274,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(), postings: index.NewUnorderedMemPostings(),
tombstones: tombstones.NewMemTombstones(),
deleted: map[uint64]int{}, deleted: map[uint64]int{},
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -392,14 +391,8 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
var ( var (
dec record.Decoder dec record.Decoder
allStones = tombstones.NewMemTombstones()
shards = make([][]record.RefSample, n) shards = make([][]record.RefSample, n)
) )
defer func() {
if err := allStones.Close(); err != nil {
level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
}
}()
var ( var (
decoded = make(chan interface{}, 10) decoded = make(chan interface{}, 10)
@ -532,7 +525,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
unknownRefs++ unknownRefs++
continue continue
} }
allStones.AddInterval(s.Ref, itv) h.tombstones.AddInterval(s.Ref, itv)
} }
} }
//lint:ignore SA6002 relax staticcheck verification. //lint:ignore SA6002 relax staticcheck verification.
@ -560,12 +553,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
return errors.Wrap(r.Err(), "read records") return errors.Wrap(r.Err(), "read records")
} }
if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
return errors.Wrap(r.Err(), "deleting samples from tombstones")
}
if unknownRefs > 0 { if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
} }
@ -772,7 +759,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
} }
func (h *rangeHead) Tombstones() (tombstones.Reader, error) { func (h *rangeHead) Tombstones() (tombstones.Reader, error) {
return emptyTombstoneReader, nil return h.head.tombstones, nil
} }
func (h *rangeHead) MinTime() int64 { func (h *rangeHead) MinTime() int64 {
@ -1066,7 +1053,6 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
} }
var stones []tombstones.Stone var stones []tombstones.Stone
dirty := false
for p.Next() { for p.Next() {
series := h.series.getByID(p.At()) series := h.series.getByID(p.At())
@ -1076,59 +1062,19 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
} }
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
t0, t1 = clampInterval(mint, maxt, t0, t1) t0, t1 = clampInterval(mint, maxt, t0, t1)
if h.wal != nil {
stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}}) stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
} }
if err := h.chunkRewrite(p.At(), tombstones.Intervals{{Mint: t0, Maxt: t1}}); err != nil {
return errors.Wrap(err, "delete samples")
}
dirty = true
}
if p.Err() != nil { if p.Err() != nil {
return p.Err() return p.Err()
} }
var enc record.Encoder
if h.wal != nil { if h.wal != nil {
// Although we don't store the stones in the head var enc record.Encoder
// we need to write them to the WAL to mark these as deleted
// after a restart while loading the WAL.
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err return err
} }
} }
if dirty { for _, s := range stones {
h.gc() h.tombstones.AddInterval(s.Ref, s.Intervals[0])
}
return nil
}
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
// and removes the samples in the deleted ranges.
// Chunks is deleted if no samples are left at the end.
func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) {
if len(dranges) == 0 {
return nil
}
ms := h.series.getByID(ref)
ms.Lock()
defer ms.Unlock()
if len(ms.chunks) == 0 {
return nil
}
metas := ms.chunksMetas()
mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
it := newChunkSeriesIterator(metas, dranges, mint, maxt)
ms.reset()
for it.Next() {
t, v := it.At()
ok, _ := ms.append(t, v)
if !ok {
level.Warn(h.logger).Log("msg", "failed to add sample during delete")
}
} }
return nil return nil
@ -1199,7 +1145,7 @@ func (h *Head) gc() {
// Tombstones returns a new reader over the head's tombstones // Tombstones returns a new reader over the head's tombstones
func (h *Head) Tombstones() (tombstones.Reader, error) { func (h *Head) Tombstones() (tombstones.Reader, error) {
return emptyTombstoneReader, nil return h.tombstones, nil
} }
// Index returns an IndexReader against the block. // Index returns an IndexReader against the block.
@ -1748,26 +1694,6 @@ func (s *memSeries) cut(mint int64) *memChunk {
return c return c
} }
func (s *memSeries) chunksMetas() []chunks.Meta {
metas := make([]chunks.Meta, 0, len(s.chunks))
for _, chk := range s.chunks {
metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
}
return metas
}
// reset re-initialises all the variable in the memSeries except 'lset', 'ref',
// and 'chunkRange', like how it would appear after 'newMemSeries(...)'.
func (s *memSeries) reset() {
s.chunks = nil
s.headChunk = nil
s.firstChunkID = 0
s.nextAt = math.MinInt64
s.sampleBuf = [4]sample{}
s.pendingCommit = false
s.app = nil
}
// appendable checks whether the given sample is valid for appending to the series. // appendable checks whether the given sample is valid for appending to the series.
func (s *memSeries) appendable(t int64, v float64) error { func (s *memSeries) appendable(t int64, v float64) error {
c := s.head() c := s.head()

View file

@ -455,6 +455,7 @@ func TestHeadDeleteSimple(t *testing.T) {
cases := []struct { cases := []struct {
dranges tombstones.Intervals dranges tombstones.Intervals
addSamples []sample // Samples to add after delete.
smplsExp []sample smplsExp []sample
}{ }{
{ {
@ -477,6 +478,18 @@ func TestHeadDeleteSimple(t *testing.T) {
dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}}, dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}},
smplsExp: buildSmpls([]int64{}), smplsExp: buildSmpls([]int64{}),
}, },
{
dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}},
addSamples: buildSmpls([]int64{11, 13, 15}),
smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9, 11, 13, 15}),
},
{
// After delete, the appended samples in the deleted range should be visible
// as the tombstones are clamped to head min/max time.
dranges: tombstones.Intervals{{Mint: 7, Maxt: 20}},
addSamples: buildSmpls([]int64{11, 13, 15}),
smplsExp: buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 11, 13, 15}),
},
} }
for _, compress := range []bool{false, true} { for _, compress := range []bool{false, true} {
@ -510,6 +523,15 @@ func TestHeadDeleteSimple(t *testing.T) {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)))
} }
// Add more samples.
app = head.Appender()
for _, smpl := range c.addSamples {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Compare the samples for both heads - before and after the reload. // Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err) testutil.Ok(t, err)
@ -518,37 +540,6 @@ func TestHeadDeleteSimple(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer reloadedHead.Close() defer reloadedHead.Close()
testutil.Ok(t, reloadedHead.Init(0)) testutil.Ok(t, reloadedHead.Init(0))
for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
testutil.Ok(t, err)
// Use an emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
// Getting the actual samples.
actSamples := make([]sample, 0)
for css.Next() {
lblsAct, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
testutil.Equals(t, 0, len(intv))
chunkr, err := h.Chunks()
testutil.Ok(t, err)
var ii chunkenc.Iterator
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
testutil.Ok(t, err)
ii = chk.Iterator(ii)
for ii.Next() {
t, v := ii.At()
actSamples = append(actSamples, sample{t: t, v: v})
}
}
}
testutil.Ok(t, css.Err())
testutil.Equals(t, c.smplsExp, actSamples)
}
// Compare the query results for both heads - before and after the reload. // Compare the query results for both heads - before and after the reload.
expSeriesSet := newMockSeriesSet([]Series{ expSeriesSet := newMockSeriesSet([]Series{
@ -567,24 +558,6 @@ func TestHeadDeleteSimple(t *testing.T) {
actSeriesSet, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) actSeriesSet, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err) testutil.Ok(t, err)
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues(lblDefault.Name)
testutil.Ok(t, err)
// When all samples are deleted we expect that no labels should exist either.
if len(c.smplsExp) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, actSeriesSet.Next() == false, "")
testutil.Ok(t, h.Close())
continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, lblDefault.Name, lns[0])
testutil.Equals(t, lblDefault.Value, lvs[0])
}
for { for {
eok, rok := expSeriesSet.Next(), actSeriesSet.Next() eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
testutil.Equals(t, eok, rok) testutil.Equals(t, eok, rok)
@ -625,12 +598,15 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Ok(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) testutil.Ok(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
// Test the series have been deleted. // Test the series returns no samples. The series is cleared only after compaction.
q, err := NewBlockQuerier(hb, 0, 100000) q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err) testutil.Ok(t, err)
res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, !res.Next(), "series didn't get deleted") testutil.Assert(t, res.Next(), "series is not present")
s := res.At()
it := s.Iterator()
testutil.Assert(t, !it.Next(), "expected no samples")
// Add again and test for presence. // Add again and test for presence.
app = hb.Appender() app = hb.Appender()
@ -643,7 +619,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, res.Next(), "series don't exist") testutil.Assert(t, res.Next(), "series don't exist")
exps := res.At() exps := res.At()
it := exps.Iterator() it = exps.Iterator()
resSamples, err := expandSeriesIterator(it) resSamples, err := expandSeriesIterator(it)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples) testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples)

View file

@ -199,18 +199,18 @@ func ReadTombstones(dir string) (Reader, int64, error) {
return stonesMap, int64(len(b)), nil return stonesMap, int64(len(b)), nil
} }
type memTombstones struct { type MemTombstones struct {
intvlGroups map[uint64]Intervals intvlGroups map[uint64]Intervals
mtx sync.RWMutex mtx sync.RWMutex
} }
// NewMemTombstones creates new in memory Tombstone Reader // NewMemTombstones creates new in memory Tombstone Reader
// that allows adding new intervals. // that allows adding new intervals.
func NewMemTombstones() *memTombstones { func NewMemTombstones() *MemTombstones {
return &memTombstones{intvlGroups: make(map[uint64]Intervals)} return &MemTombstones{intvlGroups: make(map[uint64]Intervals)}
} }
func NewTestMemTombstones(intervals []Intervals) *memTombstones { func NewTestMemTombstones(intervals []Intervals) *MemTombstones {
ret := NewMemTombstones() ret := NewMemTombstones()
for i, intervalsGroup := range intervals { for i, intervalsGroup := range intervals {
for _, interval := range intervalsGroup { for _, interval := range intervalsGroup {
@ -220,13 +220,13 @@ func NewTestMemTombstones(intervals []Intervals) *memTombstones {
return ret return ret
} }
func (t *memTombstones) Get(ref uint64) (Intervals, error) { func (t *MemTombstones) Get(ref uint64) (Intervals, error) {
t.mtx.RLock() t.mtx.RLock()
defer t.mtx.RUnlock() defer t.mtx.RUnlock()
return t.intvlGroups[ref], nil return t.intvlGroups[ref], nil
} }
func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { func (t *MemTombstones) Iter(f func(uint64, Intervals) error) error {
t.mtx.RLock() t.mtx.RLock()
defer t.mtx.RUnlock() defer t.mtx.RUnlock()
for ref, ivs := range t.intvlGroups { for ref, ivs := range t.intvlGroups {
@ -237,7 +237,7 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error {
return nil return nil
} }
func (t *memTombstones) Total() uint64 { func (t *MemTombstones) Total() uint64 {
t.mtx.RLock() t.mtx.RLock()
defer t.mtx.RUnlock() defer t.mtx.RUnlock()
@ -249,7 +249,7 @@ func (t *memTombstones) Total() uint64 {
} }
// AddInterval to an existing memTombstones. // AddInterval to an existing memTombstones.
func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) { func (t *MemTombstones) AddInterval(ref uint64, itvs ...Interval) {
t.mtx.Lock() t.mtx.Lock()
defer t.mtx.Unlock() defer t.mtx.Unlock()
for _, itv := range itvs { for _, itv := range itvs {
@ -257,7 +257,7 @@ func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) {
} }
} }
func (*memTombstones) Close() error { func (*MemTombstones) Close() error {
return nil return nil
} }