diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c402fbcb8..197d49d0c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -231,6 +231,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "created-timestamp-zero-ingestion": c.scrape.EnableCreatedTimestampZeroIngestion = true + c.web.CTZeroIngestionEnabled = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 0c6b482a2..8fcd18081 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -49,6 +49,8 @@ type writeHandler struct { samplesAppendedWithoutMetadata prometheus.Counter acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} + + ingestCTZeroSample bool } const maxAheadTime = 10 * time.Minute @@ -58,7 +60,7 @@ const maxAheadTime = 10 * time.Minute // // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. -func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler { +func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler { protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} for _, acc := range acceptedProtoMsgs { protoMsgs[acc] = struct{}{} @@ -79,6 +81,8 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st Name: "remote_write_without_metadata_appended_samples_total", Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", }), + + ingestCTZeroSample: ingestCTZeroSample, } return h } @@ -394,22 +398,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * allSamplesSoFar := rs.AllSamples() var ref storage.SeriesRef - // Samples. - for i, s := range ts.Samples { + if h.ingestCTZeroSample { // CT only needs to be ingested for the first sample, it will be considered // out of order for the rest. - if i == 0 && ts.CreatedTimestamp != 0 { - ref, err = app.AppendCTZeroSample(ref, ls, s.Timestamp, ts.CreatedTimestamp) - if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { - // Even for the first sample OOO is a common scenario. - // We don't fail the request and just log it. - level.Debug(h.logger).Log("msg", "Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", s.Timestamp) - } - if err == nil { - rs.Samples++ - } + ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs) + if err != nil { + level.Debug(h.logger).Log("msg", "Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp) } + } + // Samples. + for _, s := range ts.Samples { ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { rs.Samples++ @@ -494,6 +493,25 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...) } +// handleCTZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. +// It doens't return errors in case of out of order CT. +func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, sample writev2.Sample, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) { + var err error + if sample.Timestamp != 0 && ct != 0 { + ref, err = app.AppendCTZeroSample(ref, l, sample.Timestamp, ct) + if err == nil { + rs.Samples++ + } + if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + err = nil + } + } + return ref, err +} + // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index cb76c01ca..cd6a3c773 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -129,7 +129,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -230,7 +230,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -255,7 +255,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) { // in Prometheus, so keeping like this to not break existing 1.0 clients. appendable := &mockAppendable{} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -315,9 +315,17 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { appendHistogramErr error appendExemplarErr error updateMetadataErr error + + ingestCTZeroSample bool }{ { - desc: "All timeseries accepted", + desc: "All timeseries accepted/ct_enabled", + input: writeV2RequestFixture.Timeseries, + expectedCode: http.StatusNoContent, + ingestCTZeroSample: true, + }, + { + desc: "All timeseries accepted/ct_disabled", input: writeV2RequestFixture.Timeseries, expectedCode: http.StatusNoContent, }, @@ -447,7 +455,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { appendExemplarErr: tc.appendExemplarErr, updateMetadataErr: tc.updateMetadataErr, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -473,8 +481,11 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { // Double check mandatory 2.0 stats. // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. - // We expect 3 samples because the first series has regular sample + CT. - expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader)) + if tc.ingestCTZeroSample { + expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader)) + } else { + expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) + } expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader)) if tc.appendExemplarErr != nil { expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) @@ -491,7 +502,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) for _, s := range ts.Samples { - if ts.CreatedTimestamp != 0 { + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i]) i++ } @@ -551,7 +562,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -593,7 +604,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -631,7 +642,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -662,7 +673,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) { appendable := &mockAppendable{} // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() b.ResetTimer() @@ -679,7 +690,7 @@ func TestCommitErr_V1Message(t *testing.T) { require.NoError(t, err) appendable := &mockAppendable{commitErr: fmt.Errorf("commit error")} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -703,7 +714,7 @@ func TestCommitErr_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{commitErr: fmt.Errorf("commit error")} - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -730,7 +741,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) + handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") require.NoError(b, err) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index d58be211f..aff30133e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -251,6 +251,7 @@ func NewAPI( rwEnabled bool, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled bool, + ctZeroIngestionEnabled bool, ) *API { a := &API{ QueryEngine: qe, @@ -292,7 +293,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc) diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 99ef81018..14cdd4f74 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -137,6 +137,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { false, config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2}, false, + false, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 098baa055..abc0455fd 100644 --- a/web/web.go +++ b/web/web.go @@ -263,6 +263,7 @@ type Options struct { EnableRemoteWriteReceiver bool EnableOTLPWriteReceiver bool IsAgent bool + CTZeroIngestionEnabled bool AppName string AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg @@ -357,6 +358,7 @@ func New(logger log.Logger, o *Options) *Handler { o.EnableRemoteWriteReceiver, o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, + o.CTZeroIngestionEnabled, ) if o.RoutePrefix != "/" {