From eedb86783ef42daa16c9bf4f560cd1d9151191fc Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Mon, 16 Aug 2021 18:52:29 +0530 Subject: [PATCH] Fix queries on blocks for sparse histograms and add unit test (#9209) Signed-off-by: Ganesh Vernekar --- storage/merge.go | 39 +++++++++++++++++---- tsdb/compact_test.go | 80 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 109 insertions(+), 10 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index 1ff041c40..3cf4a8447 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -468,7 +468,11 @@ func (c *chainSampleIterator) Seek(t int64) bool { } if len(c.h) > 0 { c.curr = heap.Pop(&c.h).(chunkenc.Iterator) - c.lastt, _ = c.curr.At() + if c.curr.ChunkEncoding() == chunkenc.EncSHS { + c.lastt, _ = c.curr.AtHistogram() + } else { + c.lastt, _ = c.curr.At() + } return true } c.curr = nil @@ -516,7 +520,11 @@ func (c *chainSampleIterator) Next() bool { var currt int64 for { if c.curr.Next() { - currt, _ = c.curr.At() + if c.curr.ChunkEncoding() == chunkenc.EncSHS { + currt, _ = c.curr.AtHistogram() + } else { + currt, _ = c.curr.At() + } if currt == c.lastt { // Ignoring sample for the same timestamp. continue @@ -528,7 +536,13 @@ func (c *chainSampleIterator) Next() bool { } // Check current iterator with the top of the heap. - if nextt, _ := c.h[0].At(); currt < nextt { + var nextt int64 + if c.h[0].ChunkEncoding() == chunkenc.EncSHS { + nextt, _ = c.h[0].AtHistogram() + } else { + nextt, _ = c.h[0].At() + } + if currt < nextt { // Current iterator has smaller timestamp than the heap. break } @@ -541,7 +555,11 @@ func (c *chainSampleIterator) Next() bool { } c.curr = heap.Pop(&c.h).(chunkenc.Iterator) - currt, _ = c.curr.At() + if c.curr.ChunkEncoding() == chunkenc.EncSHS { + currt, _ = c.curr.AtHistogram() + } else { + currt, _ = c.curr.At() + } if currt != c.lastt { break } @@ -565,8 +583,17 @@ func (h samplesIteratorHeap) Len() int { return len(h) } func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h samplesIteratorHeap) Less(i, j int) bool { - at, _ := h[i].At() - bt, _ := h[j].At() + var at, bt int64 + if h[i].ChunkEncoding() == chunkenc.EncSHS { + at, _ = h[i].AtHistogram() + } else { + at, _ = h[i].At() + } + if h[j].ChunkEncoding() == chunkenc.EncSHS { + bt, _ = h[j].AtHistogram() + } else { + bt, _ = h[j].At() + } return at < bt } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index a73acb683..02c9bd342 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1692,7 +1692,7 @@ func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, s return r } -func TestSparseHistogramCompaction(t *testing.T) { +func TestSparseHistogramCompactionAndQuery(t *testing.T) { dir, err := ioutil.TempDir("", "test") require.NoError(t, err) t.Cleanup(func() { @@ -1709,6 +1709,12 @@ func TestSparseHistogramCompaction(t *testing.T) { }) db.DisableCompactions() + type timedHist struct { + t int64 + h histogram.SparseHistogram + } + expHists := make(map[string][]timedHist) + series1Histograms := generateHistograms(20) series2Histograms := generateHistograms(20) idx1, idx2 := -1, -1 @@ -1721,16 +1727,54 @@ func TestSparseHistogramCompaction(t *testing.T) { idx2++ _, err = app.AppendHistogram(0, lbls2, ts, series2Histograms[idx2]) require.NoError(t, err) + + l1, l2 := lbls1.String(), lbls2.String() + expHists[l1] = append(expHists[l1], timedHist{t: ts, h: series1Histograms[idx1]}) + expHists[l2] = append(expHists[l2], timedHist{t: ts, h: series2Histograms[idx2]}) } - // Add histograms to create 3 blocks via compaction. + testQuery := func() { + q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer func() { + require.NoError(t, q.Close()) + }() + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) + actHists := make(map[string][]timedHist) + for ss.Next() { + s := ss.At() + it := s.Iterator() + for it.Next() { + ts, h := it.AtHistogram() + actHists[s.Labels().String()] = append(actHists[s.Labels().String()], timedHist{ts, h.Copy()}) + } + require.NoError(t, it.Err()) + } + require.NoError(t, ss.Err()) + require.Equal(t, expHists, actHists) + } + + // Add histograms to create 1 block via compaction. app := db.Appender(context.Background()) - for ts := int64(0); ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { + for ts := int64(0); ts <= 2*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { + addNextHists(ts, app) + } + require.NoError(t, app.Commit()) + testQuery() // Only the head block. + require.NoError(t, db.Compact()) + require.Equal(t, 1, len(db.Blocks())) + testQuery() // 1 persistent block and the head block. + + // Add histograms to create 2 more blocks via compaction. + app = db.Appender(context.Background()) + for ts := 5 * DefaultBlockDuration / 2; ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { addNextHists(ts, app) } require.NoError(t, app.Commit()) require.NoError(t, db.Compact()) require.Equal(t, 3, len(db.Blocks())) + testQuery() // >1 persistent block (and the head block). // Another block triggers compaction of the first 3 blocks into 1 block. app = db.Appender(context.Background()) @@ -1740,6 +1784,7 @@ func TestSparseHistogramCompaction(t *testing.T) { require.NoError(t, app.Commit()) require.NoError(t, db.Compact()) require.Equal(t, 2, len(db.Blocks())) + testQuery() require.Equal(t, int64(0), db.blocks[0].MinTime()) require.Equal(t, 3*DefaultBlockDuration, db.blocks[0].MaxTime()) @@ -1747,13 +1792,40 @@ func TestSparseHistogramCompaction(t *testing.T) { require.Equal(t, 4*DefaultBlockDuration, db.blocks[1].MaxTime()) // Add tombstones to the first block to make sure that the deletion works for histograms on compaction. - err = db.Delete(0, 2*DefaultBlockDuration, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) + delTime := 2 * DefaultBlockDuration + err = db.Delete(0, delTime, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) require.NoError(t, err) require.Equal(t, uint64(2), db.blocks[0].Meta().Stats.NumTombstones) + // Truncate expected histograms to test the query after deletion. + for k, v := range expHists { + oldCount := len(v) + for i := 0; i < len(v); i++ { + if v[i].t > delTime { + expHists[k] = expHists[k][i:] + break + } + } + require.Less(t, len(expHists[k]), oldCount) + require.Greater(t, len(expHists[k]), 0) + } + testQuery() // Query with tombstones on persistent block. oldULID := db.blocks[0].Meta().ULID require.NoError(t, db.Compact()) + require.Equal(t, 2, len(db.Blocks())) newULID := db.blocks[0].Meta().ULID require.NotEqual(t, oldULID, newULID) require.Equal(t, uint64(0), db.blocks[0].Meta().Stats.NumTombstones) + testQuery() + + // Adding tombstones to head and testing query for that. + // Last sample was ts=5*DefaultBlockDuration, so a tombstone just to cover that. + err = db.Delete((5*DefaultBlockDuration)-1, (5*DefaultBlockDuration)+1, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) + require.NoError(t, err) + // Remove last sample from expected. + for k := range expHists { + expHists[k] = expHists[k][:len(expHists[k])-1] + require.Greater(t, len(expHists[k]), 0) + } + testQuery() }