From 85e6686f84c4e3bfed820047a03ae355292b04b1 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 13 Oct 2021 15:56:50 +0530 Subject: [PATCH 1/3] Add unit test for counter reset header Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histogram.go | 22 ++++++-- tsdb/head.go | 24 ++++---- tsdb/head_append.go | 14 ++--- tsdb/head_test.go | 112 +++++++++++++++++++++++++++++++++++-- 4 files changed, 142 insertions(+), 30 deletions(-) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index ad0dbe9f3..c0b849056 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -82,9 +82,15 @@ func (c *HistogramChunk) Meta() ( type CounterResetHeader byte const ( - CounterReset CounterResetHeader = 0b10000000 - NotCounterReset CounterResetHeader = 0b01000000 - GaugeType CounterResetHeader = 0b11000000 + // CounterReset means there was definitely a counter reset that resulted in this chunk. + CounterReset CounterResetHeader = 0b10000000 + // NotCounterReset means there was definitely no counter reset when cutting this chunk. + NotCounterReset CounterResetHeader = 0b01000000 + // GaugeType means the histograms represent a gauge instead of counters, hence we cannot make + // sense of counter reset in this case. + GaugeType CounterResetHeader = 0b11000000 + // UnknownCounterReset means we cannot say if this was a counter reset or not and not sure + // if this is a gauge type histogram or not. UnknownCounterReset CounterResetHeader = 0b00000000 ) @@ -400,7 +406,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { writeHistogramChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) a.schema = h.Schema a.zThreshold = h.ZeroThreshold - a.pSpans, a.nSpans = h.PositiveSpans, h.NegativeSpans + + a.pSpans = make([]histogram.Span, len(h.PositiveSpans)) + copy(a.pSpans, h.PositiveSpans) + a.nSpans = make([]histogram.Span, len(h.NegativeSpans)) + copy(a.nSpans, h.NegativeSpans) + numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) a.pBuckets = make([]int64, numPBuckets) a.nBuckets = make([]int64, numNBuckets) @@ -486,7 +497,8 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { a.cntDelta = cntDelta a.zCntDelta = zCntDelta - a.pBuckets, a.nBuckets = h.PositiveBuckets, h.NegativeBuckets + copy(a.pBuckets, h.PositiveBuckets) + copy(a.nBuckets, h.NegativeBuckets) // Note that the bucket deltas were already updated above. a.sum = h.Sum } diff --git a/tsdb/head.go b/tsdb/head.go index fffee9f70..eac11bf22 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -273,8 +273,8 @@ type headMetrics struct { // Sparse histogram metrics for experiments. // TODO: remove these in the final version. - sparseHistogramSamplesTotal prometheus.Counter - sparseHistogramSeries prometheus.Gauge + histogramSamplesTotal prometheus.Counter + histogramSeries prometheus.Gauge } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -373,13 +373,13 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_snapshot_replay_error_total", Help: "Total number snapshot replays that failed.", }), - sparseHistogramSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_sparse_histogram_samples_total", - Help: "Total number of sparse histograms samples added.", + histogramSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_histogram_samples_total", + Help: "Total number of histograms samples added.", }), - sparseHistogramSeries: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_sparse_histogram_series", - Help: "Number of sparse histogram series currently present in the head block.", + histogramSeries: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_histogram_series", + Help: "Number of histogram series currently present in the head block.", }), } @@ -408,8 +408,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.checkpointCreationTotal, m.mmapChunkCorruptionTotal, m.snapshotReplayErrorTotal, - m.sparseHistogramSamplesTotal, - m.sparseHistogramSeries, + m.histogramSamplesTotal, + m.histogramSeries, // Metrics bound to functions and not needed in tests // can be created and registered on the spot. prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -619,7 +619,7 @@ func (h *Head) Init(minValidTime int64) error { } } } - h.metrics.sparseHistogramSeries.Set(float64(sparseHistogramSeries)) + h.metrics.histogramSeries.Set(float64(sparseHistogramSeries)) } walReplayDuration := time.Since(start) @@ -1145,7 +1145,7 @@ func (h *Head) gc() int64 { h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) - h.metrics.sparseHistogramSeries.Sub(float64(sparseHistogramSeriesDeleted)) + h.metrics.histogramSeries.Sub(float64(sparseHistogramSeriesDeleted)) h.numSeries.Sub(uint64(seriesRemoved)) // Remove deleted series IDs from the postings lists. diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 7364d1d94..6ff989f5e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -398,7 +398,7 @@ func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64, } s.histogramSeries = true if created { - a.head.metrics.sparseHistogramSeries.Inc() + a.head.metrics.histogramSeries.Inc() a.series = append(a.series, record.RefSeries{ Ref: s.ref, Labels: lset, @@ -561,7 +561,7 @@ func (a *headAppender) Commit() (err error) { series.Unlock() if ok { - a.head.metrics.sparseHistogramSamplesTotal.Inc() + a.head.metrics.histogramSamplesTotal.Inc() } else { total-- a.head.metrics.outOfOrderSamples.Inc() @@ -606,7 +606,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(t int64, h histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper chunk reference afterwards. // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, // we need to know if there was also a counter reset or not to set the meta properly. @@ -616,7 +616,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui okToAppend, counterReset bool ) if app != nil { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(sh) + positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) } c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper) @@ -636,7 +636,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. - chunk, app := app.Recode(positiveInterjections, negativeInterjections, sh.PositiveSpans, sh.NegativeSpans) + chunk, app := app.Recode(positiveInterjections, negativeInterjections, h.PositiveSpans, h.NegativeSpans) s.headChunk = &memChunk{ minTime: s.headChunk.minTime, maxTime: s.headChunk.maxTime, @@ -657,7 +657,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui hc.SetCounterResetHeader(header) } - s.app.AppendHistogram(t, sh) + s.app.AppendHistogram(t, h) s.histogramSeries = true c.maxTime = t @@ -665,7 +665,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui s.histogramBuf[0] = s.histogramBuf[1] s.histogramBuf[1] = s.histogramBuf[2] s.histogramBuf[2] = s.histogramBuf[3] - s.histogramBuf[3] = histogramSample{t: t, h: sh} + s.histogramBuf[3] = histogramSample{t: t, h: h} if appendID > 0 { s.txs.add(appendID) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 852e3d847..a34d23e25 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2950,7 +2950,7 @@ func TestSnapshotError(t *testing.T) { require.Equal(t, 0, len(tm)) } -func TestSparseHistogramMetrics(t *testing.T) { +func TestHistogramMetrics(t *testing.T) { head, _ := newTestHead(t, 1000, false) t.Cleanup(func() { require.NoError(t, head.Close()) @@ -2971,8 +2971,8 @@ func TestSparseHistogramMetrics(t *testing.T) { } } - require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) - require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) + require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.histogramSeries)) + require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.histogramSamplesTotal)) require.NoError(t, head.Close()) w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) @@ -2981,11 +2981,11 @@ func TestSparseHistogramMetrics(t *testing.T) { require.NoError(t, err) require.NoError(t, head.Init(0)) - require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) - require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) // Counter reset. + require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.histogramSeries)) + require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.histogramSamplesTotal)) // Counter reset. } -func TestSparseHistogramStaleSample(t *testing.T) { +func TestHistogramStaleSample(t *testing.T) { l := labels.Labels{{Name: "a", Value: "b"}} numHistograms := 20 head, _ := newTestHead(t, 100000, false) @@ -3078,3 +3078,103 @@ func TestSparseHistogramStaleSample(t *testing.T) { require.Equal(t, 1, len(s.mmappedChunks)) testQuery(2) } + +func TestHistogramCounterResetHeader(t *testing.T) { + l := labels.Labels{{Name: "a", Value: "b"}} + head, _ := newTestHead(t, 1000, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + ts := int64(0) + appendHistogram := func(h histogram.Histogram) { + ts++ + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, ts, h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + var expHeaders []chunkenc.CounterResetHeader + checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) { + expHeaders = append(expHeaders, newHeaders...) + + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk. + + for i, mmapChunk := range ms.mmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + + h := generateHistograms(1)[0] + if len(h.NegativeBuckets) == 0 { + h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) + h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) + } + h.PositiveBuckets[0] = 100 + h.NegativeBuckets[0] = 100 + + // First histogram is UnknownCounterReset. + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Another normal histogram. + h.Count++ + appendHistogram(h) + checkExpCounterResetHeader() + + // Counter reset via Count. + h.Count-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Add 2 non-counter reset histograms. + for i := 0; i < 250; i++ { + appendHistogram(h) + } + checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) + + // Changing schema will cut a new chunk with unknown counter reset. + h.Schema++ + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Changing schema will zero threshold a new chunk with unknown counter reset. + h.ZeroThreshold += 0.01 + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Counter reset by removing a positive bucket. + h.PositiveSpans[1].Length-- + h.PositiveBuckets = h.PositiveBuckets[1:] + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Counter reset by removing a negative bucket. + h.NegativeSpans[1].Length-- + h.NegativeBuckets = h.NegativeBuckets[1:] + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between. + for i := 0; i < 250; i++ { + appendHistogram(h) + } + checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset) + + // Counter reset with counter reset in a positive bucket. + h.PositiveBuckets[len(h.PositiveBuckets)-1]-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) + + // Counter reset with counter reset in a negative bucket. + h.NegativeBuckets[len(h.NegativeBuckets)-1]-- + appendHistogram(h) + checkExpCounterResetHeader(chunkenc.CounterReset) +} From 4e206c7c774a9b55d7b1c73ff10e721f51fc3f38 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 13 Oct 2021 20:23:31 +0530 Subject: [PATCH 2/3] Fix reviews Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histogram.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index c0b849056..fb5f3194a 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -86,11 +86,10 @@ const ( CounterReset CounterResetHeader = 0b10000000 // NotCounterReset means there was definitely no counter reset when cutting this chunk. NotCounterReset CounterResetHeader = 0b01000000 - // GaugeType means the histograms represent a gauge instead of counters, hence we cannot make - // sense of counter reset in this case. + // GaugeType means this chunk contains a gauge histogram, where counter resets do not happen. GaugeType CounterResetHeader = 0b11000000 - // UnknownCounterReset means we cannot say if this was a counter reset or not and not sure - // if this is a gauge type histogram or not. + // UnknownCounterReset means we cannot say if this chunk was created due to a counter reset or not. + // An explicit counter reset detection needs to happen during query time. UnknownCounterReset CounterResetHeader = 0b00000000 ) From dcaf568279983c4aff32c7836e263c40ddd9a4b8 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 13 Oct 2021 20:27:48 +0530 Subject: [PATCH 3/3] Metadata -> Layout renaming Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histogram.go | 27 +++++++++++++-------------- tsdb/chunkenc/histogram_meta.go | 21 ++++++++------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index fb5f3194a..9e58af1ab 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -64,18 +64,18 @@ func (c *HistogramChunk) NumSamples() int { return int(binary.BigEndian.Uint16(c.Bytes())) } -// Meta returns the histogram metadata. Only call this on chunks that have at +// Layout returns the histogram layout. Only call this on chunks that have at // least one sample. -func (c *HistogramChunk) Meta() ( +func (c *HistogramChunk) Layout() ( schema int32, zeroThreshold float64, negativeSpans, positiveSpans []histogram.Span, err error, ) { if c.NumSamples() == 0 { - panic("HistoChunk.Meta() called on an empty chunk") + panic("HistoChunk.Layout() called on an empty chunk") } b := newBReader(c.Bytes()[2:]) - return readHistogramChunkMeta(&b) + return readHistogramChunkLayout(&b) } // CounterResetHeader defines the first 2 bits of the chunk header. @@ -176,9 +176,9 @@ func newHistogramIterator(b []byte) *histogramIterator { numTotal: binary.BigEndian.Uint16(b), t: math.MinInt64, } - // The first 2 bytes contain chunk headers. + // The first 3 bytes contain chunk headers. // We skip that for actual samples. - _, _ = it.br.readBits(16) + _, _ = it.br.readBits(24) return it } @@ -203,7 +203,7 @@ func (c *HistogramChunk) Iterator(it Iterator) Iterator { type HistogramAppender struct { b *bstream - // Metadata: + // Layout: schema int32 zThreshold float64 pSpans, nSpans []histogram.Span @@ -394,15 +394,15 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { if value.IsStaleNaN(h.Sum) { // Emptying out other fields to write no buckets, and an empty - // meta in case of first histogram in the chunk. + // layout in case of first histogram in the chunk. h = histogram.Histogram{Sum: h.Sum} } switch num { case 0: - // The first append gets the privilege to dictate the metadata + // The first append gets the privilege to dictate the layout // but it's also responsible for encoding it into the chunk! - writeHistogramChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) + writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) a.schema = h.Schema a.zThreshold = h.ZeroThreshold @@ -591,7 +591,7 @@ type histogramIterator struct { numTotal uint16 numRead uint16 - // Metadata: + // Layout: schema int32 zThreshold float64 pSpans, nSpans []histogram.Span @@ -687,11 +687,10 @@ func (it *histogramIterator) Next() bool { } if it.numRead == 0 { - - // The first read is responsible for reading the chunk metadata + // The first read is responsible for reading the chunk layout // and for initializing fields that depend on it. We give // counter reset info at chunk level, hence we discard it here. - schema, zeroThreshold, posSpans, negSpans, err := readHistogramChunkMeta(&it.br) + schema, zeroThreshold, posSpans, negSpans, err := readHistogramChunkLayout(&it.br) if err != nil { it.err = err return false diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index ac4badee1..cc692006a 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -17,14 +17,14 @@ import ( "github.com/prometheus/prometheus/model/histogram" ) -func writeHistogramChunkMeta(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) { +func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) { putVarbitInt(b, int64(schema)) putVarbitFloat(b, zeroThreshold) - putHistogramChunkMetaSpans(b, positiveSpans) - putHistogramChunkMetaSpans(b, negativeSpans) + putHistogramChunkLayoutSpans(b, positiveSpans) + putHistogramChunkLayoutSpans(b, negativeSpans) } -func putHistogramChunkMetaSpans(b *bstream, spans []histogram.Span) { +func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) { putVarbitInt(b, int64(len(spans))) for _, s := range spans { putVarbitInt(b, int64(s.Length)) @@ -32,16 +32,11 @@ func putHistogramChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistogramChunkMeta(b *bstreamReader) ( +func readHistogramChunkLayout(b *bstreamReader) ( schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span, err error, ) { - _, err = b.ReadByte() // The header. - if err != nil { - return - } - v, err := readVarbitInt(b) if err != nil { return @@ -53,12 +48,12 @@ func readHistogramChunkMeta(b *bstreamReader) ( return } - positiveSpans, err = readHistogramChunkMetaSpans(b) + positiveSpans, err = readHistogramChunkLayoutSpans(b) if err != nil { return } - negativeSpans, err = readHistogramChunkMetaSpans(b) + negativeSpans, err = readHistogramChunkLayoutSpans(b) if err != nil { return } @@ -66,7 +61,7 @@ func readHistogramChunkMeta(b *bstreamReader) ( return } -func readHistogramChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { +func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { var spans []histogram.Span num, err := readVarbitInt(b) if err != nil {