diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index cd0775bbdd..d2a69634aa 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -259,6 +259,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true") case "created-timestamp-zero-ingestion": c.scrape.EnableCreatedTimestampZeroIngestion = true + c.web.CTZeroIngestionEnabled = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 6fb1ae1e49..3557a87eb5 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -104,9 +104,10 @@ var ( HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, // CT needs to be lower than the sample's timestamp. }, { LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. @@ -116,9 +117,9 @@ var ( HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, } @@ -140,9 +141,10 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, }, { LabelsRefs: labelRefs, @@ -151,9 +153,9 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, Symbols: st.Symbols(), diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index afb50ef265..89433ae6f2 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -48,6 +48,8 @@ type writeHandler struct { samplesAppendedWithoutMetadata prometheus.Counter acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} + + ingestCTZeroSample bool } const maxAheadTime = 10 * time.Minute @@ -57,7 +59,7 @@ const maxAheadTime = 10 * time.Minute // // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. -func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler { +func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler { protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} for _, acc := range acceptedProtoMsgs { protoMsgs[acc] = struct{}{} @@ -78,6 +80,8 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable Name: "remote_write_without_metadata_appended_samples_total", Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", }), + + ingestCTZeroSample: ingestCTZeroSample, } return h } @@ -394,6 +398,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * var ref storage.SeriesRef // Samples. + if h.ingestCTZeroSample && len(ts.Samples) > 0 && ts.Samples[0].Timestamp != 0 && ts.CreatedTimestamp != 0 { + // CT only needs to be ingested for the first sample, it will be considered + // out of order for the rest. + ref, err = app.AppendCTZeroSample(ref, ls, ts.Samples[0].Timestamp, ts.CreatedTimestamp) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp) + } + } for _, s := range ts.Samples { ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { @@ -415,6 +430,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // Native Histograms. for _, hp := range ts.Histograms { + if h.ingestCTZeroSample && hp.Timestamp != 0 && ts.CreatedTimestamp != 0 { + // Differently from samples, we need to handle CT for each histogram instead of just the first one. + // This is because histograms and float histograms are stored separately, even if they have the same labels. + ref, err = h.handleHistogramZeroSample(app, ref, ls, hp, ts.CreatedTimestamp) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", hp.Timestamp) + } + } if hp.IsFloatHistogram() { ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) } else { @@ -479,6 +505,18 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...) } +// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. +// It doens't return errors in case of out of order CT. +func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64) (storage.SeriesRef, error) { + var err error + if hist.IsFloatHistogram() { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, nil, hist.ToFloatHistogram()) + } else { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil) + } + return ref, err +} + // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index c40f227ea8..b37b3632b9 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -231,7 +231,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -256,7 +256,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { // in Prometheus, so keeping like this to not break existing 1.0 clients. appendable := &mockAppendable{} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -310,14 +310,23 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode int expectedRespBody string - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error + + ingestCTZeroSample bool }{ { - desc: "All timeseries accepted", + desc: "All timeseries accepted/ct_enabled", + input: writeV2RequestFixture.Timeseries, + expectedCode: http.StatusNoContent, + ingestCTZeroSample: true, + }, + { + desc: "All timeseries accepted/ct_disabled", input: writeV2RequestFixture.Timeseries, expectedCode: http.StatusNoContent, }, @@ -440,13 +449,14 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{ - commitErr: tc.commitErr, - appendSampleErr: tc.appendSampleErr, - appendHistogramErr: tc.appendHistogramErr, - appendExemplarErr: tc.appendExemplarErr, - updateMetadataErr: tc.updateMetadataErr, + commitErr: tc.commitErr, + appendSampleErr: tc.appendSampleErr, + appendCTZeroSampleErr: tc.appendCTZeroSampleErr, + appendHistogramErr: tc.appendHistogramErr, + appendExemplarErr: tc.appendExemplarErr, + updateMetadataErr: tc.updateMetadataErr, } - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -489,15 +499,27 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) for _, s := range ts.Samples { + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i]) + i++ + } requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := hp.ToFloatHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := hp.ToIntHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ @@ -545,7 +567,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -587,7 +609,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -625,7 +647,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -656,7 +678,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { appendable := &mockAppendable{} // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -673,7 +695,7 @@ func TestCommitErr_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -697,7 +719,7 @@ func TestCommitErr_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: errors.New("commit error")} - handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -724,7 +746,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) @@ -775,15 +797,17 @@ type mockAppendable struct { latestExemplar map[uint64]int64 exemplars []mockExemplar latestHistogram map[uint64]int64 + latestFloatHist map[uint64]int64 histograms []mockHistogram metadata []mockMetadata // optional errors to inject. - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error } type mockSample struct { @@ -827,6 +851,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender { if m.latestHistogram == nil { m.latestHistogram = map[uint64]int64{} } + if m.latestFloatHist == nil { + m.latestFloatHist = map[uint64]int64{} + } if m.latestExemplar == nil { m.latestExemplar = map[uint64]int64{} } @@ -900,7 +927,12 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, m.appendHistogramErr } - latestTs := m.latestHistogram[l.Hash()] + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } if t < latestTs { return 0, storage.ErrOutOfOrderSample } @@ -915,15 +947,53 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, tsdb.ErrInvalidSample } - m.latestHistogram[l.Hash()] = t + if h != nil { + m.latestHistogram[l.Hash()] = t + } else { + m.latestFloatHist[l.Hash()] = t + } m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) return 0, nil } func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + if h != nil { + m.latestHistogram[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil}) + } else { + m.latestFloatHist[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}}) + } return 0, nil } @@ -936,9 +1006,32 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp return 0, nil } -func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). +func (m *mockAppendable) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + latestTs := m.latestSample[l.Hash()] + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + m.latestSample[l.Hash()] = ct + m.samples = append(m.samples, mockSample{l, ct, 0}) return 0, nil } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 3f26cbd484..3bdd9050f4 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -258,6 +258,7 @@ func NewAPI( rwEnabled bool, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled bool, + ctZeroIngestionEnabled bool, ) *API { a := &API{ QueryEngine: qe, @@ -301,7 +302,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc) diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index f5e75615ec..0a5c76b48e 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -142,6 +142,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route false, config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2}, false, + false, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 08c683bae8..b5532cadff 100644 --- a/web/web.go +++ b/web/web.go @@ -290,6 +290,7 @@ type Options struct { EnableRemoteWriteReceiver bool EnableOTLPWriteReceiver bool IsAgent bool + CTZeroIngestionEnabled bool AppName string AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg @@ -386,6 +387,7 @@ func New(logger *slog.Logger, o *Options) *Handler { o.EnableRemoteWriteReceiver, o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, + o.CTZeroIngestionEnabled, ) if o.RoutePrefix != "/" {