Dont store stones in head, delete samples directly

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2019-01-08 22:38:41 +05:30
parent 8d991bdc1e
commit d7e505db34
No known key found for this signature in database
GPG key ID: 0241A11211763456
4 changed files with 268 additions and 90 deletions

View file

@ -209,6 +209,34 @@ func TestDBAppenderAddRef(t *testing.T) {
func TestDeleteSimple(t *testing.T) { func TestDeleteSimple(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
},
{ // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}},
remaint: []int64{},
},
}
Outer:
for _, c := range cases {
db, close := openTestDB(t, nil) db, close := openTestDB(t, nil)
defer close() defer close()
defer db.Close() defer db.Close()
@ -222,18 +250,7 @@ func TestDeleteSimple(t *testing.T) {
} }
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Outer:
for _, c := range cases {
// TODO(gouthamve): Reset the tombstones somehow. // TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.intervals {
@ -256,9 +273,20 @@ Outer:
newSeries(map[string]string{"a": "b"}, expSamples), newSeries(map[string]string{"a": "b"}, expSamples),
}) })
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues("a")
testutil.Ok(t, err)
if len(expSamples) == 0 { if len(expSamples) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, res.Next() == false, "") testutil.Assert(t, res.Next() == false, "")
continue continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, "a", lns[0])
testutil.Equals(t, "b", lvs[0])
} }
for { for {

104
head.go
View file

@ -48,6 +48,10 @@ var (
// ErrOutOfBounds is returned if an appended sample is out of the // ErrOutOfBounds is returned if an appended sample is out of the
// writable time range. // writable time range.
ErrOutOfBounds = errors.New("out of bounds") ErrOutOfBounds = errors.New("out of bounds")
// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = newMemTombstones()
) )
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
@ -71,8 +75,6 @@ type Head struct {
values map[string]stringset // label names to possible values values map[string]stringset // label names to possible values
postings *index.MemPostings // postings lists for terms postings *index.MemPostings // postings lists for terms
tombstones *memTombstones
} }
type headMetrics struct { type headMetrics struct {
@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(), postings: index.NewUnorderedMemPostings(),
tombstones: newMemTombstones(),
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -338,8 +339,10 @@ func (h *Head) loadWAL(r *wal.Reader) error {
series []RefSeries series []RefSeries
samples []RefSample samples []RefSample
tstones []Stone tstones []Stone
allStones = newMemTombstones()
err error err error
) )
defer allStones.Close()
for r.Next() { for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0] series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record() rec := r.Record()
@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
if itv.Maxt < h.minValidTime { if itv.Maxt < h.minValidTime {
continue continue
} }
h.tombstones.addInterval(s.ref, itv) allStones.addInterval(s.ref, itv)
} }
} }
default: default:
@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
} }
wg.Wait() wg.Wait()
if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
return errors.Wrap(r.Err(), "deleting samples from tombstones")
}
if unknownRefs > 0 { if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
} }
@ -604,7 +613,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
} }
func (h *rangeHead) Tombstones() (TombstoneReader, error) { func (h *rangeHead) Tombstones() (TombstoneReader, error) {
return h.head.tombstones, nil return emptyTombstoneReader, nil
} }
// initAppender is a helper to initialize the time bounds of the head // initAppender is a helper to initialize the time bounds of the head
@ -849,7 +858,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
} }
var stones []Stone var stones []Stone
dirty := false
for p.Next() { for p.Next() {
series := h.series.getByID(p.At()) series := h.series.getByID(p.At())
@ -859,22 +868,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
} }
// Delete only until the current values and not beyond. // Delete only until the current values and not beyond.
t0, t1 = clampInterval(mint, maxt, t0, t1) t0, t1 = clampInterval(mint, maxt, t0, t1)
if h.wal != nil {
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
} }
if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil {
return errors.Wrap(err, "delete samples")
}
dirty = true
}
if p.Err() != nil { if p.Err() != nil {
return p.Err() return p.Err()
} }
var enc RecordEncoder var enc RecordEncoder
if h.wal != nil { if h.wal != nil {
// Although we don't store the stones in the head
// we need to write them to the WAL to mark these as deleted
// after a restart while loeading the WAL.
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err return err
} }
} }
for _, s := range stones { if dirty {
h.tombstones.addInterval(s.ref, s.intervals[0]) h.gc()
} }
return nil
}
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
// and removes the samples in the deleted ranges.
// Chunks is deleted if no samples are left at the end.
func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) {
if len(dranges) == 0 {
return nil
}
ms := h.series.getByID(ref)
ms.Lock()
defer ms.Unlock()
if len(ms.chunks) == 0 {
return nil
}
metas := ms.chunksMetas()
mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
it := newChunkSeriesIterator(metas, dranges, mint, maxt)
ms.reset()
for it.Next() {
t, v := it.At()
ok, _ := ms.append(t, v)
if !ok {
level.Warn(h.logger).Log("msg", "failed to add sample during delete")
}
}
return nil return nil
} }
@ -926,7 +974,7 @@ func (h *Head) gc() {
// Tombstones returns a new reader over the head's tombstones // Tombstones returns a new reader over the head's tombstones
func (h *Head) Tombstones() (TombstoneReader, error) { func (h *Head) Tombstones() (TombstoneReader, error) {
return h.tombstones, nil return emptyTombstoneReader, nil
} }
// Index returns an IndexReader against the block. // Index returns an IndexReader against the block.
@ -1406,6 +1454,16 @@ type memSeries struct {
app chunkenc.Appender // Current appender for the chunk. app chunkenc.Appender // Current appender for the chunk.
} }
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
}
return s
}
func (s *memSeries) minTime() int64 { func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 { if len(s.chunks) == 0 {
return math.MinInt64 return math.MinInt64
@ -1442,14 +1500,24 @@ func (s *memSeries) cut(mint int64) *memChunk {
return c return c
} }
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { func (s *memSeries) chunksMetas() []chunks.Meta {
s := &memSeries{ metas := make([]chunks.Meta, 0, len(s.chunks))
lset: lset, for _, chk := range s.chunks {
ref: id, metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
chunkRange: chunkRange,
nextAt: math.MinInt64,
} }
return s return metas
}
// reset re-initialises all the variable in the memSeries except 'lset', 'ref',
// and 'chunkRange', like how it would appear after 'newMemSeries(...)'.
func (s *memSeries) reset() {
s.chunks = nil
s.headChunk = nil
s.firstChunkID = 0
s.nextAt = math.MinInt64
s.sampleBuf = [4]sample{}
s.pendingCommit = false
s.app = nil
} }
// appendable checks whether the given sample is valid for appending to the series. // appendable checks whether the given sample is valid for appending to the series.

View file

@ -18,6 +18,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"testing" "testing"
@ -297,95 +298,168 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
func TestHeadDeleteSimple(t *testing.T) { func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
head, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(t, err)
defer head.Close()
app := head.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
}
testutil.Ok(t, app.Commit())
cases := []struct { cases := []struct {
intervals Intervals dranges Intervals
remaint []int64 remaint []int64
remainSampbuf []int64 // Sample buffer that should remain after deletion.
}{ }{
{ {
intervals: Intervals{{0, 3}}, dranges: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9}, remaint: []int64{4, 5, 6, 7, 8, 9},
remainSampbuf: []int64{6, 7, 8, 9},
}, },
{ {
intervals: Intervals{{1, 3}}, dranges: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9}, remaint: []int64{0, 4, 5, 6, 7, 8, 9},
remainSampbuf: []int64{6, 7, 8, 9},
}, },
{ {
intervals: Intervals{{1, 3}, {4, 7}}, dranges: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9}, remaint: []int64{0, 8, 9},
remainSampbuf: []int64{0, 8, 9},
}, },
{ {
intervals: Intervals{{1, 3}, {4, 700}}, dranges: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0}, remaint: []int64{0},
remainSampbuf: []int64{0},
}, },
{ { // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}}, dranges: Intervals{{0, 9}},
remaint: []int64{}, remaint: []int64{},
remainSampbuf: []int64{},
}, },
} }
Outer: Outer:
for _, c := range cases { for _, c := range cases {
// Reset the tombstones. dir, err := ioutil.TempDir("", "test_wal_reload")
head.tombstones = newMemTombstones() testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
// Samples are deleted from head after calling head.Delete()
// and not just creating tombstones.
// Hence creating new Head for every case.
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
app := head.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
_, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.dranges {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
} }
// Compare the result. reloadedW, err := wal.New(nil, nil, w.Dir())
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
testutil.Ok(t, err) testutil.Ok(t, err)
res, err := q.Select(labels.NewEqualMatcher("a", "b")) reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
// Test the head reloaded from the WAL to ensure deleted samples
// are gone even after reloading the wal file.
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0))
expSamples := make([]Sample, 0, len(c.remaint)) expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
expss := newMockSeriesSet([]Series{ // Compare the samples for both heads - before and after the reload.
newSeries(map[string]string{"a": "b"}, expSamples), for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
testutil.Ok(t, err)
// We use emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err)
// Getting the actual samples.
actSamples := make([]sample, 0, len(c.remaint))
if len(expSamples) > 0 {
testutil.Assert(t, css.Next() == true, "")
lbls, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls)
testutil.Equals(t, 0, len(intv))
chunkr, err := h.Chunks()
testutil.Ok(t, err)
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
testutil.Ok(t, err)
ii := chk.Iterator()
for ii.Next() {
t, v := ii.At()
actSamples = append(actSamples, sample{t: t, v: v})
}
}
}
testutil.Assert(t, css.Next() == false, "")
testutil.Ok(t, css.Err())
testutil.Equals(t, expSamples, actSamples)
}
expSamplesTemp := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]})
}
expSeriesSet := newMockSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamplesTemp),
}) })
// Compare the query results for both heads - before and after the reload.
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err)
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues("a")
testutil.Ok(t, err)
if len(expSamples) == 0 { if len(expSamples) == 0 {
testutil.Assert(t, res.Next() == false, "") testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, actSeriesSet.Next() == false, "")
testutil.Ok(t, h.Close())
continue continue
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, "a", lns[0])
testutil.Equals(t, "b", lvs[0])
} }
for { for {
eok, rok := expss.Next(), res.Next() eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
testutil.Equals(t, eok, rok) testutil.Equals(t, eok, rok)
if !eok { if !eok {
testutil.Ok(t, h.Close())
continue Outer continue Outer
} }
sexp := expss.At() expSeries := expSeriesSet.At()
sres := res.At() actSeries := actSeriesSet.At()
testutil.Equals(t, sexp.Labels(), sres.Labels()) testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator()) smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator()) smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
testutil.Equals(t, errExp, errRes) testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes) testutil.Equals(t, smplExp, smplRes)
} }
} }
} }
}
func TestDeleteUntilCurMax(t *testing.T) { func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
@ -523,8 +597,6 @@ func TestDelete_e2e(t *testing.T) {
// TODO: Add Regexp Matchers. // TODO: Add Regexp Matchers.
} }
for _, del := range dels { for _, del := range dels {
// Reset the deletes everytime.
hb.tombstones = newMemTombstones()
for _, r := range del.drange { for _, r := range del.drange {
testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
} }
@ -945,4 +1017,5 @@ func TestWalRepair(t *testing.T) {
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
}) })
} }
} }

View file

@ -66,6 +66,15 @@ func Equals(tb testing.TB, exp, act interface{}, msgAndArgs ...interface{}) {
} }
} }
// NotEquals fails the test if exp is equal to act.
func NotEquals(tb testing.TB, exp, act interface{}) {
if reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: Expected different exp and got\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
tb.FailNow()
}
}
func formatMessage(msgAndArgs []interface{}) string { func formatMessage(msgAndArgs []interface{}) string {
if len(msgAndArgs) == 0 { if len(msgAndArgs) == 0 {
return "" return ""