Add unit test for counter reset header

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-10-13 15:56:50 +05:30
parent 12486b1250
commit 85e6686f84
No known key found for this signature in database
GPG key ID: 0F8729A5EB59B965
4 changed files with 142 additions and 30 deletions

View file

@ -82,9 +82,15 @@ func (c *HistogramChunk) Meta() (
type CounterResetHeader byte type CounterResetHeader byte
const ( const (
CounterReset CounterResetHeader = 0b10000000 // CounterReset means there was definitely a counter reset that resulted in this chunk.
NotCounterReset CounterResetHeader = 0b01000000 CounterReset CounterResetHeader = 0b10000000
GaugeType CounterResetHeader = 0b11000000 // NotCounterReset means there was definitely no counter reset when cutting this chunk.
NotCounterReset CounterResetHeader = 0b01000000
// GaugeType means the histograms represent a gauge instead of counters, hence we cannot make
// sense of counter reset in this case.
GaugeType CounterResetHeader = 0b11000000
// UnknownCounterReset means we cannot say if this was a counter reset or not and not sure
// if this is a gauge type histogram or not.
UnknownCounterReset CounterResetHeader = 0b00000000 UnknownCounterReset CounterResetHeader = 0b00000000
) )
@ -400,7 +406,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
writeHistogramChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) writeHistogramChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
a.schema = h.Schema a.schema = h.Schema
a.zThreshold = h.ZeroThreshold a.zThreshold = h.ZeroThreshold
a.pSpans, a.nSpans = h.PositiveSpans, h.NegativeSpans
a.pSpans = make([]histogram.Span, len(h.PositiveSpans))
copy(a.pSpans, h.PositiveSpans)
a.nSpans = make([]histogram.Span, len(h.NegativeSpans))
copy(a.nSpans, h.NegativeSpans)
numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
a.pBuckets = make([]int64, numPBuckets) a.pBuckets = make([]int64, numPBuckets)
a.nBuckets = make([]int64, numNBuckets) a.nBuckets = make([]int64, numNBuckets)
@ -486,7 +497,8 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
a.cntDelta = cntDelta a.cntDelta = cntDelta
a.zCntDelta = zCntDelta a.zCntDelta = zCntDelta
a.pBuckets, a.nBuckets = h.PositiveBuckets, h.NegativeBuckets copy(a.pBuckets, h.PositiveBuckets)
copy(a.nBuckets, h.NegativeBuckets)
// Note that the bucket deltas were already updated above. // Note that the bucket deltas were already updated above.
a.sum = h.Sum a.sum = h.Sum
} }

View file

