From 67871fd1f2b8f266d97b0c872afebb29740c727e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Sun, 4 Jul 2021 16:12:37 +0530 Subject: [PATCH] Support compaction of Head block for histograms (#9044) * Update querier.go to support Head compaction with histograms Signed-off-by: Ganesh Vernekar * Add test for Head compaction with histograms Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar --- tsdb/compact_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++ tsdb/head_test.go | 3 ++ tsdb/querier.go | 39 +++++++++++++++++++++----- 3 files changed, 101 insertions(+), 7 deletions(-) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e30f2b190f..d798f72f58 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -25,10 +25,12 @@ import ( "time" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -1311,3 +1313,67 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { }) } } + +func TestHeadCompactionWithHistograms(t *testing.T) { + head, _ := newTestHead(t, DefaultBlockDuration, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + + require.NoError(t, head.Init(0)) + app := head.Appender(context.Background()) + + type timedHist struct { + t int64 + h histogram.SparseHistogram + } + + // Ingest samples. + numHistograms := 120 * 4 + timeStep := DefaultBlockDuration / int64(numHistograms) + expHists := make([]timedHist, 0, numHistograms) + l := labels.Labels{{Name: "a", Value: "b"}} + for i, h := range generateHistograms(numHistograms) { + _, err := app.AppendHistogram(0, l, int64(i)*timeStep, h) + require.NoError(t, err) + expHists = append(expHists, timedHist{int64(i) * timeStep, h}) + } + require.NoError(t, app.Commit()) + + // Compaction. + mint := head.MinTime() + maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) + require.NoError(t, err) + id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) + require.NoError(t, err) + require.NotEqual(t, ulid.ULID{}, id) + + // Open the block and query it and check the histograms. + block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, block.Close()) + }) + + q, err := NewBlockQuerier(block, block.MinTime(), block.MaxTime()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, q.Close()) + }) + + ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator() + actHists := make([]timedHist, 0, len(expHists)) + for it.Next() { + t, h := it.AtHistogram() + actHists = append(actHists, timedHist{t, h.Copy()}) + } + + require.Equal(t, expHists, actHists) +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 09516494ad..b5dd45d321 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2205,6 +2205,9 @@ func TestAppendHistogram(t *testing.T) { q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, q.Close()) + }) ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) diff --git a/tsdb/querier.go b/tsdb/querier.go index d35fa49226..c9daf23842 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -666,8 +666,18 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { } // Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened. - newChunk := chunkenc.NewXORChunk() - app, err := newChunk.Appender() + var ( + newChunk chunkenc.Chunk + app chunkenc.Appender + err error + ) + if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { + newChunk = chunkenc.NewHistoChunk() + app, err = newChunk.Appender() + } else { + newChunk = chunkenc.NewXORChunk() + app, err = newChunk.Appender() + } if err != nil { p.err = err return false @@ -684,14 +694,29 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - t, v := p.currDelIter.At() - p.curr.MinTime = t - app.Append(t, v) - - for p.currDelIter.Next() { + var ( + t int64 + v float64 + h histogram.SparseHistogram + ) + if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { + t, h = p.currDelIter.AtHistogram() + p.curr.MinTime = t + app.AppendHistogram(t, h.Copy()) + for p.currDelIter.Next() { + t, h = p.currDelIter.AtHistogram() + app.AppendHistogram(t, h.Copy()) + } + } else { t, v = p.currDelIter.At() + p.curr.MinTime = t app.Append(t, v) + for p.currDelIter.Next() { + t, v = p.currDelIter.At() + app.Append(t, v) + } } + if err := p.currDelIter.Err(); err != nil { p.err = errors.Wrap(err, "iterate chunk while re-encoding") return false