Merge pull request #11778 from codesome/float-iso

Staleness handling for FloatHistogram
This commit is contained in:
Björn Rabenstein 2023-01-03 13:31:12 +01:00 committed by GitHub
commit 432ea873c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 33 deletions

View file

@ -1862,10 +1862,6 @@ type memSeries struct {
// txs is nil if isolation is disabled. // txs is nil if isolation is disabled.
txs *txRing txs *txRing
// TODO(beorn7): The only reason we track this is to create a staleness
// marker as either histogram or float sample. Perhaps there is a better way.
isHistogramSeries bool
pendingCommit bool // Whether there are samples waiting to be committed to this series. pendingCommit bool // Whether there are samples waiting to be committed to this series.
} }

View file

@ -350,9 +350,12 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
} }
} }
if value.IsStaleNaN(v) && s.isHistogramSeries { if value.IsStaleNaN(v) {
// TODO(marctc): do we have do to the same for float histograms? if s.lastHistogramValue != nil {
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
} else if s.lastFloatHistogramValue != nil {
return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v})
}
} }
s.Lock() s.Lock()
@ -555,8 +558,12 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
if err != nil { if err != nil {
return 0, err return 0, err
} }
s.isHistogramSeries = true
if created { if created {
if h != nil {
s.lastHistogramValue = &histogram.Histogram{}
} else if fh != nil {
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
a.series = append(a.series, record.RefSeries{ a.series = append(a.series, record.RefSeries{
Ref: s.ref, Ref: s.ref,
Labels: lset, Labels: lset,
@ -1115,11 +1122,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
return sampleInOrder, chunkCreated return sampleInOrder, chunkCreated
} }
s.app.Append(t, v) s.app.Append(t, v)
s.isHistogramSeries = false
c.maxTime = t c.maxTime = t
s.lastValue = v s.lastValue = v
s.lastHistogramValue = nil
s.lastFloatHistogramValue = nil
if appendID > 0 { if appendID > 0 {
s.txs.add(appendID) s.txs.add(appendID)
@ -1184,11 +1192,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
} }
s.app.AppendHistogram(t, h) s.app.AppendHistogram(t, h)
s.isHistogramSeries = true
c.maxTime = t c.maxTime = t
s.lastHistogramValue = h s.lastHistogramValue = h
s.lastFloatHistogramValue = nil
if appendID > 0 { if appendID > 0 {
s.txs.add(appendID) s.txs.add(appendID)
@ -1252,11 +1260,11 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
} }
s.app.AppendFloatHistogram(t, fh) s.app.AppendFloatHistogram(t, fh)
s.isHistogramSeries = true
c.maxTime = t c.maxTime = t
s.lastFloatHistogramValue = fh s.lastFloatHistogramValue = fh
s.lastHistogramValue = nil
if appendID > 0 { if appendID > 0 {
s.txs.add(appendID) s.txs.add(appendID)

View file

@ -3361,8 +3361,16 @@ func TestHistogramMetrics(t *testing.T) {
} }
func TestHistogramStaleSample(t *testing.T) { func TestHistogramStaleSample(t *testing.T) {
// TODO(marctc): Add similar test for float histograms t.Run("integer histogram", func(t *testing.T) {
testHistogramStaleSampleHelper(t, false)
})
t.Run("float histogram", func(t *testing.T) {
testHistogramStaleSampleHelper(t, true)
})
}
func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
t.Helper()
l := labels.FromStrings("a", "b") l := labels.FromStrings("a", "b")
numHistograms := 20 numHistograms := 20
head, _ := newTestHead(t, 100000, false, false) head, _ := newTestHead(t, 100000, false, false)
@ -3372,8 +3380,9 @@ func TestHistogramStaleSample(t *testing.T) {
require.NoError(t, head.Init(0)) require.NoError(t, head.Init(0))
type timedHistogram struct { type timedHistogram struct {
t int64 t int64
h *histogram.Histogram h *histogram.Histogram
fh *histogram.FloatHistogram
} }
expHistograms := make([]timedHistogram, 0, numHistograms) expHistograms := make([]timedHistogram, 0, numHistograms)
@ -3392,9 +3401,15 @@ func TestHistogramStaleSample(t *testing.T) {
it := s.Iterator(nil) it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms)) actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram { for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
t, h := it.AtHistogram() switch typ {
actHistograms = append(actHistograms, timedHistogram{t, h}) case chunkenc.ValHistogram:
t, h := it.AtHistogram()
actHistograms = append(actHistograms, timedHistogram{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, h := it.AtFloatHistogram()
actHistograms = append(actHistograms, timedHistogram{t: t, fh: h})
}
} }
// We cannot compare StaleNAN with require.Equal, hence checking each histogram manually. // We cannot compare StaleNAN with require.Equal, hence checking each histogram manually.
@ -3402,15 +3417,27 @@ func TestHistogramStaleSample(t *testing.T) {
actNumStale := 0 actNumStale := 0
for i, eh := range expHistograms { for i, eh := range expHistograms {
ah := actHistograms[i] ah := actHistograms[i]
if value.IsStaleNaN(eh.h.Sum) { if floatHistogram {
actNumStale++ if value.IsStaleNaN(eh.fh.Sum) {
require.True(t, value.IsStaleNaN(ah.h.Sum)) actNumStale++
// To make require.Equal work. require.True(t, value.IsStaleNaN(ah.fh.Sum))
ah.h.Sum = 0 // To make require.Equal work.
eh.h = eh.h.Copy() ah.fh.Sum = 0
eh.h.Sum = 0 eh.fh = eh.fh.Copy()
eh.fh.Sum = 0
}
require.Equal(t, eh, ah)
} else {
if value.IsStaleNaN(eh.h.Sum) {
actNumStale++
require.True(t, value.IsStaleNaN(ah.h.Sum))
// To make require.Equal work.
ah.h.Sum = 0
eh.h = eh.h.Copy()
eh.h.Sum = 0
}
require.Equal(t, eh, ah)
} }
require.Equal(t, eh, ah)
} }
require.Equal(t, numStale, actNumStale) require.Equal(t, numStale, actNumStale)
} }
@ -3418,14 +3445,24 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in the same appender. // Adding stale in the same appender.
app := head.Appender(context.Background()) app := head.Appender(context.Background())
for _, h := range GenerateTestHistograms(numHistograms) { for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil) var err error
if floatHistogram {
_, err = app.AppendHistogram(0, l, 100*int64(len(expHistograms)), nil, h.ToFloat())
expHistograms = append(expHistograms, timedHistogram{t: 100 * int64(len(expHistograms)), fh: h.ToFloat()})
} else {
_, err = app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil)
expHistograms = append(expHistograms, timedHistogram{t: 100 * int64(len(expHistograms)), h: h})
}
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
} }
// +1 so that delta-of-delta is not 0. // +1 so that delta-of-delta is not 0.
_, err := app.Append(0, l, 100*int64(len(expHistograms))+1, math.Float64frombits(value.StaleNaN)) _, err := app.Append(0, l, 100*int64(len(expHistograms))+1, math.Float64frombits(value.StaleNaN))
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100*int64(len(expHistograms)) + 1, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}}) if floatHistogram {
expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, fh: &histogram.FloatHistogram{Sum: math.Float64frombits(value.StaleNaN)}})
} else {
expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, h: &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}})
}
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// Only 1 chunk in the memory, no m-mapped chunk. // Only 1 chunk in the memory, no m-mapped chunk.
@ -3437,9 +3474,15 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in different appender and continuing series after a stale sample. // Adding stale in different appender and continuing series after a stale sample.
app = head.Appender(context.Background()) app = head.Appender(context.Background())
for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] { for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil) var err error
if floatHistogram {
_, err = app.AppendHistogram(0, l, 100*int64(len(expHistograms)), nil, h.ToFloat())
expHistograms = append(expHistograms, timedHistogram{t: 100 * int64(len(expHistograms)), fh: h.ToFloat()})
} else {
_, err = app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h, nil)
expHistograms = append(expHistograms, timedHistogram{t: 100 * int64(len(expHistograms)), h: h})
}
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
@ -3447,7 +3490,11 @@ func TestHistogramStaleSample(t *testing.T) {
// +1 so that delta-of-delta is not 0. // +1 so that delta-of-delta is not 0.
_, err = app.Append(0, l, 100*int64(len(expHistograms))+1, math.Float64frombits(value.StaleNaN)) _, err = app.Append(0, l, 100*int64(len(expHistograms))+1, math.Float64frombits(value.StaleNaN))
require.NoError(t, err) require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100*int64(len(expHistograms)) + 1, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}}) if floatHistogram {
expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, fh: &histogram.FloatHistogram{Sum: math.Float64frombits(value.StaleNaN)}})
} else {
expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, h: &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}})
}
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// Total 2 chunks, 1 m-mapped. // Total 2 chunks, 1 m-mapped.

View file

@ -605,7 +605,6 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime { if s.T <= ms.mmMaxTime {
continue continue
} }
ms.isHistogramSeries = false
if s.T <= ms.mmMaxTime { if s.T <= ms.mmMaxTime {
continue continue
} }
@ -634,7 +633,6 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
unknownHistogramRefs++ unknownHistogramRefs++
continue continue
} }
ms.isHistogramSeries = true
if s.t <= ms.mmMaxTime { if s.t <= ms.mmMaxTime {
continue continue
} }