Merge pull request #14755 from prometheus/arthursens/appendct-prwv2
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Append CT as zero sample from PRWv2
This commit is contained in:
Bartlomiej Plotka 2024-12-27 12:44:54 +01:00 committed by GitHub
commit 30967330ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 188 additions and 50 deletions

View file

@ -259,6 +259,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true") logger.Info("Experimental out-of-order native histogram ingestion enabled. This will only take effect if OutOfOrderTimeWindow is > 0 and if EnableNativeHistograms = true")
case "created-timestamp-zero-ingestion": case "created-timestamp-zero-ingestion":
c.scrape.EnableCreatedTimestampZeroIngestion = true c.scrape.EnableCreatedTimestampZeroIngestion = true
c.web.CTZeroIngestionEnabled = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols

View file

@ -104,9 +104,10 @@ var (
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
}, },
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, Samples: []writev2.Sample{{Value: 1, Timestamp: 10}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))},
CreatedTimestamp: 1, // CT needs to be lower than the sample's timestamp.
}, },
{ {
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
@ -116,9 +117,9 @@ var (
HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
// No unit. // No unit.
}, },
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, Samples: []writev2.Sample{{Value: 2, Timestamp: 20}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 20}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))},
}, },
}, },
} }
@ -140,9 +141,10 @@ func TestWriteV2RequestFixture(t *testing.T) {
HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help),
UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit),
}, },
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, Samples: []writev2.Sample{{Value: 1, Timestamp: 10}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}}, Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 10}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))},
CreatedTimestamp: 1,
}, },
{ {
LabelsRefs: labelRefs, LabelsRefs: labelRefs,
@ -151,9 +153,9 @@ func TestWriteV2RequestFixture(t *testing.T) {
HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help),
// No unit. // No unit.
}, },
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, Samples: []writev2.Sample{{Value: 2, Timestamp: 20}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}}, Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 20}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))},
}, },
}, },
Symbols: st.Symbols(), Symbols: st.Symbols(),

View file

