From 6bd9b1a7cc0369952da4467a4f38c4c8c6dd4629 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 19 Jul 2024 11:28:00 -0300 Subject: [PATCH] Histogram CT Zero ingestion Signed-off-by: Arthur Silva Sens --- cmd/prometheus/main.go | 4 + scrape/helpers_test.go | 20 ++- scrape/manager_test.go | 174 +++++++++++++++++++++++++++ scrape/scrape.go | 10 +- scrape/scrape_test.go | 6 +- storage/fanout.go | 14 +++ storage/interface.go | 17 ++- storage/remote/write.go | 5 + storage/remote/write_handler_test.go | 7 ++ tsdb/agent/db.go | 5 + tsdb/head_append.go | 113 ++++++++++++++++- tsdb/head_test.go | 160 ++++++++++++++++++++++++ 12 files changed, 526 insertions(+), 9 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e7fd82e6f3..7d9106b335 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1597,6 +1597,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 116fa5c94b..4f7918f79e 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -55,6 +55,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h return 0, nil } +func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} + func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { return 0, nil } @@ -78,9 +82,10 @@ func equalFloatSamples(a, b floatSample) bool { } type histogramSample struct { - t int64 - h *histogram.Histogram - fh *histogram.FloatHistogram + metric labels.Labels + t int64 + h *histogram.Histogram + fh *histogram.FloatHistogram } type collectResultAppendable struct { @@ -146,7 +151,7 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() - a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) + a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l}) if a.next == nil { return 0, nil } @@ -154,6 +159,13 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels. return a.next.AppendHistogram(ref, l, t, h, fh) } +func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h != nil { + return a.AppendHistogram(ref, l, ct, &histogram.Histogram{}, nil) + } + return a.AppendHistogram(ref, l, ct, nil, &histogram.FloatHistogram{}) +} + func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() diff --git a/scrape/manager_test.go b/scrape/manager_test.go index cd712ca62b..13a3698127 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -39,8 +39,10 @@ import ( "github.com/prometheus/prometheus/discovery" _ "github.com/prometheus/prometheus/discovery/file" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -858,6 +860,178 @@ func TestManagerCTZeroIngestion(t *testing.T) { } } +// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram, +// but in the form of dto.Histogram. +func generateTestHistogram(i int) *dto.Histogram { + helper := tsdbutil.GenerateTestHistogram(i) + h := &dto.Histogram{} + h.SampleCount = proto.Uint64(helper.Count) + h.SampleSum = proto.Float64(helper.Sum) + h.Schema = proto.Int32(helper.Schema) + h.ZeroThreshold = proto.Float64(helper.ZeroThreshold) + h.ZeroCount = proto.Uint64(helper.ZeroCount) + h.PositiveSpan = make([]*dto.BucketSpan, len(helper.PositiveSpans)) + for i, span := range helper.PositiveSpans { + h.PositiveSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(span.Offset), + Length: proto.Uint32(span.Length), + } + } + h.PositiveDelta = helper.PositiveBuckets + h.NegativeSpan = make([]*dto.BucketSpan, len(helper.NegativeSpans)) + for i, span := range helper.NegativeSpans { + h.NegativeSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(span.Offset), + Length: proto.Uint32(span.Length), + } + } + h.NegativeDelta = helper.NegativeBuckets + return h +} + +func TestManagerCTZeroIngestionHistogram(t *testing.T) { + const mName = "expected_histogram" + + for _, tc := range []struct { + name string + inputHistSample *dto.Histogram + enableCTZeroIngestion bool + }{ + { + name: "disabled with CT on histogram", + inputHistSample: func() *dto.Histogram { + h := generateTestHistogram(0) + h.CreatedTimestamp = timestamppb.Now() + return h + }(), + enableCTZeroIngestion: false, + }, + { + name: "enabled with CT on histogram", + inputHistSample: func() *dto.Histogram { + h := generateTestHistogram(0) + h.CreatedTimestamp = timestamppb.Now() + return h + }(), + enableCTZeroIngestion: true, + }, + { + name: "enabled without CT on histogram", + inputHistSample: func() *dto.Histogram { + h := generateTestHistogram(0) + return h + }(), + enableCTZeroIngestion: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + app := &collectResultAppender{} + scrapeManager, err := NewManager( + &Options{ + EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, + EnableNativeHistogramsIngestion: true, + skipOffsetting: true, + }, + log.NewLogfmtLogger(os.Stderr), + nil, + &collectResultAppendable{app}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + // Disable regular scrapes. + ScrapeInterval: model.Duration(9999 * time.Minute), + ScrapeTimeout: model.Duration(5 * time.Second), + // Ensure the proto is chosen. We need proto as it's the only protocol + // with the CT parsing support. + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + })) + + once := sync.Once{} + // Start fake HTTP target to that allow one scrape only. + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fail := true // TODO(bwplotka): Kill or use? + once.Do(func() { + fail = false + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + + ctrType := dto.MetricType_HISTOGRAM + w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{Histogram: tc.inputHistSample}}, + })) + }) + + if fail { + w.WriteHeader(http.StatusInternalServerError) + } + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Add fake target directly into tsets + reload. Normally users would use + // Manager.Run and wait for minimum 5s refresh interval. + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, + }) + scrapeManager.reload() + + var got []histogramSample + + // Wait for one scrape. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + app.mtx.Lock() + defer app.mtx.Unlock() + + // Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug + // and it's not worth waiting. + for _, h := range app.resultHistograms { + if h.metric.Get(model.MetricNameLabel) == mName { + got = append(got, h) + } + } + if len(app.resultHistograms) > 0 { + return nil + } + return fmt.Errorf("expected some histogram samples, got none") + }), "after 1 minute") + scrapeManager.Stop() + + // Check for zero samples, assuming we only injected always one histogram sample. + // Did it contain CT to inject? If yes, was CT zero enabled? + if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion { + require.Len(t, got, 2) + // Zero sample. + require.Equal(t, histogram.Histogram{}, *got[0].h) + // Quick soft check to make sure it's the same sample or at least not zero. + require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum) + return + } + + // Expect only one, valid sample. + require.Len(t, got, 1) + // Quick soft check to make sure it's the same sample or at least not zero. + require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum) + }) + } +} + func TestUnregisterMetrics(t *testing.T) { reg := prometheus.NewRegistry() // Check that all metrics can be unregistered, allowing a second manager to be created. diff --git a/scrape/scrape.go b/scrape/scrape.go index dca4682b11..f29beeb03f 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1701,7 +1701,15 @@ loop: } else { if sl.enableCTZeroIngestion { if ctMs := p.CreatedTimestamp(); ctMs != nil { - ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + if isHistogram && sl.enableNativeHistogramIngestion { + if h != nil { + ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil) + } else { + ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh) + } + } else { + ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + } if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. // CT is an experimental feature. For now, we don't need to fail the // scrape on errors updating the created timestamp, log debug. diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 9887924c33..9e49fe8efa 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1999,7 +1999,8 @@ metric: < `, contentType: "application/vnd.google.protobuf", histograms: []histogramSample{{ - t: 1234568, + t: 1234568, + metric: labels.FromStrings("__name__", "test_histogram"), h: &histogram.Histogram{ Count: 175, ZeroCount: 2, @@ -2125,7 +2126,8 @@ metric: < {metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175}, }, histograms: []histogramSample{{ - t: 1234568, + t: 1234568, + metric: labels.FromStrings("__name__", "test_histogram"), h: &histogram.Histogram{ Count: 175, ZeroCount: 2, diff --git a/storage/fanout.go b/storage/fanout.go index e52342bc7e..80022b2566 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64 return ref, nil } +func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) { + ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) { ref, err := f.primary.UpdateMetadata(ref, l, m) if err != nil { diff --git a/storage/interface.go b/storage/interface.go index 9654c88331..7ac93129e8 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -50,7 +50,8 @@ var ( // NOTE(bwplotka): This can be both an instrumentation failure or commonly expected // behaviour, and we currently don't have a way to determine this. As a result // it's recommended to ignore this error for now. - ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring") + ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring") + ErrCTNewerThanSample = fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring") ) // SeriesRef is a generic series reference. In prometheus it is either a @@ -313,6 +314,20 @@ type HistogramAppender interface { // pointer. AppendHistogram won't mutate the histogram, but in turn // depends on the caller to not mutate it either. AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) + // AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp, + // which will be associated with given series, labels and the incoming + // sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be + // appended, for example when ct is too old, or when it would collide with + // incoming sample (sample has priority). + // + // AppendHistogramCTZeroSample has to be called before the corresponding histogram AppendHistogram. + // A series reference number is returned which can be used to modify the + // CT for the given series in the same or later transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to AppendHistogramCTZeroSample() at any point. + // + // If the reference is 0 it must not be used for caching. + AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) } // MetadataUpdater provides an interface for associating metadata to stored series. diff --git a/storage/remote/write.go b/storage/remote/write.go index eba4290840..624732c4fe 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -306,6 +306,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, return 0, nil } +func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + // TODO: Implement + return 0, nil +} + func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { // TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write. // UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now. diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5c89a1ab95..8e628f40de 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -915,6 +915,13 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, nil } +func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + // AppendCTZeroSample is no-op for remote-write for now. + // TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might + // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). + return 0, nil +} + func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) { if m.updateMetadataErr != nil { return 0, m.updateMetadataErr diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 596d5c8a31..5e33fce808 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -972,6 +972,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int return storage.SeriesRef(series.ref), nil } +func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + // TODO(bwplotka/arthursens): Wire metadata in the Agent's appender. + return 0, nil +} + func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { // TODO: Wire metadata in the Agent's appender. return 0, nil diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 3dd9a367b0..10fb17809b 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -79,6 +79,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t return a.app.AppendHistogram(ref, l, t, h, fh) } +func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if a.app != nil { + return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) + } + a.head.initTime(t) + a.app = a.head.appender() + + return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh) +} + func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { if a.app != nil { return a.app.UpdateMetadata(ref, l, m) @@ -388,7 +398,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 // storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation. func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) { if ct >= t { - return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring") + return 0, storage.ErrCTNewerThanSample } s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) @@ -747,6 +757,107 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels return storage.SeriesRef(s.ref), nil } +func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if !a.head.opts.EnableNativeHistograms.Load() { + return 0, storage.ErrNativeHistogramsDisabled + } + + if ct >= t { + return 0, storage.ErrCTNewerThanSample + } + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + if lset.IsEmpty() { + return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample) + } + + if l, dup := lset.HasDuplicateLabelNames(); dup { + return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) + } + + var created bool + var err error + s, created, err = a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return 0, err + } + if created { + switch { + case h != nil: + s.lastHistogramValue = &histogram.Histogram{} + case fh != nil: + s.lastFloatHistogramValue = &histogram.FloatHistogram{} + } + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } + } + + switch { + case h != nil: + zeroHistogram := &histogram.Histogram{} + s.Lock() + // Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed. + // We set it to true to make this implementation as close as possible to the float implementation. + isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) + if err != nil { + s.Unlock() + if errors.Is(err, storage.ErrOutOfOrderSample) { + return 0, storage.ErrOutOfOrderCT + } + } + // OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples. + // This is to prevent the injected zero from being marked as OOO forever. + if isOOO { + s.Unlock() + return 0, storage.ErrOutOfOrderCT + } + s.pendingCommit = true + s.Unlock() + a.histograms = append(a.histograms, record.RefHistogramSample{ + Ref: s.ref, + T: ct, + H: zeroHistogram, + }) + a.histogramSeries = append(a.histogramSeries, s) + case fh != nil: + zeroFloatHistogram := &histogram.FloatHistogram{} + s.Lock() + // Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed. + // We set it to true to make this implementation as close as possible to the float implementation. + isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples. + if err != nil { + s.Unlock() + if errors.Is(err, storage.ErrOutOfOrderSample) { + return 0, storage.ErrOutOfOrderCT + } + } + // OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples. + // This is to prevent the injected zero from being marked as OOO forever. + if isOOO { + s.Unlock() + return 0, storage.ErrOutOfOrderCT + } + s.pendingCommit = true + s.Unlock() + a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ + Ref: s.ref, + T: ct, + FH: zeroFloatHistogram, + }) + a.floatHistogramSeries = append(a.floatHistogramSeries, s) + } + + if ct > a.maxt { + a.maxt = ct + } + return storage.SeriesRef(s.ref), nil +} + // UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't // use getOrCreate or make any of the lset sanity checks that Append does. func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 483121dc66..ebfd1ff8b4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6363,6 +6363,166 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) { } } +func TestHeadAppender_AppendHistogramCTZeroSample(t *testing.T) { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) + lbls := labels.FromStrings("foo", "bar") + type appendableHistograms struct { + ts int64 + h *histogram.Histogram + fh *histogram.FloatHistogram + ct int64 + } + for _, tc := range []struct { + name string + appendableHistograms []appendableHistograms + expectedHistograms []chunks.Sample + }{ + { + name: "In order ct+normal sample/histogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, h: testHistogram, ct: 1}, + {ts: 101, h: testHistogram, ct: 1}, + }, + expectedHistograms: func() []chunks.Sample { + hNoCounterReset := *testHistogram + hNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 100, h: testHistogram}, + sample{t: 101, h: &hNoCounterReset}, + } + }(), + }, + { + name: "In order ct+normal sample/floathistogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, fh: testFloatHistogram, ct: 1}, + {ts: 101, fh: testFloatHistogram, ct: 1}, + }, + expectedHistograms: func() []chunks.Sample { + fhNoCounterReset := *testFloatHistogram + fhNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 100, fh: testFloatHistogram}, + sample{t: 101, fh: &fhNoCounterReset}, + } + }(), + }, + { + name: "Consecutive appends with same ct ignore ct/histogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, h: testHistogram, ct: 1}, + {ts: 101, h: testHistogram, ct: 1}, + }, + expectedHistograms: func() []chunks.Sample { + hNoCounterReset := *testHistogram + hNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 100, h: testHistogram}, + sample{t: 101, h: &hNoCounterReset}, + } + }(), + }, + { + name: "Consecutive appends with same ct ignore ct/floathistogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, fh: testFloatHistogram, ct: 1}, + {ts: 101, fh: testFloatHistogram, ct: 1}, + }, + expectedHistograms: func() []chunks.Sample { + fhNoCounterReset := *testFloatHistogram + fhNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 100, fh: testFloatHistogram}, + sample{t: 101, fh: &fhNoCounterReset}, + } + }(), + }, + { + name: "Consecutive appends with newer ct do not ignore ct/histogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, h: testHistogram, ct: 1}, + {ts: 102, h: testHistogram, ct: 101}, + }, + expectedHistograms: []chunks.Sample{ + sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 100, h: testHistogram}, + sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.CounterReset}}, + sample{t: 102, h: testHistogram}, + }, + }, + { + name: "Consecutive appends with newer ct do not ignore ct/floathistogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, fh: testFloatHistogram, ct: 1}, + {ts: 102, fh: testFloatHistogram, ct: 101}, + }, + expectedHistograms: []chunks.Sample{ + sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 100, fh: testFloatHistogram}, + sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.CounterReset}}, + sample{t: 102, fh: testFloatHistogram}, + }, + }, + { + name: "CT equals to previous sample timestamp is ignored/histogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, h: testHistogram, ct: 1}, + {ts: 101, h: testHistogram, ct: 100}, + }, + expectedHistograms: func() []chunks.Sample { + hNoCounterReset := *testHistogram + hNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, h: &histogram.Histogram{}}, + sample{t: 100, h: testHistogram}, + sample{t: 101, h: &hNoCounterReset}, + } + }(), + }, + { + name: "CT equals to previous sample timestamp is ignored/floathistogram", + appendableHistograms: []appendableHistograms{ + {ts: 100, fh: testFloatHistogram, ct: 1}, + {ts: 101, fh: testFloatHistogram, ct: 100}, + }, + expectedHistograms: func() []chunks.Sample { + fhNoCounterReset := *testFloatHistogram + fhNoCounterReset.CounterResetHint = histogram.NotCounterReset + return []chunks.Sample{ + sample{t: 1, fh: &histogram.FloatHistogram{}}, + sample{t: 100, fh: testFloatHistogram}, + sample{t: 101, fh: &fhNoCounterReset}, + } + }(), + }, + } { + t.Run(tc.name, func(t *testing.T) { + head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, head.Close()) + }() + appender := head.Appender(context.Background()) + for _, sample := range tc.appendableHistograms { + ref, err := appender.AppendHistogramCTZeroSample(0, lbls, sample.ts, sample.ct, sample.h, sample.fh) + require.NoError(t, err) + _, err = appender.AppendHistogram(ref, lbls, sample.ts, sample.h, sample.fh) + require.NoError(t, err) + } + require.NoError(t, appender.Commit()) + + q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + result := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, tc.expectedHistograms, result[`{foo="bar"}`]) + }) + } +} + func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { // Use a chunk range of 1 here so that if we attempted to determine if the head // was compactable using default values for min and max times, `Head.compactable()`