diff --git a/head_test.go b/head_test.go index 5531f9653..5383b2ba4 100644 --- a/head_test.go +++ b/head_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sort" "testing" "github.com/prometheus/tsdb/chunkenc" @@ -386,231 +387,203 @@ Outer: } } -// func TestDeleteUntilCurMax(t *testing.T) { -// numSamples := int64(10) +func TestDeleteUntilCurMax(t *testing.T) { + numSamples := int64(10) + hb, err := NewHead(nil, nil, nil, 1000000) + testutil.Ok(t, err) + app := hb.Appender() + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + _, err := app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) + // Test the series have been deleted. + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err := q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, !res.Next(), "series didn't get deleted") -// hb := createTestHead(t, dir, 0, 2*numSamples) -// app := hb.Appender() - -// smpls := make([]float64, numSamples) -// for i := int64(0); i < numSamples; i++ { -// smpls[i] = rand.Float64() -// app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) -// } - -// testutil.Ok(t, app.Commit()) -// testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) -// app = hb.Appender() -// _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) -// testutil.Ok(t, err) -// testutil.Ok(t, app.Commit()) - -// q := hb.Querier(0, 100000) -// res := q.Select(labels.NewEqualMatcher("a", "b")) - -// require.True(t, res.Next()) -// exps := res.At() -// it := exps.Iterator() -// ressmpls, err := expandSeriesIterator(it) -// testutil.Ok(t, err) -// testutil.Equals(t, []sample{{11, 1}}, ressmpls) -// } - -// func TestDelete_e2e(t *testing.T) { -// numDatapoints := 1000 -// numRanges := 1000 -// timeInterval := int64(2) -// maxTime := int64(2 * 1000) -// minTime := int64(200) -// // Create 8 series with 1000 data-points of different ranges, delete and run queries. -// lbls := [][]labels.Label{ -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// } - -// seriesMap := map[string][]sample{} -// for _, l := range lbls { -// seriesMap[labels.New(l...).String()] = []sample{} -// } - -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) - -// hb := createTestHead(t, dir, minTime, maxTime) -// app := hb.Appender() - -// for _, l := range lbls { -// ls := labels.New(l...) -// series := []sample{} - -// ts := rand.Int63n(300) -// for i := 0; i < numDatapoints; i++ { -// v := rand.Float64() -// if ts >= minTime && ts <= maxTime { -// series = append(series, sample{ts, v}) -// } - -// _, err := app.Add(ls, ts, v) -// if ts >= minTime && ts <= maxTime { -// testutil.Ok(t, err) -// } else { -// testutil.EqualsError(t, err, ErrOutOfBounds.Error()) -// } - -// ts += rand.Int63n(timeInterval) + 1 -// } - -// seriesMap[labels.New(l...).String()] = series -// } - -// testutil.Ok(t, app.Commit()) - -// // Delete a time-range from each-selector. -// dels := []struct { -// ms []labels.Matcher -// drange Intervals -// }{ -// { -// ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, -// drange: Intervals{{300, 500}, {600, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "b"), -// labels.NewEqualMatcher("job", "prom-k8s"), -// }, -// drange: Intervals{{300, 500}, {100, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "c"), -// labels.NewEqualMatcher("instance", "localhost:9090"), -// labels.NewEqualMatcher("job", "prometheus"), -// }, -// drange: Intervals{{300, 400}, {100, 6700}}, -// }, -// // TODO: Add Regexp Matchers. -// } - -// for _, del := range dels { -// // Reset the deletes everytime. -// writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) -// hb.tombstones = newEmptyTombstoneReader() - -// for _, r := range del.drange { -// testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) -// } - -// matched := labels.Slice{} -// for _, ls := range lbls { -// s := labels.Selector(del.ms) -// if s.Matches(ls) { -// matched = append(matched, ls) -// } -// } - -// sort.Sort(matched) - -// for i := 0; i < numRanges; i++ { -// mint := rand.Int63n(200) -// maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - -// q := hb.Querier(mint, maxt) -// ss := q.Select(del.ms...) - -// // Build the mockSeriesSet. -// matchedSeries := make([]Series, 0, len(matched)) -// for _, m := range matched { -// smpls := boundedSamples(seriesMap[m.String()], mint, maxt) -// smpls = deletedSamples(smpls, del.drange) - -// // Only append those series for which samples exist as mockSeriesSet -// // doesn't skip series with no samples. -// // TODO: But sometimes SeriesSet returns an empty SeriesIterator -// if len(smpls) > 0 { -// matchedSeries = append(matchedSeries, newSeries( -// m.Map(), -// smpls, -// )) -// } -// } -// expSs := newListSeriesSet(matchedSeries) - -// // Compare both SeriesSets. -// for { -// eok, rok := expSs.Next(), ss.Next() - -// // Skip a series if iterator is empty. -// if rok { -// for !ss.At().Iterator().Next() { -// rok = ss.Next() -// if !rok { -// break -// } -// } -// } -// testutil.Equals(t, eok, rok, "next") - -// if !eok { -// break -// } -// sexp := expSs.At() -// sres := ss.At() - -// testutil.Equals(t, sexp.Labels(), sres.Labels(), "labels") - -// smplExp, errExp := expandSeriesIterator(sexp.Iterator()) -// smplRes, errRes := expandSeriesIterator(sres.Iterator()) - -// testutil.Equals(t, errExp, errRes, "samples error") -// testutil.Equals(t, smplExp, smplRes, "samples") -// } -// } -// } - -// return -// } + // Add again and test for presence. + app = hb.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 11, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + q, err = NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err = q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, res.Next(), "series don't exist") + exps := res.At() + it := exps.Iterator() + ressmpls, err := expandSeriesIterator(it) + testutil.Ok(t, err) + testutil.Equals(t, []sample{{11, 1}}, ressmpls) +} +func TestDelete_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(2) + // Create 8 series with 1000 data-points of different ranges, delete and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + hb, err := NewHead(nil, nil, nil, 100000) + testutil.Ok(t, err) + app := hb.Appender() + for _, l := range lbls { + ls := labels.New(l...) + series := []sample{} + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + _, err := app.Add(ls, ts, v) + testutil.Ok(t, err) + series = append(series, sample{ts, v}) + ts += rand.Int63n(timeInterval) + 1 + } + seriesMap[labels.New(l...).String()] = series + } + testutil.Ok(t, app.Commit()) + // Delete a time-range from each-selector. + dels := []struct { + ms []labels.Matcher + drange Intervals + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + drange: Intervals{{300, 500}, {600, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + drange: Intervals{{300, 500}, {100, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + drange: Intervals{{300, 400}, {100, 6700}}, + }, + // TODO: Add Regexp Matchers. + } + for _, del := range dels { + // Reset the deletes everytime. + hb.tombstones = NewMemTombstones() + for _, r := range del.drange { + testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) + } + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(del.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + sort.Sort(matched) + for i := 0; i < numRanges; i++ { + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + defer q.Close() + ss, err := q.Select(del.ms...) + testutil.Ok(t, err) + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := seriesMap[m.String()] + smpls = deletedSamples(smpls, del.drange) + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + // Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + testutil.Equals(t, eok, rok) + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + testutil.Equals(t, sexp.Labels(), sres.Labels()) + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } + } + } + return +} func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 {