agent: native histogram support (#11842)

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>
This commit is contained in:
Sebastian Rabenhorst 2023-01-12 17:13:44 +01:00 committed by GitHub
parent d82ea2eb1c
commit c057318578
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 398 additions and 20 deletions

View file

@ -44,6 +44,11 @@ import (
"github.com/prometheus/prometheus/tsdb/wlog"
)
const (
sampleMetricTypeFloat = "float"
sampleMetricTypeHistogram = "histogram"
)
var ErrUnsupported = errors.New("unsupported operation with WAL-only storage")
// Default values for options.
@ -96,7 +101,7 @@ type dbMetrics struct {
numActiveSeries prometheus.Gauge
numWALSeriesPendingDeletion prometheus.Gauge
totalAppendedSamples prometheus.Counter
totalAppendedSamples *prometheus.CounterVec
totalAppendedExemplars prometheus.Counter
totalOutOfOrderSamples prometheus.Counter
walTruncateDuration prometheus.Summary
@ -120,10 +125,10 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
Help: "Number of series pending deletion from the WAL",
})
m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{
m.totalAppendedSamples = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_agent_samples_appended_total",
Help: "Total number of samples appended to the storage",
})
}, []string{"type"})
m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_agent_exemplars_appended_total",
@ -284,10 +289,12 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
db.appenderPool.New = func() interface{} {
return &appender{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
}
}
@ -411,6 +418,16 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return []record.RefSample{}
},
}
histogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogramSample{}
},
}
floatHistogramsPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
)
go func() {
@ -443,6 +460,30 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- samples
case record.HistogramSamples:
histograms := histogramsPool.Get().([]record.RefHistogramSample)[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode histogram samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- histograms
case record.FloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode float histogram samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- floatHistograms
case record.Tombstones, record.Exemplars:
// We don't care about tombstones or exemplars during replay.
// TODO: If decide to decode exemplars, we should make sure to prepopulate
@ -496,6 +537,36 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
//nolint:staticcheck
samplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
if entry.T > series.lastTs {
series.lastTs = entry.T
}
}
//nolint:staticcheck
histogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
ref, ok := multiRef[entry.Ref]
if !ok {
nonExistentSeriesRefs.Inc()
continue
}
series := db.series.GetByID(ref)
if entry.T > series.lastTs {
series.lastTs = entry.T
}
}
//nolint:staticcheck
floatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -695,13 +766,23 @@ func (db *DB) Close() error {
type appender struct {
*DB
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
pendingExamplars []record.RefExemplar
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
pendingHistograms []record.RefHistogramSample
pendingFloatHistograms []record.RefFloatHistogramSample
pendingExamplars []record.RefExemplar
// Pointers to the series referenced by each element of pendingSamples.
// Series lock is not held on elements.
sampleSeries []*memSeries
// Pointers to the series referenced by each element of pendingHistograms.
// Series lock is not held on elements.
histogramSeries []*memSeries
// Pointers to the series referenced by each element of pendingFloatHistograms.
// Series lock is not held on elements.
floatHistogramSeries []*memSeries
}
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@ -749,7 +830,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
})
a.sampleSeries = append(a.sampleSeries, series)
a.metrics.totalAppendedSamples.Inc()
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return storage.SeriesRef(series.ref), nil
}
@ -821,8 +902,74 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
}
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO: Add histogram support.
return 0, nil
if h != nil {
if err := tsdb.ValidateHistogram(h); err != nil {
return 0, err
}
}
if fh != nil {
if err := tsdb.ValidateFloatHistogram(fh); err != nil {
return 0, err
}
}
// series references and chunk references are identical for agent mode.
headRef := chunks.HeadSeriesRef(ref)
series := a.series.GetByID(headRef)
if series == nil {
// Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset")
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
}
var created bool
series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
}
}
series.Lock()
defer series.Unlock()
if t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
if h != nil {
// NOTE: always modify pendingHistograms and histogramSeries together
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
Ref: series.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, series)
} else if fh != nil {
// NOTE: always modify pendingFloatHistograms and floatHistogramSeries together
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
Ref: series.ref,
T: t,
FH: fh,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, series)
}
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
return storage.SeriesRef(series.ref), nil
}
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
@ -854,6 +1001,22 @@ func (a *appender) Commit() error {
buf = buf[:0]
}
if len(a.pendingHistograms) > 0 {
buf = encoder.HistogramSamples(a.pendingHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
if len(a.pendingFloatHistograms) > 0 {
buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
if err := a.wal.Log(buf); err != nil {
return err
}
buf = buf[:0]
}
if len(a.pendingExamplars) > 0 {
buf = encoder.Exemplars(a.pendingExamplars, buf)
if err := a.wal.Log(buf); err != nil {
@ -869,6 +1032,18 @@ func (a *appender) Commit() error {
a.metrics.totalOutOfOrderSamples.Inc()
}
}
for i, s := range a.pendingHistograms {
series = a.histogramSeries[i]
if !series.updateTimestamp(s.T) {
a.metrics.totalOutOfOrderSamples.Inc()
}
}
for i, s := range a.pendingFloatHistograms {
series = a.floatHistogramSeries[i]
if !series.updateTimestamp(s.T) {
a.metrics.totalOutOfOrderSamples.Inc()
}
}
//nolint:staticcheck
a.bufPool.Put(buf)
@ -878,8 +1053,12 @@ func (a *appender) Commit() error {
func (a *appender) Rollback() error {
a.pendingSeries = a.pendingSeries[:0]
a.pendingSamples = a.pendingSamples[:0]
a.pendingHistograms = a.pendingHistograms[:0]
a.pendingFloatHistograms = a.pendingFloatHistograms[:0]
a.pendingExamplars = a.pendingExamplars[:0]
a.sampleSeries = a.sampleSeries[:0]
a.histogramSeries = a.histogramSeries[:0]
a.floatHistogramSeries = a.floatHistogramSeries[:0]
a.appenderPool.Put(a)
return nil
}

View file

@ -53,6 +53,14 @@ func TestDB_InvalidSeries(t *testing.T) {
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
})
t.Run("Histograms", func(t *testing.T) {
_, err := app.AppendHistogram(0, labels.Labels{}, 0, tsdb.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels")
_, err = app.AppendHistogram(0, labels.FromStrings("a", "1", "a", "2"), 0, tsdb.GenerateTestHistograms(1)[0], nil)
require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels")
})
t.Run("Exemplars", func(t *testing.T) {
sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0)
require.NoError(t, err, "should not reject valid series")
@ -112,6 +120,7 @@ func TestUnsupportedFunctions(t *testing.T) {
func TestCommit(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
)
@ -138,6 +147,30 @@ func TestCommit(t *testing.T) {
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
require.NoError(t, s.Close())
@ -152,7 +185,7 @@ func TestCommit(t *testing.T) {
r = wlog.NewReader(sr)
dec record.Decoder
walSeriesCount, walSamplesCount, walExemplarsCount int
walSeriesCount, walSamplesCount, walExemplarsCount, walHistogramCount, walFloatHistogramCount int
)
for r.Next() {
rec := r.Record()
@ -169,6 +202,18 @@ func TestCommit(t *testing.T) {
require.NoError(t, err)
walSamplesCount += len(samples)
case record.HistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)
walFloatHistogramCount += len(floatHistograms)
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
@ -180,14 +225,17 @@ func TestCommit(t *testing.T) {
}
// Check that the WAL contained the same number of committed series/samples/exemplars.
require.Equal(t, numSeries, walSeriesCount, "unexpected number of series")
require.Equal(t, numSeries*3, walSeriesCount, "unexpected number of series")
require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples")
require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars")
require.Equal(t, numSeries*numHistograms, walHistogramCount, "unexpected number of histograms")
require.Equal(t, numSeries*numHistograms, walFloatHistogramCount, "unexpected number of float histograms")
}
func TestRollback(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
)
@ -205,6 +253,30 @@ func TestRollback(t *testing.T) {
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
require.NoError(t, err)
}
}
// Do a rollback, which should clear uncommitted data. A followup call to
// commit should persist nothing to the WAL.
require.NoError(t, app.Rollback())
@ -222,7 +294,7 @@ func TestRollback(t *testing.T) {
r = wlog.NewReader(sr)
dec record.Decoder
walSeriesCount, walSamplesCount, walExemplarsCount int
walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int
)
for r.Next() {
rec := r.Record()
@ -245,6 +317,18 @@ func TestRollback(t *testing.T) {
require.NoError(t, err)
walExemplarsCount += len(exemplars)
case record.HistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)
walFloatHistogramCount += len(floatHistograms)
default:
}
}
@ -253,11 +337,14 @@ func TestRollback(t *testing.T) {
require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL")
require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL")
require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL")
}
func TestFullTruncateWAL(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 800
lastTs = 500
)
@ -283,11 +370,37 @@ func TestFullTruncateWAL(t *testing.T) {
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i])
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Truncate WAL with mint to GC all the samples.
s.truncate(lastTs + 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
func TestPartialTruncateWAL(t *testing.T) {
@ -319,6 +432,32 @@ func TestPartialTruncateWAL(t *testing.T) {
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numDatapoints)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram_batch-1", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numDatapoints)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i])
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600.
lastTs = 600
lbls = labelsForTest(t.Name()+"batch-2", numSeries)
@ -332,16 +471,43 @@ func TestPartialTruncateWAL(t *testing.T) {
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numDatapoints)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
lbls = labelsForTest(t.Name()+"_float_histogram_batch-2", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numDatapoints)
for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i])
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
// Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series.
s.truncate(lastTs - 1)
m := gatherFamily(t, reg, "prometheus_agent_deleted_series")
require.Equal(t, m.Metric[0].Gauge.GetValue(), float64(numSeries), "agent wal truncate mismatch of deleted series count")
require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count")
}
func TestWALReplay(t *testing.T) {
const (
numDatapoints = 1000
numHistograms = 100
numSeries = 8
lastTs = 500
)
@ -359,6 +525,30 @@ func TestWALReplay(t *testing.T) {
}
}
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
histograms := tsdb.GenerateTestHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil)
require.NoError(t, err)
}
}
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)
floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms)
for i := 0; i < numHistograms; i++ {
_, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i])
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
require.NoError(t, s.Close())
@ -377,7 +567,7 @@ func TestWALReplay(t *testing.T) {
// Check if all the series are retrieved back from the WAL.
m := gatherFamily(t, reg, "prometheus_agent_active_series")
require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count")
// Check if lastTs of the samples retrieved from the WAL is retained.
metrics := replayStorage.series.series
@ -430,6 +620,15 @@ func Test_ExistingWAL_NextRef(t *testing.T) {
_, err := app.Append(0, lset, 0, 100)
require.NoError(t, err)
}
histogramCount := 10
histograms := tsdb.GenerateTestHistograms(histogramCount)
// Append <histogramCount> series
for i := 0; i < histogramCount; i++ {
lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i))
_, err := app.AppendHistogram(0, lset, 0, histograms[i], nil)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Truncate the WAL to force creation of a new segment.
@ -441,7 +640,7 @@ func Test_ExistingWAL_NextRef(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, db.Close())
require.Equal(t, uint64(seriesCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL")
require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL")
}
func Test_validateOptions(t *testing.T) {