@ -273,8 +273,8 @@ type headMetrics struct {
// Sparse histogram metrics for experiments. // Sparse histogram metrics for experiments.
// TODO: remove these in the final version. // TODO: remove these in the final version.
sparseHistogramSamplesTotal prometheus.Counter histogramSamplesTotal prometheus.Counter
sparseHistogramSeries prometheus.Gauge histogramSeries prometheus.Gauge
} }
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -373,13 +373,13 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_snapshot_replay_error_total", Name: "prometheus_tsdb_snapshot_replay_error_total",
Help: "Total number snapshot replays that failed.", Help: "Total number snapshot replays that failed.",
}), }),
sparseHistogramSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{ histogramSamplesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_sparse_histogram_samples_total", Name: "prometheus_tsdb_histogram_samples_total",
Help: "Total number of sparse histograms samples added.", Help: "Total number of histograms samples added.",
}), }),
sparseHistogramSeries: prometheus.NewGauge(prometheus.GaugeOpts{ histogramSeries: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_sparse_histogram_series", Name: "prometheus_tsdb_histogram_series",
Help: "Number of sparse histogram series currently present in the head block.", Help: "Number of histogram series currently present in the head block.",
}), }),
} }
@ -408,8 +408,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.checkpointCreationTotal, m.checkpointCreationTotal,
m.mmapChunkCorruptionTotal, m.mmapChunkCorruptionTotal,
m.snapshotReplayErrorTotal, m.snapshotReplayErrorTotal,
m.sparseHistogramSamplesTotal, m.histogramSamplesTotal,
m.sparseHistogramSeries, m.histogramSeries,
// Metrics bound to functions and not needed in tests // Metrics bound to functions and not needed in tests
// can be created and registered on the spot. // can be created and registered on the spot.
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ prometheus.NewGaugeFunc(prometheus.GaugeOpts{
@ -619,7 +619,7 @@ func (h *Head) Init(minValidTime int64) error {
} }
} }
} }
h.metrics.sparseHistogramSeries.Set(float64(sparseHistogramSeries)) h.metrics.histogramSeries.Set(float64(sparseHistogramSeries))
} }
walReplayDuration := time.Since(start) walReplayDuration := time.Since(start)
@ -1145,7 +1145,7 @@ func (h *Head) gc() int64 {
h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved))
h.metrics.sparseHistogramSeries.Sub(float64(sparseHistogramSeriesDeleted)) h.metrics.histogramSeries.Sub(float64(sparseHistogramSeriesDeleted))
h.numSeries.Sub(uint64(seriesRemoved)) h.numSeries.Sub(uint64(seriesRemoved))
// Remove deleted series IDs from the postings lists. // Remove deleted series IDs from the postings lists.

View file

