From d6bd64357b18aca15e3f3c32a107d58d4429b313 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 19 May 2017 22:54:29 +0530 Subject: [PATCH] Fix Delete on HeadBlock Signed-off-by: Goutham Veeramachaneni --- head.go | 12 +- head_test.go | 305 ++++++++++++++++++++++++++++++++++++++++++++++++ querier.go | 2 +- querier_test.go | 4 +- tombstones.go | 7 +- 5 files changed, 318 insertions(+), 12 deletions(-) diff --git a/head.go b/head.go index e4899cb68..84434753d 100644 --- a/head.go +++ b/head.go @@ -98,7 +98,7 @@ func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { } // Write an empty tombstones file. - if err := writeTombstoneFile(tmp, emptyTombstoneReader); err != nil { + if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { return err } @@ -120,7 +120,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, - tombstones: emptyTombstoneReader, + tombstones: newEmptyTombstoneReader(), } return h, h.init() } @@ -235,9 +235,6 @@ func (h *HeadBlock) Tombstones() TombstoneReader { // Delete implements headBlock. func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { - h.mtx.Lock() // We are modifying the tombstones here. - defer h.mtx.Unlock() - ir := h.Index() pr := newPostingsReader(ir) @@ -260,7 +257,8 @@ Outer: return p.Err() } - return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones.stones)) + h.tombstones = newMapTombstoneReader(h.tombstones.stones) + return writeTombstoneFile(h.dir, h.tombstones.Copy()) } // Querier implements Queryable and headBlock. @@ -280,7 +278,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { maxt: maxt, index: h.Index(), chunks: h.Chunks(), - tombstones: h.Tombstones(), + tombstones: h.Tombstones().Copy(), postingsMapper: func(p Postings) Postings { ep := make([]uint32, 0, 64) diff --git a/head_test.go b/head_test.go index aa9138060..36f6a49df 100644 --- a/head_test.go +++ b/head_test.go @@ -36,6 +36,10 @@ func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock err := TouchHeadBlock(dir, 0, mint, maxt) require.NoError(t, err) + return openTestHeadBlock(t, dir) +} + +func openTestHeadBlock(t testing.TB, dir string) *HeadBlock { wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) require.NoError(t, err) @@ -378,6 +382,291 @@ func TestHeadBlock_e2e(t *testing.T) { return } +func TestDelete_simple(t *testing.T) { + numSamples := int64(10) + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, 0, numSamples) + app := hb.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + cases := []struct { + dranges []trange + remaint []int64 + }{ + { + dranges: []trange{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + }, + { + dranges: []trange{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + }, + { + dranges: []trange{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + }, + { + dranges: []trange{{1, 3}, {4, 700}}, + remaint: []int64{0}, + }, + { + dranges: []trange{{0, 9}}, + remaint: []int64{}, + }, + } + +Outer: + for _, c := range cases { + // Reset the tombstones. + writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) + hb.tombstones = newEmptyTombstoneReader() + + // Delete the ranges. + for _, r := range c.dranges { + require.NoError(t, hb.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b"))) + } + + // Compare the result. + q := hb.Querier(0, numSamples) + res := q.Select(labels.NewEqualMatcher("a", "b")) + + expSamples := make([]sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts]}) + } + + expss := newListSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamples), + }) + + if len(expSamples) == 0 { + require.False(t, res.Next()) + continue + } + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } +} + +func TestDelete_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(2) + maxTime := int64(2 * 1000) + minTime := int64(200) + // Create 8 series with 1000 data-points of different ranges, delete and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, minTime, maxTime) + app := hb.Appender() + + for _, l := range lbls { + ls := labels.New(l...) + series := []sample{} + + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + if ts >= minTime && ts <= maxTime { + series = append(series, sample{ts, v}) + } + + _, err := app.Add(ls, ts, v) + if ts >= minTime && ts <= maxTime { + require.NoError(t, err) + } else { + require.Error(t, ErrOutOfBounds, err) + } + + ts += rand.Int63n(timeInterval) + 1 + } + + seriesMap[labels.New(l...).String()] = series + } + + require.NoError(t, app.Commit()) + + // Delete a time-range from each-selector. + dels := []struct { + ms []labels.Matcher + drange []trange + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + drange: []trange{{300, 500}, {600, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + drange: []trange{{300, 500}, {100, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + drange: []trange{{300, 400}, {100, 6700}}, + }, + // TODO: Add Regexp Matchers. + } + + for _, del := range dels { + // Reset the deletes everytime. + writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) + hb.tombstones = newEmptyTombstoneReader() + + for _, r := range del.drange { + require.NoError(t, hb.Delete(r.mint, r.maxt, del.ms...)) + } + + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(del.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + + sort.Sort(matched) + + for i := 0; i < numRanges; i++ { + mint := rand.Int63n(200) + maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) + + q := hb.Querier(mint, maxt) + ss := q.Select(del.ms...) + + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := boundedSamples(seriesMap[m.String()], mint, maxt) + smpls = deletedSamples(smpls, del.drange) + + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + + // Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + require.Equal(t, eok, rok, "next") + + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } + } + + return +} + func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { if full[0].t >= mint { @@ -394,3 +683,19 @@ func boundedSamples(full []sample, mint, maxt int64) []sample { // maxt is after highest sample. return full } + +func deletedSamples(full []sample, dranges []trange) []sample { + ds := make([]sample, 0, len(full)) +Outer: + for _, s := range full { + for _, r := range dranges { + if r.inBounds(s.t) { + continue Outer + } + } + + ds = append(ds, s) + } + + return ds +} diff --git a/querier.go b/querier.go index a2f0159fb..c67091cc6 100644 --- a/querier.go +++ b/querier.go @@ -449,7 +449,7 @@ type populatedChunkSeries struct { } func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, stone) { - return s.lset, s.chks, stone{} + return s.lset, s.chks, s.stone } func (s *populatedChunkSeries) Err() error { return s.err } diff --git a/querier_test.go b/querier_test.go index 9502b5b4f..c10536afc 100644 --- a/querier_test.go +++ b/querier_test.go @@ -380,7 +380,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: emptyTombstoneReader, + tombstones: newEmptyTombstoneReader(), mint: c.mint, maxt: c.maxt, @@ -490,7 +490,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: newListPostings(tc.postings), index: mi, - tombstones: emptyTombstoneReader, + tombstones: newEmptyTombstoneReader(), } i := 0 diff --git a/tombstones.go b/tombstones.go index 02c3ad1ec..d8182c706 100644 --- a/tombstones.go +++ b/tombstones.go @@ -107,8 +107,6 @@ type TombstoneReader interface { Err() error } -var emptyTombstoneReader = newMapTombstoneReader(make(map[uint32][]trange)) - type tombstoneReader struct { stones []byte idx int @@ -223,10 +221,15 @@ func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader { for k := range ts { refs = append(refs, k) } + sort.Sort(uint32slice(refs)) return &mapTombstoneReader{stones: ts, refs: refs} } +func newEmptyTombstoneReader() *mapTombstoneReader { + return &mapTombstoneReader{stones: make(map[uint32][]trange)} +} + func (t *mapTombstoneReader) Next() bool { if len(t.refs) > 0 { t.cur = t.refs[0]