From 4017e90789fb24fcd72ba4f27cd87934f71ab919 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 28 Aug 2024 16:47:13 -0300 Subject: [PATCH] Support ingesting PWRv2's Created Timestamp as 0 samples Signed-off-by: Arthur Silva Sens --- storage/remote/codec_test.go | 26 +++++----- storage/remote/write_handler.go | 16 ++++++- storage/remote/write_handler_test.go | 71 ++++++++++++++++++++-------- 3 files changed, 80 insertions(+), 33 deletions(-) diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 404f1add7..095a2a653 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -104,9 +104,10 @@ var ( HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, // CT needs to be lower than the sample's timestamp. }, { LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. @@ -116,9 +117,9 @@ var ( HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, } @@ -140,9 +141,10 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help), UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit), }, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 10}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 10}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))}, + CreatedTimestamp: 1, }, { LabelsRefs: labelRefs, @@ -151,9 +153,9 @@ func TestWriteV2RequestFixture(t *testing.T) { HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help), // No unit. }, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}}, - Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 20}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 20}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))}, }, }, Symbols: st.Symbols(), diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 58fb668cc..0c6b482a2 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -395,7 +395,21 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * var ref storage.SeriesRef // Samples. - for _, s := range ts.Samples { + for i, s := range ts.Samples { + // CT only needs to be ingested for the first sample, it will be considered + // out of order for the rest. + if i == 0 && ts.CreatedTimestamp != 0 { + ref, err = app.AppendCTZeroSample(ref, ls, s.Timestamp, ts.CreatedTimestamp) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario. + // We don't fail the request and just log it. + level.Debug(h.logger).Log("msg", "Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", s.Timestamp) + } + if err == nil { + rs.Samples++ + } + } + ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { rs.Samples++ diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5c89a1ab9..cb76c01ca 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -309,11 +309,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode int expectedRespBody string - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error }{ { desc: "All timeseries accepted", @@ -439,11 +440,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue) appendable := &mockAppendable{ - commitErr: tc.commitErr, - appendSampleErr: tc.appendSampleErr, - appendHistogramErr: tc.appendHistogramErr, - appendExemplarErr: tc.appendExemplarErr, - updateMetadataErr: tc.updateMetadataErr, + commitErr: tc.commitErr, + appendSampleErr: tc.appendSampleErr, + appendCTZeroSampleErr: tc.appendCTZeroSampleErr, + appendHistogramErr: tc.appendHistogramErr, + appendExemplarErr: tc.appendExemplarErr, + updateMetadataErr: tc.updateMetadataErr, } handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2}) @@ -471,7 +473,8 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { // Double check mandatory 2.0 stats. // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. - expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) + // We expect 3 samples because the first series has regular sample + CT. + expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader)) expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader)) if tc.appendExemplarErr != nil { expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) @@ -488,6 +491,10 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) for _, s := range ts.Samples { + if ts.CreatedTimestamp != 0 { + requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i]) + i++ + } requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } @@ -778,11 +785,12 @@ type mockAppendable struct { metadata []mockMetadata // optional errors to inject. - commitErr error - appendSampleErr error - appendHistogramErr error - appendExemplarErr error - updateMetadataErr error + commitErr error + appendSampleErr error + appendCTZeroSampleErr error + appendHistogramErr error + appendExemplarErr error + updateMetadataErr error } type mockSample struct { @@ -924,9 +932,32 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp return 0, nil } -func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). +func (m *mockAppendable) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + latestTs := m.latestSample[l.Hash()] + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + m.latestSample[l.Hash()] = ct + m.samples = append(m.samples, mockSample{l, ct, 0}) return 0, nil }