From bab587b9dca16274e38babbbf56efba50956dbd2 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Sun, 27 Oct 2024 02:06:34 +0200 Subject: [PATCH] Agent: allow for ingestion of CT samples (#15124) * Remove unused option from HeadOptions Signed-off-by: Pedro Tanaka * Improve docs for appendable() method in head appender Signed-off-by: Pedro Tanaka * Ingest CT (float) samples in Agent DB Signed-off-by: Pedro Tanaka * allow for ingestion of CT native histogram Signed-off-by: Pedro Tanaka * adding some verification for ct ts Signed-off-by: Pedro Tanaka * Validating CT histogram before append and add newly created series to pending series Signed-off-by: Pedro Tanaka * checking the wal for written samples Signed-off-by: Pedro Tanaka * Checking for samples in test Signed-off-by: Pedro Tanaka * adding case for validations Signed-off-by: Pedro Tanaka * fixing comparison when dedupelabels is enabled Signed-off-by: Pedro Tanaka * unite tests, use table testing Signed-off-by: Pedro Tanaka * Implement CT related methods in timestampTracker for write storage Signed-off-by: Pedro Tanaka * adding error case to test Signed-off-by: Pedro Tanaka * removing unused fields Signed-off-by: Pedro Tanaka * Updating lastTs for series when adding CT to invalidate duplicates Signed-off-by: Pedro Tanaka * making sure that updating the lastTS wont cause OOO later on in Commit(); Signed-off-by: Pedro Tanaka --------- Signed-off-by: Pedro Tanaka --- storage/remote/write.go | 24 ++-- tsdb/agent/db.go | 136 ++++++++++++++++++++-- tsdb/agent/db_test.go | 246 ++++++++++++++++++++++++++++++++++++++++ tsdb/head.go | 4 - tsdb/head_append.go | 7 +- 5 files changed, 395 insertions(+), 22 deletions(-) diff --git a/storage/remote/write.go b/storage/remote/write.go index 00e4fa3a0c..639f344520 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -312,8 +312,23 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, return 0, nil } -func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - // TODO: Implement +func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, ct int64) (storage.SeriesRef, error) { + t.samples++ + if ct > t.highestTimestamp { + // Theoretically, we should never see a CT zero sample with a timestamp higher than the highest timestamp we've seen so far. + // However, we're not going to enforce that here, as it is not the responsibility of the tracker to enforce this. + t.highestTimestamp = ct + } + return 0, nil +} + +func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, ct int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + t.histograms++ + if ct > t.highestTimestamp { + // Theoretically, we should never see a CT zero sample with a timestamp higher than the highest timestamp we've seen so far. + // However, we're not going to enforce that here, as it is not the responsibility of the tracker to enforce this. + t.highestTimestamp = ct + } return 0, nil } @@ -323,11 +338,6 @@ func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, return 0, nil } -func (t *timestampTracker) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - return 0, nil -} - // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 5de84c93af..3863e6cd99 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -976,19 +976,139 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int return storage.SeriesRef(series.ref), nil } -func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - // TODO(bwplotka/arthursens): Wire metadata in the Agent's appender. - return 0, nil -} - func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { // TODO: Wire metadata in the Agent's appender. return 0, nil } -func (a *appender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { - // TODO(bwplotka): Wire metadata in the Agent's appender. - return 0, nil +func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h != nil { + if err := h.Validate(); err != nil { + return 0, err + } + } + if fh != nil { + if err := fh.Validate(); err != nil { + return 0, err + } + } + if ct >= t { + return 0, storage.ErrCTNewerThanSample + } + + series := a.series.GetByID(chunks.HeadSeriesRef(ref)) + if series == nil { + // Ensure no empty labels have gotten through. + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + var created bool + series, created = a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: series.ref, + Labels: l, + }) + a.metrics.numActiveSeries.Inc() + } + } + + series.Lock() + defer series.Unlock() + + if ct <= a.minValidTime(series.lastTs) { + return 0, storage.ErrOutOfOrderCT + } + + if ct > series.lastTs { + series.lastTs = ct + } else { + // discard the sample if it's out of order. + return 0, storage.ErrOutOfOrderCT + } + + switch { + case h != nil: + zeroHistogram := &histogram.Histogram{} + a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ + Ref: series.ref, + T: ct, + H: zeroHistogram, + }) + a.histogramSeries = append(a.histogramSeries, series) + case fh != nil: + a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ + Ref: series.ref, + T: ct, + FH: &histogram.FloatHistogram{}, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, series) + } + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + return storage.SeriesRef(series.ref), nil +} + +func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if ct >= t { + return 0, storage.ErrCTNewerThanSample + } + + series := a.series.GetByID(chunks.HeadSeriesRef(ref)) + if series == nil { + l = l.WithoutEmpty() + if l.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) + } + + newSeries, created := a.getOrCreate(l) + if created { + a.pendingSeries = append(a.pendingSeries, record.RefSeries{ + Ref: newSeries.ref, + Labels: l, + }) + a.metrics.numActiveSeries.Inc() + } + + series = newSeries + } + + series.Lock() + defer series.Unlock() + + if t <= a.minValidTime(series.lastTs) { + a.metrics.totalOutOfOrderSamples.Inc() + return 0, storage.ErrOutOfOrderSample + } + + if ct > series.lastTs { + series.lastTs = ct + } else { + // discard the sample if it's out of order. + return 0, storage.ErrOutOfOrderCT + } + + // NOTE: always modify pendingSamples and sampleSeries together. + a.pendingSamples = append(a.pendingSamples, record.RefSample{ + Ref: series.ref, + T: ct, + V: 0, + }) + a.sampleSeries = append(a.sampleSeries, series) + + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() + + return storage.SeriesRef(series.ref), nil } // Commit submits the collected samples and purges the batch. diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 4d5fda25db..b28c29095c 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -15,7 +15,9 @@ package agent import ( "context" + "errors" "fmt" + "io" "math" "path/filepath" "strconv" @@ -29,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote" @@ -933,6 +936,249 @@ func TestDBOutOfOrderTimeWindow(t *testing.T) { } } +type walSample struct { + t int64 + f float64 + h *histogram.Histogram + lbls labels.Labels + ref storage.SeriesRef +} + +func TestDBCreatedTimestampSamplesIngestion(t *testing.T) { + t.Parallel() + + type appendableSample struct { + t int64 + ct int64 + v float64 + lbls labels.Labels + h *histogram.Histogram + expectsError bool + } + + testHistogram := tsdbutil.GenerateTestHistograms(1)[0] + zeroHistogram := &histogram.Histogram{} + + lbls := labelsForTest(t.Name(), 1) + defLbls := labels.New(lbls[0]...) + + testCases := []struct { + name string + inputSamples []appendableSample + expectedSamples []*walSample + expectedSeriesCount int + }{ + { + name: "in order ct+normal sample/floatSamples", + inputSamples: []appendableSample{ + {t: 100, ct: 1, v: 10, lbls: defLbls}, + {t: 101, ct: 1, v: 10, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 1, f: 0, lbls: defLbls}, + {t: 100, f: 10, lbls: defLbls}, + {t: 101, f: 10, lbls: defLbls}, + }, + }, + { + name: "CT+float && CT+histogram samples", + inputSamples: []appendableSample{ + { + t: 100, + ct: 30, + v: 20, + lbls: defLbls, + }, + { + t: 300, + ct: 230, + h: testHistogram, + lbls: defLbls, + }, + }, + expectedSamples: []*walSample{ + {t: 30, f: 0, lbls: defLbls}, + {t: 100, f: 20, lbls: defLbls}, + {t: 230, h: zeroHistogram, lbls: defLbls}, + {t: 300, h: testHistogram, lbls: defLbls}, + }, + expectedSeriesCount: 1, + }, + { + name: "CT+float && CT+histogram samples with error", + inputSamples: []appendableSample{ + { + // invalid CT + t: 100, + ct: 100, + v: 10, + lbls: defLbls, + expectsError: true, + }, + { + // invalid CT histogram + t: 300, + ct: 300, + h: testHistogram, + lbls: defLbls, + expectsError: true, + }, + }, + expectedSamples: []*walSample{ + {t: 100, f: 10, lbls: defLbls}, + {t: 300, h: testHistogram, lbls: defLbls}, + }, + expectedSeriesCount: 0, + }, + { + name: "In order ct+normal sample/histogram", + inputSamples: []appendableSample{ + {t: 100, h: testHistogram, ct: 1, lbls: defLbls}, + {t: 101, h: testHistogram, ct: 1, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 1, h: &histogram.Histogram{}}, + {t: 100, h: testHistogram}, + {t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}}, + }, + }, + { + name: "ct+normal then OOO sample/float", + inputSamples: []appendableSample{ + {t: 60_000, ct: 40_000, v: 10, lbls: defLbls}, + {t: 120_000, ct: 40_000, v: 10, lbls: defLbls}, + {t: 180_000, ct: 40_000, v: 10, lbls: defLbls}, + {t: 50_000, ct: 40_000, v: 10, lbls: defLbls}, + }, + expectedSamples: []*walSample{ + {t: 40_000, f: 0, lbls: defLbls}, + {t: 50_000, f: 10, lbls: defLbls}, + {t: 60_000, f: 10, lbls: defLbls}, + {t: 120_000, f: 10, lbls: defLbls}, + {t: 180_000, f: 10, lbls: defLbls}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + reg := prometheus.NewRegistry() + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 360_000 + s := createTestAgentDB(t, reg, opts) + app := s.Appender(context.TODO()) + + for _, sample := range tc.inputSamples { + // We supposed to write a Histogram to the WAL + if sample.h != nil { + _, err := app.AppendHistogramCTZeroSample(0, sample.lbls, sample.t, sample.ct, zeroHistogram, nil) + if !errors.Is(err, storage.ErrOutOfOrderCT) { + require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err) + } + + _, err = app.AppendHistogram(0, sample.lbls, sample.t, sample.h, nil) + require.NoError(t, err) + } else { + // We supposed to write a float sample to the WAL + _, err := app.AppendCTZeroSample(0, sample.lbls, sample.t, sample.ct) + if !errors.Is(err, storage.ErrOutOfOrderCT) { + require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err) + } + + _, err = app.Append(0, sample.lbls, sample.t, sample.v) + require.NoError(t, err) + } + } + + require.NoError(t, app.Commit()) + // Close the DB to ensure all data is flushed to the WAL + require.NoError(t, s.Close()) + + // Check that we dont have any OOO samples in the WAL by checking metrics + families, err := reg.Gather() + require.NoError(t, err, "failed to gather metrics") + for _, f := range families { + if f.GetName() == "prometheus_agent_out_of_order_samples_total" { + t.Fatalf("unexpected metric %s", f.GetName()) + } + } + + outputSamples := readWALSamples(t, s.wal.Dir()) + + require.Equal(t, len(tc.expectedSamples), len(outputSamples), "Expected %d samples", len(tc.expectedSamples)) + + for i, expectedSample := range tc.expectedSamples { + for _, sample := range outputSamples { + if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() { + if expectedSample.h != nil { + require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i) + } else { + require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i) + } + } + } + } + }) + } +} + +func readWALSamples(t *testing.T, walDir string) []*walSample { + t.Helper() + sr, err := wlog.NewSegmentsReader(walDir) + require.NoError(t, err) + defer func(sr io.ReadCloser) { + err := sr.Close() + require.NoError(t, err) + }(sr) + + r := wlog.NewReader(sr) + dec := record.NewDecoder(labels.NewSymbolTable()) + + var ( + samples []record.RefSample + histograms []record.RefHistogramSample + + lastSeries record.RefSeries + outputSamples = make([]*walSample, 0) + ) + + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Series: + series, err := dec.Series(rec, nil) + require.NoError(t, err) + lastSeries = series[0] + case record.Samples: + samples, err = dec.Samples(rec, samples[:0]) + require.NoError(t, err) + for _, s := range samples { + outputSamples = append(outputSamples, &walSample{ + t: s.T, + f: s.V, + lbls: lastSeries.Labels.Copy(), + ref: storage.SeriesRef(lastSeries.Ref), + }) + } + case record.HistogramSamples: + histograms, err = dec.HistogramSamples(rec, histograms[:0]) + require.NoError(t, err) + for _, h := range histograms { + outputSamples = append(outputSamples, &walSample{ + t: h.T, + h: h.H, + lbls: lastSeries.Labels.Copy(), + ref: storage.SeriesRef(lastSeries.Ref), + }) + } + } + } + + return outputSamples +} + func BenchmarkCreateSeries(b *testing.B) { s := createTestAgentDB(b, nil, DefaultOptions()) defer s.Close() diff --git a/tsdb/head.go b/tsdb/head.go index 2963d781d0..c67c438e52 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -155,10 +155,6 @@ type HeadOptions struct { // OutOfOrderTimeWindow is > 0 EnableOOONativeHistograms atomic.Bool - // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. - // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md - EnableCreatedTimestampZeroIngestion bool - ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 170e740448..9c732990bf 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -474,9 +474,10 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo return s, created, nil } -// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) -// The sample belongs to the out of order chunk if we return true and no error. -// An error signifies the sample cannot be handled. +// appendable checks whether the given sample is valid for appending to the series. +// If the sample is valid and in-order, it returns false with no error. +// If the sample belongs to the out-of-order chunk, it returns true with no error. +// If the sample cannot be handled, it returns an error. func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOOO bool, oooDelta int64, err error) { // Check if we can append in the in-order chunk. if t >= minValidTime {