From 758e29258ba77d158062413d5d1feec99e298cb5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Fri, 23 Sep 2022 14:01:10 +0530 Subject: [PATCH] Add/Improve unit tests for compaction with histogram Part 2 (#11343) Signed-off-by: Ganesh Vernekar Signed-off-by: Ganesh Vernekar --- tsdb/compact_test.go | 242 +++++++++++++------------------------------ 1 file changed, 71 insertions(+), 171 deletions(-) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index c86d8c4c4f..1bb63ae9a1 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/tsdb/tsdbutil" ) func TestSplitByRange(t *testing.T) { @@ -1298,29 +1299,77 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { func TestHeadCompactionWithHistograms(t *testing.T) { head, _ := newTestHead(t, DefaultBlockDuration, false) + require.NoError(t, head.Init(0)) t.Cleanup(func() { require.NoError(t, head.Close()) }) - require.NoError(t, head.Init(0)) - app := head.Appender(context.Background()) + minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } + ctx := context.Background() + appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + t.Helper() + app := head.Appender(ctx) + for tsMinute := from; tsMinute <= to; tsMinute++ { + _, err := app.AppendHistogram(0, lbls, minute(tsMinute), h) + require.NoError(t, err) + *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + } - type timedHistogram struct { - t int64 - h *histogram.Histogram + require.NoError(t, app.Commit()) + } + appendFloat := func(lbls labels.Labels, from, to int, exp *[]tsdbutil.Sample) { + t.Helper() + app := head.Appender(ctx) + for tsMinute := from; tsMinute <= to; tsMinute++ { + _, err := app.Append(0, lbls, minute(tsMinute), float64(tsMinute)) + require.NoError(t, err) + *exp = append(*exp, sample{t: minute(tsMinute), v: float64(tsMinute)}) + } + require.NoError(t, app.Commit()) } - // Ingest samples. - numHistograms := 120 * 4 - timeStep := DefaultBlockDuration / int64(numHistograms) - expHists := make([]timedHistogram, 0, numHistograms) - l := labels.Labels{{Name: "a", Value: "b"}} - for i, h := range GenerateTestHistograms(numHistograms) { - _, err := app.AppendHistogram(0, l, int64(i)*timeStep, h) - require.NoError(t, err) - expHists = append(expHists, timedHistogram{int64(i) * timeStep, h}) + var ( + series1 = labels.FromStrings("foo", "bar1") + series2 = labels.FromStrings("foo", "bar2") + series3 = labels.FromStrings("foo", "bar3") + series4 = labels.FromStrings("foo", "bar4") + exp1, exp2, exp3, exp4 []tsdbutil.Sample + ) + h := &histogram.Histogram{ + Count: 11, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 1}, + {Offset: 1, Length: 2}, + }, + NegativeBuckets: []int64{1, 2, -1}, } - require.NoError(t, app.Commit()) + + // Series with only histograms. + appendHistogram(series1, 100, 105, h, &exp1) + + // Series starting with float and then getting histograms. + appendFloat(series2, 100, 102, &exp2) + appendHistogram(series2, 103, 105, h.Copy(), &exp2) + appendFloat(series2, 106, 107, &exp2) + appendHistogram(series2, 108, 109, h.Copy(), &exp2) + + // Series starting with histogram and then getting float. + appendHistogram(series3, 101, 103, h.Copy(), &exp3) + appendFloat(series3, 104, 106, &exp3) + appendHistogram(series3, 107, 108, h.Copy(), &exp3) + appendFloat(series3, 109, 110, &exp3) + + // A float only series. + appendFloat(series4, 100, 102, &exp4) // Compaction. mint := head.MinTime() @@ -1340,25 +1389,14 @@ func TestHeadCompactionWithHistograms(t *testing.T) { q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime()) require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, q.Close()) - }) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - - require.True(t, ss.Next()) - s := ss.At() - require.False(t, ss.Next()) - - it := s.Iterator() - actHists := make([]timedHistogram, 0, len(expHists)) - for it.Next() == chunkenc.ValHistogram { - // TODO(beorn7): Test mixed series? - t, h := it.AtHistogram() - actHists = append(actHists, timedHistogram{t, h}) - } - - require.Equal(t, expHists, actHists) + actHists := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + require.Equal(t, map[string][]tsdbutil.Sample{ + series1.String(): exp1, + series2.String(): exp2, + series3.String(): exp3, + series4.String(): exp4, + }, actHists) } // Depending on numSeriesPerSchema, it can take few gigs of memory; @@ -1671,144 +1709,6 @@ func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, s return r } -func TestSparseHistogramCompactionAndQuery(t *testing.T) { - dir := t.TempDir() - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(dir)) - }) - opts := DefaultOptions() - opts.EnableNativeHistograms = true - // Exactly 3 times so that level 2 of compaction happens and tombstone - // deletion and compaction considers the level 2 blocks to be big enough. - opts.MaxBlockDuration = 3 * opts.MinBlockDuration - db, err := Open(dir, nil, nil, opts, nil) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, db.Close()) - }) - db.DisableCompactions() - - type timedHistogram struct { - t int64 - h *histogram.Histogram - } - expHists := make(map[string][]timedHistogram) - - series1Histograms := GenerateTestHistograms(20) - series2Histograms := GenerateTestHistograms(20) - idx1, idx2 := -1, -1 - addNextHists := func(ts int64, app storage.Appender) { - lbls1 := labels.Labels{{Name: "a", Value: "b"}} - lbls2 := labels.Labels{{Name: "a", Value: "c"}} - idx1++ - _, err := app.AppendHistogram(0, lbls1, ts, series1Histograms[idx1]) - require.NoError(t, err) - idx2++ - _, err = app.AppendHistogram(0, lbls2, ts, series2Histograms[idx2]) - require.NoError(t, err) - - l1, l2 := lbls1.String(), lbls2.String() - expHists[l1] = append(expHists[l1], timedHistogram{t: ts, h: series1Histograms[idx1]}) - expHists[l2] = append(expHists[l2], timedHistogram{t: ts, h: series2Histograms[idx2]}) - } - - testQuery := func() { - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) - require.NoError(t, err) - defer func() { - require.NoError(t, q.Close()) - }() - - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) - actHists := make(map[string][]timedHistogram) - for ss.Next() { - s := ss.At() - it := s.Iterator() - for it.Next() == chunkenc.ValHistogram { - ts, h := it.AtHistogram() - actHists[s.Labels().String()] = append(actHists[s.Labels().String()], timedHistogram{ts, h.Copy()}) - } - require.NoError(t, it.Err()) - } - require.NoError(t, ss.Err()) - require.Equal(t, expHists, actHists) - } - - // Add histograms to create 1 block via compaction. - app := db.Appender(context.Background()) - for ts := int64(0); ts <= 2*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { - addNextHists(ts, app) - } - require.NoError(t, app.Commit()) - testQuery() // Only the head block. - require.NoError(t, db.Compact()) - require.Equal(t, 1, len(db.Blocks())) - testQuery() // 1 persistent block and the head block. - - // Add histograms to create 2 more blocks via compaction. - app = db.Appender(context.Background()) - for ts := 5 * DefaultBlockDuration / 2; ts <= 4*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { - addNextHists(ts, app) - } - require.NoError(t, app.Commit()) - require.NoError(t, db.Compact()) - require.Equal(t, 3, len(db.Blocks())) - testQuery() // >1 persistent block (and the head block). - - // Another block triggers compaction of the first 3 blocks into 1 block. - app = db.Appender(context.Background()) - for ts := 9 * DefaultBlockDuration / 2; ts <= 5*DefaultBlockDuration; ts += DefaultBlockDuration / 2 { - addNextHists(ts, app) - } - require.NoError(t, app.Commit()) - require.NoError(t, db.Compact()) - require.Equal(t, 2, len(db.Blocks())) - testQuery() - - require.Equal(t, int64(0), db.blocks[0].MinTime()) - require.Equal(t, 3*DefaultBlockDuration, db.blocks[0].MaxTime()) - require.Equal(t, 3*DefaultBlockDuration, db.blocks[1].MinTime()) - require.Equal(t, 4*DefaultBlockDuration, db.blocks[1].MaxTime()) - - // Add tombstones to the first block to make sure that the deletion works for histograms on compaction. - delTime := 2 * DefaultBlockDuration - err = db.Delete(0, delTime, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) - require.NoError(t, err) - require.Equal(t, uint64(2), db.blocks[0].Meta().Stats.NumTombstones) - // Truncate expected histograms to test the query after deletion. - for k, v := range expHists { - oldCount := len(v) - for i := 0; i < len(v); i++ { - if v[i].t > delTime { - expHists[k] = expHists[k][i:] - break - } - } - require.Less(t, len(expHists[k]), oldCount) - require.Greater(t, len(expHists[k]), 0) - } - testQuery() // Query with tombstones on persistent block. - - oldULID := db.blocks[0].Meta().ULID - require.NoError(t, db.Compact()) - require.Equal(t, 2, len(db.Blocks())) - newULID := db.blocks[0].Meta().ULID - require.NotEqual(t, oldULID, newULID) - require.Equal(t, uint64(0), db.blocks[0].Meta().Stats.NumTombstones) - testQuery() - - // Adding tombstones to head and testing query for that. - // Last sample was ts=5*DefaultBlockDuration, so a tombstone just to cover that. - err = db.Delete((5*DefaultBlockDuration)-1, (5*DefaultBlockDuration)+1, labels.MustNewMatcher(labels.MatchRegexp, "a", ".*")) - require.NoError(t, err) - // Remove last sample from expected. - for k := range expHists { - expHists[k] = expHists[k][:len(expHists[k])-1] - require.Greater(t, len(expHists[k]), 0) - } - testQuery() -} - func TestCompactBlockMetas(t *testing.T) { parent1 := ulid.MustNew(100, nil) parent2 := ulid.MustNew(200, nil)