@ -398,7 +398,7 @@ func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64,
} }
s.histogramSeries = true s.histogramSeries = true
if created { if created {
a.head.metrics.sparseHistogramSeries.Inc() a.head.metrics.histogramSeries.Inc()
a.series = append(a.series, record.RefSeries{ a.series = append(a.series, record.RefSeries{
Ref: s.ref, Ref: s.ref,
Labels: lset, Labels: lset,
@ -561,7 +561,7 @@ func (a *headAppender) Commit() (err error) {
series.Unlock() series.Unlock()
if ok { if ok {
a.head.metrics.sparseHistogramSamplesTotal.Inc() a.head.metrics.histogramSamplesTotal.Inc()
} else { } else {
total-- total--
a.head.metrics.outOfOrderSamples.Inc() a.head.metrics.outOfOrderSamples.Inc()
@ -606,7 +606,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram. // appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { func (s *memSeries) appendHistogram(t int64, h histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards. // Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
// We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk, // We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the meta properly. // we need to know if there was also a counter reset or not to set the meta properly.
@ -616,7 +616,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui
okToAppend, counterReset bool okToAppend, counterReset bool
) )
if app != nil { if app != nil {
positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(sh) positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h)
} }
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper)
@ -636,7 +636,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui
// New buckets have appeared. We need to recode all // New buckets have appeared. We need to recode all
// prior histogram samples within the chunk before we // prior histogram samples within the chunk before we
// can process this one. // can process this one.
chunk, app := app.Recode(positiveInterjections, negativeInterjections, sh.PositiveSpans, sh.NegativeSpans) chunk, app := app.Recode(positiveInterjections, negativeInterjections, h.PositiveSpans, h.NegativeSpans)
s.headChunk = &memChunk{ s.headChunk = &memChunk{
minTime: s.headChunk.minTime, minTime: s.headChunk.minTime,
maxTime: s.headChunk.maxTime, maxTime: s.headChunk.maxTime,
@ -657,7 +657,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui
hc.SetCounterResetHeader(header) hc.SetCounterResetHeader(header)
} }
s.app.AppendHistogram(t, sh) s.app.AppendHistogram(t, h)
s.histogramSeries = true s.histogramSeries = true
c.maxTime = t c.maxTime = t
@ -665,7 +665,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.Histogram, appendID ui
s.histogramBuf[0] = s.histogramBuf[1] s.histogramBuf[0] = s.histogramBuf[1]
s.histogramBuf[1] = s.histogramBuf[2] s.histogramBuf[1] = s.histogramBuf[2]
s.histogramBuf[2] = s.histogramBuf[3] s.histogramBuf[2] = s.histogramBuf[3]
s.histogramBuf[3] = histogramSample{t: t, h: sh} s.histogramBuf[3] = histogramSample{t: t, h: h}
if appendID > 0 { if appendID > 0 {
s.txs.add(appendID) s.txs.add(appendID)

View file

@ -2950,7 +2950,7 @@ func TestSnapshotError(t *testing.T) {
require.Equal(t, 0, len(tm)) require.Equal(t, 0, len(tm))
} }
func TestSparseHistogramMetrics(t *testing.T) { func TestHistogramMetrics(t *testing.T) {
head, _ := newTestHead(t, 1000, false) head, _ := newTestHead(t, 1000, false)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, head.Close()) require.NoError(t, head.Close())
@ -2971,8 +2971,8 @@ func TestSparseHistogramMetrics(t *testing.T) {
} }
} }
require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.histogramSeries))
require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) require.Equal(t, float64(expHSamples), prom_testutil.ToFloat64(head.metrics.histogramSamplesTotal))
require.NoError(t, head.Close()) require.NoError(t, head.Close())
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
@ -2981,11 +2981,11 @@ func TestSparseHistogramMetrics(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) require.Equal(t, float64(expHSeries), prom_testutil.ToFloat64(head.metrics.histogramSeries))
require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) // Counter reset. require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.histogramSamplesTotal)) // Counter reset.
} }
func TestSparseHistogramStaleSample(t *testing.T) { func TestHistogramStaleSample(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}} l := labels.Labels{{Name: "a", Value: "b"}}
numHistograms := 20 numHistograms := 20
head, _ := newTestHead(t, 100000, false) head, _ := newTestHead(t, 100000, false)
@ -3078,3 +3078,103 @@ func TestSparseHistogramStaleSample(t *testing.T) {
require.Equal(t, 1, len(s.mmappedChunks)) require.Equal(t, 1, len(s.mmappedChunks))
testQuery(2) testQuery(2)
} }
func TestHistogramCounterResetHeader(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
head, _ := newTestHead(t, 1000, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
ts := int64(0)
appendHistogram := func(h histogram.Histogram) {
ts++
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, ts, h)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
var expHeaders []chunkenc.CounterResetHeader
checkExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) {
expHeaders = append(expHeaders, newHeaders...)
ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
for i, mmapChunk := range ms.mmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
h := generateHistograms(1)[0]
if len(h.NegativeBuckets) == 0 {
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...)
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...)
}
h.PositiveBuckets[0] = 100
h.NegativeBuckets[0] = 100
// First histogram is UnknownCounterReset.
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Another normal histogram.
h.Count++
appendHistogram(h)
checkExpCounterResetHeader()
// Counter reset via Count.
h.Count--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Changing schema will cut a new chunk with unknown counter reset.
h.Schema++
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Changing schema will zero threshold a new chunk with unknown counter reset.
h.ZeroThreshold += 0.01
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.UnknownCounterReset)
// Counter reset by removing a positive bucket.
h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset by removing a negative bucket.
h.NegativeSpans[1].Length--
h.NegativeBuckets = h.NegativeBuckets[1:]
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Add 2 non-counter reset histograms. Just to have some non-counter reset chunks in between.
for i := 0; i < 250; i++ {
appendHistogram(h)
}
checkExpCounterResetHeader(chunkenc.NotCounterReset, chunkenc.NotCounterReset)
// Counter reset with counter reset in a positive bucket.
h.PositiveBuckets[len(h.PositiveBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
// Counter reset with counter reset in a negative bucket.
h.NegativeBuckets[len(h.NegativeBuckets)-1]--
appendHistogram(h)
checkExpCounterResetHeader(chunkenc.CounterReset)
}