@ -48,6 +48,8 @@ type writeHandler struct {
samplesAppendedWithoutMetadata prometheus.Counter samplesAppendedWithoutMetadata prometheus.Counter
acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{} acceptedProtoMsgs map[config.RemoteWriteProtoMsg]struct{}
ingestCTZeroSample bool
} }
const maxAheadTime = 10 * time.Minute const maxAheadTime = 10 * time.Minute
@ -57,7 +59,7 @@ const maxAheadTime = 10 * time.Minute
// //
// NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible // NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible
// as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write. // as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write.
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg) http.Handler { func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedProtoMsgs []config.RemoteWriteProtoMsg, ingestCTZeroSample bool) http.Handler {
protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{} protoMsgs := map[config.RemoteWriteProtoMsg]struct{}{}
for _, acc := range acceptedProtoMsgs { for _, acc := range acceptedProtoMsgs {
protoMsgs[acc] = struct{}{} protoMsgs[acc] = struct{}{}
@ -78,6 +80,8 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable
Name: "remote_write_without_metadata_appended_samples_total", Name: "remote_write_without_metadata_appended_samples_total",
Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.", Help: "The total number of received remote write samples (and histogram samples) which were ingested without corresponding metadata.",
}), }),
ingestCTZeroSample: ingestCTZeroSample,
} }
return h return h
} }
@ -394,6 +398,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
var ref storage.SeriesRef var ref storage.SeriesRef
// Samples. // Samples.
if h.ingestCTZeroSample && len(ts.Samples) > 0 && ts.Samples[0].Timestamp != 0 && ts.CreatedTimestamp != 0 {
// CT only needs to be ingested for the first sample, it will be considered
// out of order for the rest.
ref, err = app.AppendCTZeroSample(ref, ls, ts.Samples[0].Timestamp, ts.CreatedTimestamp)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
// Even for the first sample OOO is a common scenario because
// we can't tell if a CT was already ingested in a previous request.
// We ignore the error.
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp)
}
}
for _, s := range ts.Samples { for _, s := range ts.Samples {
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
if err == nil { if err == nil {
@ -415,6 +430,17 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
// Native Histograms. // Native Histograms.
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
if h.ingestCTZeroSample && hp.Timestamp != 0 && ts.CreatedTimestamp != 0 {
// Differently from samples, we need to handle CT for each histogram instead of just the first one.
// This is because histograms and float histograms are stored separately, even if they have the same labels.
ref, err = h.handleHistogramZeroSample(app, ref, ls, hp, ts.CreatedTimestamp)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
// Even for the first sample OOO is a common scenario because
// we can't tell if a CT was already ingested in a previous request.
// We ignore the error.
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", hp.Timestamp)
}
}
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram())
} else { } else {
@ -479,6 +505,18 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...) return samplesWithoutMetadata, http.StatusBadRequest, errors.Join(badRequestErrs...)
} }
// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp.
// It doens't return errors in case of out of order CT.
func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64) (storage.SeriesRef, error) {
var err error
if hist.IsFloatHistogram() {
ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, nil, hist.ToFloatHistogram())
} else {
ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil)
}
return ref, err
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler { func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {

View file

@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -231,7 +231,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
} }
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -256,7 +256,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
// in Prometheus, so keeping like this to not break existing 1.0 clients. // in Prometheus, so keeping like this to not break existing 1.0 clients.
appendable := &mockAppendable{} appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -310,14 +310,23 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
expectedCode int expectedCode int
expectedRespBody string expectedRespBody string
commitErr error commitErr error
appendSampleErr error appendSampleErr error
appendHistogramErr error appendCTZeroSampleErr error
appendExemplarErr error appendHistogramErr error
updateMetadataErr error appendExemplarErr error
updateMetadataErr error
ingestCTZeroSample bool
}{ }{
{ {
desc: "All timeseries accepted", desc: "All timeseries accepted/ct_enabled",
input: writeV2RequestFixture.Timeseries,
expectedCode: http.StatusNoContent,
ingestCTZeroSample: true,
},
{
desc: "All timeseries accepted/ct_disabled",
input: writeV2RequestFixture.Timeseries, input: writeV2RequestFixture.Timeseries,
expectedCode: http.StatusNoContent, expectedCode: http.StatusNoContent,
}, },
@ -440,13 +449,14 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{ appendable := &mockAppendable{
commitErr: tc.commitErr, commitErr: tc.commitErr,
appendSampleErr: tc.appendSampleErr, appendSampleErr: tc.appendSampleErr,
appendHistogramErr: tc.appendHistogramErr, appendCTZeroSampleErr: tc.appendCTZeroSampleErr,
appendExemplarErr: tc.appendExemplarErr, appendHistogramErr: tc.appendHistogramErr,
updateMetadataErr: tc.updateMetadataErr, appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr,
} }
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, tc.ingestCTZeroSample)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -489,15 +499,27 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
for _, s := range ts.Samples { for _, s := range ts.Samples {
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i])
i++
}
requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++ i++
} }
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
fh := hp.ToFloatHistogram() fh := hp.ToFloatHistogram()
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k])
k++
}
requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else { } else {
h := hp.ToIntHistogram() h := hp.ToIntHistogram()
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k])
k++
}
requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
} }
k++ k++
@ -545,7 +567,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -587,7 +609,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -625,7 +647,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}} appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -656,7 +678,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
appendable := &mockAppendable{} appendable := &mockAppendable{}
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
b.ResetTimer() b.ResetTimer()
@ -673,7 +695,7 @@ func TestCommitErr_V1Message(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
appendable := &mockAppendable{commitErr: errors.New("commit error")} appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -697,7 +719,7 @@ func TestCommitErr_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{commitErr: errors.New("commit error")} appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}, false)
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req) handler.ServeHTTP(recorder, req)
@ -724,7 +746,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close()) require.NoError(b, db.Close())
}) })
// TODO: test with other proto format(s) // TODO: test with other proto format(s)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}) handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false)
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy") buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
require.NoError(b, err) require.NoError(b, err)
@ -775,15 +797,17 @@ type mockAppendable struct {
latestExemplar map[uint64]int64 latestExemplar map[uint64]int64
exemplars []mockExemplar exemplars []mockExemplar
latestHistogram map[uint64]int64 latestHistogram map[uint64]int64
latestFloatHist map[uint64]int64
histograms []mockHistogram histograms []mockHistogram
metadata []mockMetadata metadata []mockMetadata
// optional errors to inject. // optional errors to inject.
commitErr error commitErr error
appendSampleErr error appendSampleErr error
appendHistogramErr error appendCTZeroSampleErr error
appendExemplarErr error appendHistogramErr error
updateMetadataErr error appendExemplarErr error
updateMetadataErr error
} }
type mockSample struct { type mockSample struct {
@ -827,6 +851,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
if m.latestHistogram == nil { if m.latestHistogram == nil {
m.latestHistogram = map[uint64]int64{} m.latestHistogram = map[uint64]int64{}
} }
if m.latestFloatHist == nil {
m.latestFloatHist = map[uint64]int64{}
}
if m.latestExemplar == nil { if m.latestExemplar == nil {
m.latestExemplar = map[uint64]int64{} m.latestExemplar = map[uint64]int64{}
} }
@ -900,7 +927,12 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, m.appendHistogramErr return 0, m.appendHistogramErr
} }
latestTs := m.latestHistogram[l.Hash()] var latestTs int64
if h != nil {
latestTs = m.latestHistogram[l.Hash()]
} else {
latestTs = m.latestFloatHist[l.Hash()]
}
if t < latestTs { if t < latestTs {
return 0, storage.ErrOutOfOrderSample return 0, storage.ErrOutOfOrderSample
} }
@ -915,15 +947,53 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, tsdb.ErrInvalidSample return 0, tsdb.ErrInvalidSample
} }
m.latestHistogram[l.Hash()] = t if h != nil {
m.latestHistogram[l.Hash()] = t
} else {
m.latestFloatHist[l.Hash()] = t
}
m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) m.histograms = append(m.histograms, mockHistogram{l, t, h, fh})
return 0, nil return 0, nil
} }
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendCTZeroSample is no-op for remote-write for now. if m.appendCTZeroSampleErr != nil {
// TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might return 0, m.appendCTZeroSampleErr
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). }
// Created Timestamp can't be higher than the original sample's timestamp.
if ct > t {
return 0, storage.ErrOutOfOrderSample
}
var latestTs int64
if h != nil {
latestTs = m.latestHistogram[l.Hash()]
} else {
latestTs = m.latestFloatHist[l.Hash()]
}
if ct < latestTs {
return 0, storage.ErrOutOfOrderSample
}
if ct == latestTs {
return 0, storage.ErrDuplicateSampleForTimestamp
}
if l.IsEmpty() {
return 0, tsdb.ErrInvalidSample
}
if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates {
return 0, tsdb.ErrInvalidSample
}
if h != nil {
m.latestHistogram[l.Hash()] = ct
m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil})
} else {
m.latestFloatHist[l.Hash()] = ct
m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}})
}
return 0, nil return 0, nil
} }
@ -936,9 +1006,32 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp
return 0, nil return 0, nil
} }
func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { func (m *mockAppendable) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
// AppendCTZeroSample is no-op for remote-write for now. if m.appendCTZeroSampleErr != nil {
// TODO(bwplotka): Add support for PRW 2.0 for CT zero feature (but also we might return 0, m.appendCTZeroSampleErr
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). }
// Created Timestamp can't be higher than the original sample's timestamp.
if ct > t {
return 0, storage.ErrOutOfOrderSample
}
latestTs := m.latestSample[l.Hash()]
if ct < latestTs {
return 0, storage.ErrOutOfOrderSample
}
if ct == latestTs {
return 0, storage.ErrDuplicateSampleForTimestamp
}
if l.IsEmpty() {
return 0, tsdb.ErrInvalidSample
}
if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates {
return 0, tsdb.ErrInvalidSample
}
m.latestSample[l.Hash()] = ct
m.samples = append(m.samples, mockSample{l, ct, 0})
return 0, nil return 0, nil
} }

