From 79305e704ba8827907ef9cbe04e72af96ff52a30 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Date: Thu, 8 Jul 2021 11:01:53 +0530
Subject: [PATCH] Compare block sizes with sparse histograms (#9045)

Signed-off-by: Ganesh Vernekar
---
 tsdb/compact_test.go | 313 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 313 insertions(+)

diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index d798f72f58..a6d6a7c54e 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -18,9 +18,11 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math"
+	"math/rand"
 	"os"
 	"path"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
@@ -1377,3 +1379,314 @@ func TestHeadCompactionWithHistograms(t *testing.T) {

 	require.Equal(t, expHists, actHists)
 }

// Depending on numSeriesPerSchema, this test can take a few GiB of memory;
// it appends all samples and commits only once at the end, instead of
// committing in smaller batches, to make it run faster.
func TestSparseHistoSpaceSavings(t *testing.T) {
	t.Skip()

	cases := []struct {
		numSeriesPerSchema int
		numBuckets         int
		numSpans           int
		gapBetweenSpans    int
	}{
		{1, 15, 1, 0},
		{1, 50, 1, 0},
		{1, 100, 1, 0},
		{1, 15, 3, 5},
		{1, 50, 3, 3},
		{1, 100, 3, 2},
		{100, 15, 1, 0},
		{100, 50, 1, 0},
		{100, 100, 1, 0},
		{100, 15, 3, 5},
		{100, 50, 3, 3},
		{100, 100, 3, 2},
		//{1000, 15, 1, 0},
		//{1000, 50, 1, 0},
		//{1000, 100, 1, 0},
		//{1000, 15, 3, 5},
		//{1000, 50, 3, 3},
		//{1000, 100, 3, 2},
	}

	type testSummary struct {
		oldBlockTotalSeries int
		oldBlockIndexSize   int64
		oldBlockChunksSize  int64
		oldBlockTotalSize   int64

		sparseBlockTotalSeries int
		sparseBlockIndexSize   int64
		sparseBlockChunksSize  int64
		sparseBlockTotalSize   int64

		numBuckets      int
		numSpans        int
		gapBetweenSpans int
	}

	var summaries []testSummary

	allSchemas := []int{-4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
	schemaDescription := []string{"minus_4", "minus_3", "minus_2", "minus_1", "0", "1", "2", "3", "4", "5", "6", "7", "8"}
	numHistograms := 120 * 4 // 480 samples at a 15s scrape interval fill the 2h block exactly.
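	// The comparison below ingests the same data twice: once as sparse
	// histograms (one series per histogram) into sparseHead, and once in
	// the expanded classic form into oldHead, where every histogram turns
	// into one series per cumulative "le" bucket plus a "_count" and a
	// "_sum" series, i.e. roughly numBuckets+2 classic series per sparse
	// series.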
+ timeStep := DefaultBlockDuration / int64(numHistograms) + for _, c := range cases { + t.Run( + fmt.Sprintf("series=%d,span=%d,gap=%d,buckets=%d", + len(allSchemas)*c.numSeriesPerSchema, + c.numSpans, + c.gapBetweenSpans, + c.numBuckets, + ), + func(t *testing.T) { + oldHead, _ := newTestHead(t, DefaultBlockDuration, false) + t.Cleanup(func() { + require.NoError(t, oldHead.Close()) + }) + sparseHead, _ := newTestHead(t, DefaultBlockDuration, false) + t.Cleanup(func() { + require.NoError(t, sparseHead.Close()) + }) + + var allSparseSeries []struct { + baseLabels labels.Labels + hists []histogram.SparseHistogram + } + + for sid, schema := range allSchemas { + for i := 0; i < c.numSeriesPerSchema; i++ { + lbls := labels.Labels{ + {Name: "__name__", Value: fmt.Sprintf("rpc_durations_%d_histogram_seconds", i)}, + {Name: "instance", Value: "localhost:8080"}, + {Name: "job", Value: fmt.Sprintf("sparse_histogram_schema_%s", schemaDescription[sid])}, + } + allSparseSeries = append(allSparseSeries, struct { + baseLabels labels.Labels + hists []histogram.SparseHistogram + }{baseLabels: lbls, hists: generateCustomHistograms(numHistograms, c.numBuckets, c.numSpans, c.gapBetweenSpans, schema)}) + } + } + + oldApp := oldHead.Appender(context.Background()) + sparseApp := sparseHead.Appender(context.Background()) + numOldSeriesPerHistogram := 0 + + var oldULID ulid.ULID + var sparseULID ulid.ULID + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + // Ingest sparse histograms. + for _, ah := range allSparseSeries { + var ( + ref uint64 + err error + ) + for i := 0; i < numHistograms; i++ { + ts := int64(i) * timeStep + ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i]) + require.NoError(t, err) + } + } + require.NoError(t, sparseApp.Commit()) + + // Sparse head compaction. + mint := sparseHead.MinTime() + maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) + require.NoError(t, err) + sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) + require.NoError(t, err) + require.NotEqual(t, ulid.ULID{}, sparseULID) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + // Ingest histograms the old way. + for _, ah := range allSparseSeries { + refs := make([]uint64, c.numBuckets+((c.numSpans-1)*c.gapBetweenSpans)) + for i := 0; i < numHistograms; i++ { + ts := int64(i) * timeStep + + h := ah.hists[i] + + numOldSeriesPerHistogram = 0 + it := histogram.CumulativeExpandSparseHistogram(h) + itIdx := 0 + var err error + for it.Next() { + numOldSeriesPerHistogram++ + b := it.At() + lbls := append(ah.baseLabels, labels.Label{Name: "le", Value: fmt.Sprintf("%.16f", b.Le)}) + refs[itIdx], err = oldApp.Append(refs[itIdx], lbls, ts, float64(b.Count)) + require.NoError(t, err) + itIdx++ + } + require.NoError(t, it.Err()) + // _count metric. + countLbls := ah.baseLabels.Copy() + countLbls[0].Value = countLbls[0].Value + "_count" + _, err = oldApp.Append(0, countLbls, ts, float64(h.Count)) + require.NoError(t, err) + numOldSeriesPerHistogram++ + + // _sum metric. + sumLbls := ah.baseLabels.Copy() + sumLbls[0].Value = sumLbls[0].Value + "_sum" + _, err = oldApp.Append(0, sumLbls, ts, h.Sum) + require.NoError(t, err) + numOldSeriesPerHistogram++ + } + } + + require.NoError(t, oldApp.Commit()) + + // Old head compaction. 
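				// As with the sparse head above, this writes exactly one
				// block covering the same half-open time range, so the two
				// block directories measured below are directly comparable.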
				mint := oldHead.MinTime()
				maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
				compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
				require.NoError(t, err)
				oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
				require.NoError(t, err)
				require.NotEqual(t, ulid.ULID{}, oldULID)
			}()

			wg.Wait()

			oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULID.String())
			sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULID.String())

			oldSize, err := fileutil.DirSize(oldBlockDir)
			require.NoError(t, err)
			oldIndexSize, err := fileutil.DirSize(filepath.Join(oldBlockDir, "index"))
			require.NoError(t, err)
			oldChunksSize, err := fileutil.DirSize(filepath.Join(oldBlockDir, "chunks"))
			require.NoError(t, err)

			sparseSize, err := fileutil.DirSize(sparseBlockDir)
			require.NoError(t, err)
			sparseIndexSize, err := fileutil.DirSize(filepath.Join(sparseBlockDir, "index"))
			require.NoError(t, err)
			sparseChunksSize, err := fileutil.DirSize(filepath.Join(sparseBlockDir, "chunks"))
			require.NoError(t, err)

			summaries = append(summaries, testSummary{
				oldBlockTotalSeries:    len(allSchemas) * c.numSeriesPerSchema * numOldSeriesPerHistogram,
				oldBlockIndexSize:      oldIndexSize,
				oldBlockChunksSize:     oldChunksSize,
				oldBlockTotalSize:      oldSize,
				sparseBlockTotalSeries: len(allSchemas) * c.numSeriesPerSchema,
				sparseBlockIndexSize:   sparseIndexSize,
				sparseBlockChunksSize:  sparseChunksSize,
				sparseBlockTotalSize:   sparseSize,
				numBuckets:             c.numBuckets,
				numSpans:               c.numSpans,
				gapBetweenSpans:        c.gapBetweenSpans,
			})
		})
	}

	for _, s := range summaries {
		fmt.Printf(`
Meta: NumBuckets=%d, NumSpans=%d, GapBetweenSpans=%d
Old Block: NumSeries=%d, IndexSize=%d, ChunksSize=%d, TotalSize=%d
Sparse Block: NumSeries=%d, IndexSize=%d, ChunksSize=%d, TotalSize=%d
Savings: Index=%.2f%%, Chunks=%.2f%%, Total=%.2f%%
`,
			s.numBuckets, s.numSpans, s.gapBetweenSpans,
			s.oldBlockTotalSeries, s.oldBlockIndexSize, s.oldBlockChunksSize, s.oldBlockTotalSize,
			s.sparseBlockTotalSeries, s.sparseBlockIndexSize, s.sparseBlockChunksSize, s.sparseBlockTotalSize,
			100*(1-float64(s.sparseBlockIndexSize)/float64(s.oldBlockIndexSize)),
			100*(1-float64(s.sparseBlockChunksSize)/float64(s.oldBlockChunksSize)),
			100*(1-float64(s.sparseBlockTotalSize)/float64(s.oldBlockTotalSize)),
		)
	}
}

func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, schema int) (r []histogram.SparseHistogram) {
	// First histogram with all the settings.
	h := histogram.SparseHistogram{
		Sum:    1000 * rand.Float64(),
		Schema: int32(schema),
	}

	// Generate spans.
	h.PositiveSpans = []histogram.Span{
		{Offset: int32(rand.Intn(10)), Length: uint32(numBuckets)},
	}
	if numSpans > 1 {
		spanWidth := numBuckets / numSpans
		// The first span gets the leftover buckets when numBuckets is not
		// evenly divisible by numSpans.
		h.PositiveSpans[0].Length = uint32(spanWidth + (numBuckets - spanWidth*numSpans))
		for i := 0; i < numSpans-1; i++ {
			h.PositiveSpans = append(h.PositiveSpans, histogram.Span{Offset: int32(rand.Intn(gapBetweenSpans) + 1), Length: uint32(spanWidth)})
		}
	}

	// Generate buckets.
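	// PositiveBuckets is delta encoded: the first entry below is an
	// absolute count, and every later entry stores the difference from
	// the previous bucket's count.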
	v := int64(rand.Intn(30) + 1)
	h.PositiveBuckets = []int64{v}
	count := v
	firstHistValues := []int64{v}
	for i := 0; i < numBuckets-1; i++ {
		delta := int64(rand.Intn(20))
		if rand.Int()%2 == 0 && firstHistValues[len(firstHistValues)-1] > delta {
			// Randomly make the delta negative, but only when the
			// resulting bucket value stays > 0.
			delta = -delta
		}

		currVal := firstHistValues[len(firstHistValues)-1] + delta
		count += currVal
		firstHistValues = append(firstHistValues, currVal)

		h.PositiveBuckets = append(h.PositiveBuckets, delta)
	}

	h.Count = uint64(count)

	r = append(r, h)

	// The remaining histograms keep the same spans but mutate the bucket values.
	for j := 0; j < numHists-1; j++ {
		newH := h.Copy()
		newH.Sum = float64(j+1) * 1000 * rand.Float64()

		// Generate buckets.
		count := int64(0)
		currVal := int64(0)
		for i := range newH.PositiveBuckets {
			delta := int64(rand.Intn(10))
			if i == 0 {
				newH.PositiveBuckets[i] += delta
				currVal = newH.PositiveBuckets[i]
				count += currVal // Include the first bucket in the total count.
				continue
			}
			currVal += newH.PositiveBuckets[i]
			if rand.Int()%2 == 0 && (currVal-delta) > firstHistValues[i] {
				// Randomly make the delta negative, but only when the bucket
				// value stays > 0 and above the first histogram's value for
				// this bucket, since we are not simulating counter resets here.
				delta = -delta
			}
			newH.PositiveBuckets[i] += delta
			currVal += delta
			count += currVal
		}

		newH.Count = uint64(count)

		r = append(r, newH)
		h = newH
	}

	return r
}