Support ingesting PWRv2's Created Timestamp as 0 samples

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
This commit is contained in:
Arthur Silva Sens 2024-08-28 16:47:13 -03:00
parent c586c15ae6
commit 4017e90789
No known key found for this signature in database
3 changed files with 80 additions and 33 deletions

View file

@ -104,9 +104,10 @@ var (
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
Samples: []writev2.Sample{{Value: 1, Timestamp: 10}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 10}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))},
CreatedTimestamp: 1, // CT needs to be lower than the sample's timestamp.
},
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first.
@ -116,9 +117,9 @@ var (
HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help.
// No unit.
},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
Samples: []writev2.Sample{{Value: 2, Timestamp: 20}},
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 20}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))},
},
},
}
@ -140,9 +141,10 @@ func TestWriteV2RequestFixture(t *testing.T) {
HelpRef: st.Symbolize(writeV2RequestSeries1Metadata.Help),
UnitRef: st.Symbolize(writeV2RequestSeries1Metadata.Unit),
},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 1}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))},
Samples: []writev2.Sample{{Value: 1, Timestamp: 10}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar1LabelRefs, Value: 1, Timestamp: 10}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(10, &testHistogram), writev2.FromFloatHistogram(20, testHistogram.ToFloat(nil))},
CreatedTimestamp: 1,
},
{
LabelsRefs: labelRefs,
@ -151,9 +153,9 @@ func TestWriteV2RequestFixture(t *testing.T) {
HelpRef: st.Symbolize(writeV2RequestSeries2Metadata.Help),
// No unit.
},
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 2}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))},
Samples: []writev2.Sample{{Value: 2, Timestamp: 20}},
Exemplars: []writev2.Exemplar{{LabelsRefs: exemplar2LabelRefs, Value: 2, Timestamp: 20}},
Histograms: []writev2.Histogram{writev2.FromIntHistogram(30, &testHistogram), writev2.FromFloatHistogram(40, testHistogram.ToFloat(nil))},
},
},
Symbols: st.Symbols(),

View file

@ -395,7 +395,21 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
var ref storage.SeriesRef
// Samples.
for _, s := range ts.Samples {
for i, s := range ts.Samples {
// CT only needs to be ingested for the first sample, it will be considered
// out of order for the rest.
if i == 0 && ts.CreatedTimestamp != 0 {
ref, err = app.AppendCTZeroSample(ref, ls, s.Timestamp, ts.CreatedTimestamp)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
// Even for the first sample OOO is a common scenario.
// We don't fail the request and just log it.
level.Debug(h.logger).Log("msg", "Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", s.Timestamp)
}
if err == nil {
rs.Samples++
}
}
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
if err == nil {
rs.Samples++

View file

@ -309,11 +309,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
expectedCode int
expectedRespBody string
commitErr error
appendSampleErr error
appendHistogramErr error
appendExemplarErr error
updateMetadataErr error
commitErr error
appendSampleErr error
appendCTZeroSampleErr error
appendHistogramErr error
appendExemplarErr error
updateMetadataErr error
}{
{
desc: "All timeseries accepted",
@ -439,11 +440,12 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
appendable := &mockAppendable{
commitErr: tc.commitErr,
appendSampleErr: tc.appendSampleErr,
appendHistogramErr: tc.appendHistogramErr,
appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr,
commitErr: tc.commitErr,
appendSampleErr: tc.appendSampleErr,
appendCTZeroSampleErr: tc.appendCTZeroSampleErr,
appendHistogramErr: tc.appendHistogramErr,
appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr,
}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV2})
@ -471,7 +473,8 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
// Double check mandatory 2.0 stats.
// writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each.
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader))
// We expect 3 samples because the first series has regular sample + CT.
expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader))
expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader))
if tc.appendExemplarErr != nil {
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader))
@ -488,6 +491,10 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
for _, s := range ts.Samples {
if ts.CreatedTimestamp != 0 {
requireEqual(t, mockSample{ls, ts.CreatedTimestamp, 0}, appendable.samples[i])
i++
}
requireEqual(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
@ -778,11 +785,12 @@ type mockAppendable struct {
metadata []mockMetadata
// optional errors to inject.
commitErr error
appendSampleErr error
appendHistogramErr error
appendExemplarErr error
updateMetadataErr error
commitErr error
appendSampleErr error
appendCTZeroSampleErr error
appendHistogramErr error
appendExemplarErr error
updateMetadataErr error
}
type mockSample struct {
@ -924,9 +932,32 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp
return 0, nil
}
func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
// AppendCTZeroSample is no-op for remote-write for now.
// TODO(bwplotka): Add support for PRW 2.0 for CT zero feature (but also we might
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218).
func (m *mockAppendable) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if m.appendCTZeroSampleErr != nil {
return 0, m.appendCTZeroSampleErr
}
// Created Timestamp can't be higher than the original sample's timestamp.
if ct > t {
return 0, storage.ErrOutOfOrderSample
}
latestTs := m.latestSample[l.Hash()]
if ct < latestTs {
return 0, storage.ErrOutOfOrderSample
}
if ct == latestTs {
return 0, storage.ErrDuplicateSampleForTimestamp
}
if l.IsEmpty() {
return 0, tsdb.ErrInvalidSample
}
if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates {
return 0, tsdb.ErrInvalidSample
}
m.latestSample[l.Hash()] = ct
m.samples = append(m.samples, mockSample{l, ct, 0})
return 0, nil
}