View file

@ -258,6 +258,7 @@ func NewAPI(
rwEnabled bool, rwEnabled bool,
acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled bool, otlpEnabled bool,
ctZeroIngestionEnabled bool,
) *API { ) *API {
a := &API{ a := &API{
QueryEngine: qe, QueryEngine: qe,
@ -301,7 +302,7 @@ func NewAPI(
} }
if rwEnabled { if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, ctZeroIngestionEnabled)
} }
if otlpEnabled { if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc) a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc)

View file

@ -142,6 +142,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route
false, false,
config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2}, config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2},
false, false,
false,
) )
promRouter := route.New().WithPrefix("/api/v1") promRouter := route.New().WithPrefix("/api/v1")

View file

@ -290,6 +290,7 @@ type Options struct {
EnableRemoteWriteReceiver bool EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool EnableOTLPWriteReceiver bool
IsAgent bool IsAgent bool
CTZeroIngestionEnabled bool
AppName string AppName string
AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg
@ -386,6 +387,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
o.EnableRemoteWriteReceiver, o.EnableRemoteWriteReceiver,
o.AcceptRemoteWriteProtoMsgs, o.AcceptRemoteWriteProtoMsgs,
o.EnableOTLPWriteReceiver, o.EnableOTLPWriteReceiver,
o.CTZeroIngestionEnabled,
) )
if o.RoutePrefix != "/" { if o.RoutePrefix != "/" {