From 7bbbd55aada925efb3d2f925e2ade3f9fc6ebc48 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 3 May 2017 22:45:28 +0530 Subject: [PATCH 1/5] Fix bug where having one chunk can cause panic When we have only one chunk that is out of range, then we are returning it unpopulated (w/o calling `Chunk(ref)`). This would cause a panic downstream. Fixes: prometheus/prometheus#2629 Signed-off-by: Goutham Veeramachaneni --- querier.go | 4 +++- querier_test.go | 28 ++++++++++++++++++++++------ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/querier.go b/querier.go index ca4ca5738..a0276b230 100644 --- a/querier.go +++ b/querier.go @@ -420,7 +420,7 @@ func (s *populatedChunkSeries) Next() bool { continue } if c.MinTime > s.maxt { - chks = chks[from+1 : i] + chks = chks[:i] break } c.Chunk, s.err = s.chunks.Chunk(c.Ref) @@ -428,6 +428,8 @@ func (s *populatedChunkSeries) Next() bool { return false } } + + chks = chks[from+1:] if len(chks) == 0 { continue } diff --git a/querier_test.go b/querier_test.go index 73359926c..864e76cbb 100644 --- a/querier_test.go +++ b/querier_test.go @@ -41,6 +41,12 @@ type mockSeries struct { iterator func() SeriesIterator } +func newSeries(l map[string]string, s []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } +} func (m *mockSeries) Labels() labels.Labels { return m.labels() } func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } @@ -81,12 +87,6 @@ func (it *listSeriesIterator) Err() error { } func TestMergedSeriesSet(t *testing.T) { - newSeries := func(l map[string]string, s []sample) Series { - return &mockSeries{ - labels: func() labels.Labels { return labels.FromMap(l) }, - iterator: func() SeriesIterator { return newListSeriesIterator(s) }, - } - } cases := []struct { // The input sets in order (samples in series in b are strictly @@ -885,6 +885,22 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { p.maxt = 9 require.False(t, p.Next()) + // Test the case where 1 chunk could cause an unpopulated chunk to be returned. + chunkMetas = [][]*ChunkMeta{ + { + {MinTime: 1, MaxTime: 2, Ref: 1}, + }, + } + + m = &mockChunkSeriesSet{l: lbls, cm: chunkMetas, i: -1} + p = &populatedChunkSeries{ + set: m, + chunks: cr, + + mint: 10, + maxt: 15, + } + require.False(t, p.Next()) return } From 0908b0d27e49dfe4ae61fae57837f16a30446de6 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 3 May 2017 22:50:34 +0530 Subject: [PATCH 2/5] Add an end-to-end test for headBlock Signed-off-by: Goutham Veeramachaneni --- head_test.go | 218 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) diff --git a/head_test.go b/head_test.go index 63f6da6f7..ea40ab811 100644 --- a/head_test.go +++ b/head_test.go @@ -16,7 +16,9 @@ package tsdb import ( "io/ioutil" "math" + "math/rand" "os" + "sort" "testing" "unsafe" @@ -161,3 +163,219 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { t.Fatalf("Expected error amending sample, got: %s", err) } } + +func TestHeadBlock_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(3) + // Create 8 series with 1000 data-points of different ranges and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + require.NoError(t, err) + app := hb.Appender() + + for _, l := range lbls { + ls := labels.New(l...) + series := seriesMap[labels.New(l...).String()] + + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + series = append(series, sample{ts, v}) + + _, err := app.Add(ls, ts, v) + require.NoError(t, err) + + ts += rand.Int63n(timeInterval) + 1 + } + + seriesMap[labels.New(l...).String()] = series + } + + require.NoError(t, app.Commit()) + + // Query each selector on 1000 random time-ranges. + queries := []struct { + ms []labels.Matcher + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + }, + // TODO: Add Regexp Matchers. + } + + for _, qry := range queries { + matched := []labels.Labels{} + Outer: + for _, ls := range lbls { + for _, m := range qry.ms { + if !matchLSet(m, ls) { + continue Outer + } + } + + matched = append(matched, ls) + } + + sort.Slice(matched, func(i, j int) bool { + return labels.Compare(matched[i], matched[j]) < 0 + }) + + for i := 0; i < numRanges; i++ { + mint := rand.Int63n(300) + maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) + + q := hb.Querier(mint, maxt) + ss := q.Select(qry.ms...) + + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := boundedSamples(seriesMap[m.String()], mint, maxt) + + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + + // HACK: Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + + require.Equal(t, eok, rok, "next") + + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } + } + + return +} + +func boundedSamples(full []sample, mint, maxt int64) []sample { + start, end := -1, -1 + for i, s := range full { + if s.t >= mint { + start = i + break + } + } + if start == -1 { + start = len(full) + } + + for i, s := range full[start:] { + if s.t > maxt { + end = start + i + break + } + } + + if end == -1 { + end = len(full) + } + + return full[start:end] +} + +func matchLSet(m labels.Matcher, ls labels.Labels) bool { + for _, l := range ls { + if m.Name() == l.Name && m.Matches(l.Value) { + return true + } + } + + return false +} From 5d2e72269bfd4bbb8c5c3cbfdb08962b39cac2f1 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 5 May 2017 19:22:07 +0530 Subject: [PATCH 3/5] Simplfied loops and functions Signed-off-by: Goutham Veeramachaneni --- head_test.go | 59 +++++++++++++++------------------------------------- 1 file changed, 17 insertions(+), 42 deletions(-) diff --git a/head_test.go b/head_test.go index ea40ab811..540081ace 100644 --- a/head_test.go +++ b/head_test.go @@ -226,7 +226,7 @@ func TestHeadBlock_e2e(t *testing.T) { for _, l := range lbls { ls := labels.New(l...) - series := seriesMap[labels.New(l...).String()] + series := []sample{} ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { @@ -268,21 +268,15 @@ func TestHeadBlock_e2e(t *testing.T) { } for _, qry := range queries { - matched := []labels.Labels{} - Outer: + matched := labels.Slice{} for _, ls := range lbls { - for _, m := range qry.ms { - if !matchLSet(m, ls) { - continue Outer - } + s := labels.Selector(qry.ms) + if s.Matches(ls) { + matched = append(matched, ls) } - - matched = append(matched, ls) } - sort.Slice(matched, func(i, j int) bool { - return labels.Compare(matched[i], matched[j]) < 0 - }) + sort.Sort(matched) for i := 0; i < numRanges; i++ { mint := rand.Int63n(300) @@ -312,7 +306,7 @@ func TestHeadBlock_e2e(t *testing.T) { for { eok, rok := expSs.Next(), ss.Next() - // HACK: Skip a series if iterator is empty. + // Skip a series if iterator is empty. if rok { for !ss.At().Iterator().Next() { rok = ss.Next() @@ -345,37 +339,18 @@ func TestHeadBlock_e2e(t *testing.T) { } func boundedSamples(full []sample, mint, maxt int64) []sample { - start, end := -1, -1 + for len(full) > 0 { + if full[0].t >= mint { + break + } + full = full[1:] + } for i, s := range full { - if s.t >= mint { - start = i - break - } - } - if start == -1 { - start = len(full) - } - - for i, s := range full[start:] { + // Terminate on the first sample larger than maxt. if s.t > maxt { - end = start + i - break + return full[:i] } } - - if end == -1 { - end = len(full) - } - - return full[start:end] -} - -func matchLSet(m labels.Matcher, ls labels.Labels) bool { - for _, l := range ls { - if m.Name() == l.Name && m.Matches(l.Value) { - return true - } - } - - return false + // maxt is after highest sample. + return full } From c1939b713602b6752150d3fd1b974f022baeec58 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 5 May 2017 19:34:59 +0530 Subject: [PATCH 4/5] Simply loop away from using tracking variables. Signed-off-by: Goutham Veeramachaneni --- querier.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/querier.go b/querier.go index a0276b230..97970e830 100644 --- a/querier.go +++ b/querier.go @@ -413,12 +413,15 @@ func (s *populatedChunkSeries) Next() bool { for s.set.Next() { lset, chks := s.set.At() - from := -1 - for i, c := range chks { - if c.MaxTime < s.mint { - from = i - continue + for len(chks) > 0 { + if chks[0].MaxTime >= s.mint { + break } + chks = chks[1:] + } + + // Break out at the first chunk that has no overlap with mint, maxt. + for i, c := range chks { if c.MinTime > s.maxt { chks = chks[:i] break @@ -429,7 +432,6 @@ func (s *populatedChunkSeries) Next() bool { } } - chks = chks[from+1:] if len(chks) == 0 { continue } From 8096d11e4e8157fc38660e193b97b912deec04f6 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 5 May 2017 19:52:11 +0530 Subject: [PATCH 5/5] Add bounds check to headBlockAppender Signed-off-by: Goutham Veeramachaneni --- head.go | 4 ++++ head_test.go | 14 +++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/head.go b/head.go index b350af799..6b8adebf6 100644 --- a/head.go +++ b/head.go @@ -296,6 +296,10 @@ type refdSample struct { } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + if !a.inBounds(t) { + return 0, ErrOutOfBounds + } + hash := lset.Hash() if ms := a.get(hash, lset); ms != nil { diff --git a/head_test.go b/head_test.go index 540081ace..32960ffcb 100644 --- a/head_test.go +++ b/head_test.go @@ -168,6 +168,8 @@ func TestHeadBlock_e2e(t *testing.T) { numDatapoints := 1000 numRanges := 1000 timeInterval := int64(3) + maxTime := int64(2 * 1000) + minTime := int64(200) // Create 8 series with 1000 data-points of different ranges and run queries. lbls := [][]labels.Label{ { @@ -220,7 +222,7 @@ func TestHeadBlock_e2e(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) require.NoError(t, err) app := hb.Appender() @@ -231,10 +233,16 @@ func TestHeadBlock_e2e(t *testing.T) { ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { v := rand.Float64() - series = append(series, sample{ts, v}) + if ts >= minTime && ts <= maxTime { + series = append(series, sample{ts, v}) + } _, err := app.Add(ls, ts, v) - require.NoError(t, err) + if ts >= minTime && ts <= maxTime { + require.NoError(t, err) + } else { + require.Error(t, ErrOutOfBounds, err) + } ts += rand.Int63n(timeInterval) + 1 }