From c0573185780b66991bd89baa4f8d27672cd5cd25 Mon Sep 17 00:00:00 2001 From: Sebastian Rabenhorst <4246554+rabenhorst@users.noreply.github.com> Date: Thu, 12 Jan 2023 17:13:44 +0100 Subject: [PATCH] agent: native histogram support (#11842) Signed-off-by: Sebastian Rabenhorst --- tsdb/agent/db.go | 205 +++++++++++++++++++++++++++++++++++++--- tsdb/agent/db_test.go | 213 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 398 insertions(+), 20 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 656725fe3..da74fe4c9 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -44,6 +44,11 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" ) +const ( + sampleMetricTypeFloat = "float" + sampleMetricTypeHistogram = "histogram" +) + var ErrUnsupported = errors.New("unsupported operation with WAL-only storage") // Default values for options. @@ -96,7 +101,7 @@ type dbMetrics struct { numActiveSeries prometheus.Gauge numWALSeriesPendingDeletion prometheus.Gauge - totalAppendedSamples prometheus.Counter + totalAppendedSamples *prometheus.CounterVec totalAppendedExemplars prometheus.Counter totalOutOfOrderSamples prometheus.Counter walTruncateDuration prometheus.Summary @@ -120,10 +125,10 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { Help: "Number of series pending deletion from the WAL", }) - m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{ + m.totalAppendedSamples = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "prometheus_agent_samples_appended_total", Help: "Total number of samples appended to the storage", - }) + }, []string{"type"}) m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_agent_exemplars_appended_total", @@ -284,10 +289,12 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin db.appenderPool.New = func() interface{} { return &appender{ - DB: db, - pendingSeries: make([]record.RefSeries, 0, 100), - pendingSamples: make([]record.RefSample, 0, 100), - pendingExamplars: make([]record.RefExemplar, 0, 10), + DB: db, + pendingSeries: make([]record.RefSeries, 0, 100), + pendingSamples: make([]record.RefSample, 0, 100), + pendingHistograms: make([]record.RefHistogramSample, 0, 100), + pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100), + pendingExamplars: make([]record.RefExemplar, 0, 10), } } @@ -411,6 +418,16 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return []record.RefSample{} }, } + histogramsPool = sync.Pool{ + New: func() interface{} { + return []record.RefHistogramSample{} + }, + } + floatHistogramsPool = sync.Pool{ + New: func() interface{} { + return []record.RefFloatHistogramSample{} + }, + } ) go func() { @@ -443,6 +460,30 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H return } decoded <- samples + case record.HistogramSamples: + histograms := histogramsPool.Get().([]record.RefHistogramSample)[:0] + histograms, err = dec.HistogramSamples(rec, histograms) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: errors.Wrap(err, "decode histogram samples"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- histograms + case record.FloatHistogramSamples: + floatHistograms := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0] + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + if err != nil { + errCh <- &wlog.CorruptionErr{ + Err: errors.Wrap(err, "decode float histogram samples"), + Segment: r.Segment(), + Offset: r.Offset(), + } + return + } + decoded <- floatHistograms case record.Tombstones, record.Exemplars: // We don't care about tombstones or exemplars during replay. // TODO: If decide to decode exemplars, we should make sure to prepopulate @@ -496,6 +537,36 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H //nolint:staticcheck samplesPool.Put(v) + case []record.RefHistogramSample: + for _, entry := range v { + // Update the lastTs for the series based + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + //nolint:staticcheck + histogramsPool.Put(v) + case []record.RefFloatHistogramSample: + for _, entry := range v { + // Update the lastTs for the series based + ref, ok := multiRef[entry.Ref] + if !ok { + nonExistentSeriesRefs.Inc() + continue + } + series := db.series.GetByID(ref) + if entry.T > series.lastTs { + series.lastTs = entry.T + } + } + //nolint:staticcheck + floatHistogramsPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -695,13 +766,23 @@ func (db *DB) Close() error { type appender struct { *DB - pendingSeries []record.RefSeries - pendingSamples []record.RefSample - pendingExamplars []record.RefExemplar + pendingSeries []record.RefSeries + pendingSamples []record.RefSample + pendingHistograms []record.RefHistogramSample + pendingFloatHistograms []record.RefFloatHistogramSample + pendingExamplars []record.RefExemplar // Pointers to the series referenced by each element of pendingSamples. // Series lock is not held on elements. sampleSeries []*memSeries + + // Pointers to the series referenced by each element of pendingHistograms. + // Series lock is not held on elements. + histogramSeries []*memSeries + + // Pointers to the series referenced by each element of pendingFloatHistograms. + // Series lock is not held on elements. + floatHistogramSeries []*memSeries } func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { @@ -749,7 +830,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo }) a.sampleSeries = append(a.sampleSeries, series) - a.metrics.totalAppendedSamples.Inc() + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() return storage.SeriesRef(series.ref), nil } @@ -821,8 +902,74 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem } func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - // TODO: Add histogram support. - return 0, nil + if h != nil { + if err := tsdb.ValidateHistogram(h); err != nil { + return 0, err + } + } + + if fh != nil { + if err := tsdb.ValidateFloatHistogram(fh); err != nil { + return 0, err + } + } + + // series references and chunk references are identical for agent mode. + headRef := chunks.HeadSeriesRef(ref) + + series := a.series.GetByID(headRef) + if series == nil { + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + + a.metrics.numActiveSeries.Inc() + } + } + + series.Lock() + defer series.Unlock() + + if t < series.lastTs { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + + if h != nil { + // NOTE: always modify pendingHistograms and histogramSeries together + a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ + Ref: series.ref, + T: t, + H: h, + }) + a.histogramSeries = append(a.histogramSeries, series) + } else if fh != nil { + // NOTE: always modify pendingFloatHistograms and floatHistogramSeries together + a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: t, + FH: fh, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, series) + } + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + return storage.SeriesRef(series.ref), nil } func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { @@ -854,6 +1001,22 @@ func (a *appender) Commit() error { buf = buf[:0] } + if len(a.pendingHistograms) > 0 { + buf = encoder.HistogramSamples(a.pendingHistograms, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.pendingFloatHistograms) > 0 { + buf = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf) + if err := a.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + if len(a.pendingExamplars) > 0 { buf = encoder.Exemplars(a.pendingExamplars, buf) if err := a.wal.Log(buf); err != nil { @@ -869,6 +1032,18 @@ func (a *appender) Commit() error { a.metrics.totalOutOfOrderSamples.Inc() } } + for i, s := range a.pendingHistograms { + series = a.histogramSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } + for i, s := range a.pendingFloatHistograms { + series = a.floatHistogramSeries[i] + if !series.updateTimestamp(s.T) { + a.metrics.totalOutOfOrderSamples.Inc() + } + } //nolint:staticcheck a.bufPool.Put(buf) @@ -878,8 +1053,12 @@ func (a *appender) Commit() error { func (a *appender) Rollback() error { a.pendingSeries = a.pendingSeries[:0] a.pendingSamples = a.pendingSamples[:0] + a.pendingHistograms = a.pendingHistograms[:0] + a.pendingFloatHistograms = a.pendingFloatHistograms[:0] a.pendingExamplars = a.pendingExamplars[:0] a.sampleSeries = a.sampleSeries[:0] + a.histogramSeries = a.histogramSeries[:0] + a.floatHistogramSeries = a.floatHistogramSeries[:0] a.appenderPool.Put(a) return nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 5933944de..0bd3b8249 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -53,6 +53,14 @@ func TestDB_InvalidSeries(t *testing.T) { require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") }) + t.Run("Histograms", func(t *testing.T) { + _, err := app.AppendHistogram(0, labels.Labels{}, 0, tsdb.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") + + _, err = app.AppendHistogram(0, labels.FromStrings("a", "1", "a", "2"), 0, tsdb.GenerateTestHistograms(1)[0], nil) + require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") + }) + t.Run("Exemplars", func(t *testing.T) { sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0) require.NoError(t, err, "should not reject valid series") @@ -112,6 +120,7 @@ func TestUnsupportedFunctions(t *testing.T) { func TestCommit(t *testing.T) { const ( numDatapoints = 1000 + numHistograms = 100 numSeries = 8 ) @@ -138,6 +147,30 @@ func TestCommit(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) @@ -152,7 +185,7 @@ func TestCommit(t *testing.T) { r = wlog.NewReader(sr) dec record.Decoder - walSeriesCount, walSamplesCount, walExemplarsCount int + walSeriesCount, walSamplesCount, walExemplarsCount, walHistogramCount, walFloatHistogramCount int ) for r.Next() { rec := r.Record() @@ -169,6 +202,18 @@ func TestCommit(t *testing.T) { require.NoError(t, err) walSamplesCount += len(samples) + case record.HistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + case record.Exemplars: var exemplars []record.RefExemplar exemplars, err = dec.Exemplars(rec, exemplars) @@ -180,14 +225,17 @@ func TestCommit(t *testing.T) { } // Check that the WAL contained the same number of committed series/samples/exemplars. - require.Equal(t, numSeries, walSeriesCount, "unexpected number of series") + require.Equal(t, numSeries*3, walSeriesCount, "unexpected number of series") require.Equal(t, numSeries*numDatapoints, walSamplesCount, "unexpected number of samples") require.Equal(t, numSeries*numDatapoints, walExemplarsCount, "unexpected number of exemplars") + require.Equal(t, numSeries*numHistograms, walHistogramCount, "unexpected number of histograms") + require.Equal(t, numSeries*numHistograms, walFloatHistogramCount, "unexpected number of float histograms") } func TestRollback(t *testing.T) { const ( numDatapoints = 1000 + numHistograms = 100 numSeries = 8 ) @@ -205,6 +253,30 @@ func TestRollback(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + require.NoError(t, err) + } + } + // Do a rollback, which should clear uncommitted data. A followup call to // commit should persist nothing to the WAL. require.NoError(t, app.Rollback()) @@ -222,7 +294,7 @@ func TestRollback(t *testing.T) { r = wlog.NewReader(sr) dec record.Decoder - walSeriesCount, walSamplesCount, walExemplarsCount int + walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int ) for r.Next() { rec := r.Record() @@ -245,6 +317,18 @@ func TestRollback(t *testing.T) { require.NoError(t, err) walExemplarsCount += len(exemplars) + case record.HistogramSamples: + var histograms []record.RefHistogramSample + histograms, err = dec.HistogramSamples(rec, histograms) + require.NoError(t, err) + walHistogramCount += len(histograms) + + case record.FloatHistogramSamples: + var floatHistograms []record.RefFloatHistogramSample + floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) + require.NoError(t, err) + walFloatHistogramCount += len(floatHistograms) + default: } } @@ -253,11 +337,14 @@ func TestRollback(t *testing.T) { require.Equal(t, 0, walSeriesCount, "series should not have been written to WAL") require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL") require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL") + require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL") + require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL") } func TestFullTruncateWAL(t *testing.T) { const ( numDatapoints = 1000 + numHistograms = 100 numSeries = 800 lastTs = 500 ) @@ -283,11 +370,37 @@ func TestFullTruncateWAL(t *testing.T) { require.NoError(t, app.Commit()) } + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + // Truncate WAL with mint to GC all the samples. s.truncate(lastTs + 1) m := gatherFamily(t, reg, "prometheus_agent_deleted_series") - require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") + require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") } func TestPartialTruncateWAL(t *testing.T) { @@ -319,6 +432,32 @@ func TestPartialTruncateWAL(t *testing.T) { require.NoError(t, app.Commit()) } + lbls = labelsForTest(t.Name()+"_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numDatapoints) + + for i := 0; i < numDatapoints; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram_batch-1", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numDatapoints) + + for i := 0; i < numDatapoints; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + // Create second batch of 800 series with 1000 data-points with a fixed lastTs as 600. lastTs = 600 lbls = labelsForTest(t.Name()+"batch-2", numSeries) @@ -332,16 +471,43 @@ func TestPartialTruncateWAL(t *testing.T) { require.NoError(t, app.Commit()) } + lbls = labelsForTest(t.Name()+"_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numDatapoints) + + for i := 0; i < numDatapoints; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + + lbls = labelsForTest(t.Name()+"_float_histogram_batch-2", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numDatapoints) + + for i := 0; i < numDatapoints; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + } + // Truncate WAL with mint to GC only the first batch of 800 series and retaining 2nd batch of 800 series. s.truncate(lastTs - 1) m := gatherFamily(t, reg, "prometheus_agent_deleted_series") - require.Equal(t, m.Metric[0].Gauge.GetValue(), float64(numSeries), "agent wal truncate mismatch of deleted series count") + require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") } func TestWALReplay(t *testing.T) { const ( numDatapoints = 1000 + numHistograms = 100 numSeries = 8 lastTs = 500 ) @@ -359,6 +525,30 @@ func TestWALReplay(t *testing.T) { } } + lbls = labelsForTest(t.Name()+"_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + histograms := tsdb.GenerateTestHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + require.NoError(t, err) + } + } + + lbls = labelsForTest(t.Name()+"_float_histogram", numSeries) + for _, l := range lbls { + lset := labels.New(l...) + + floatHistograms := tsdb.GenerateTestFloatHistograms(numHistograms) + + for i := 0; i < numHistograms; i++ { + _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + require.NoError(t, err) + } + } + require.NoError(t, app.Commit()) require.NoError(t, s.Close()) @@ -377,7 +567,7 @@ func TestWALReplay(t *testing.T) { // Check if all the series are retrieved back from the WAL. m := gatherFamily(t, reg, "prometheus_agent_active_series") - require.Equal(t, float64(numSeries), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") + require.Equal(t, float64(numSeries*3), m.Metric[0].Gauge.GetValue(), "agent wal replay mismatch of active series count") // Check if lastTs of the samples retrieved from the WAL is retained. metrics := replayStorage.series.series @@ -430,6 +620,15 @@ func Test_ExistingWAL_NextRef(t *testing.T) { _, err := app.Append(0, lset, 0, 100) require.NoError(t, err) } + + histogramCount := 10 + histograms := tsdb.GenerateTestHistograms(histogramCount) + // Append series + for i := 0; i < histogramCount; i++ { + lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i)) + _, err := app.AppendHistogram(0, lset, 0, histograms[i], nil) + require.NoError(t, err) + } require.NoError(t, app.Commit()) // Truncate the WAL to force creation of a new segment. @@ -441,7 +640,7 @@ func Test_ExistingWAL_NextRef(t *testing.T) { require.NoError(t, err) defer require.NoError(t, db.Close()) - require.Equal(t, uint64(seriesCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL") + require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL") } func Test_validateOptions(t *testing.T) {