diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 53571e74a0..cdf141095e 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -401,7 +401,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * if h.ingestCTZeroSample && len(ts.Samples) > 0 { // CT only needs to be ingested for the first sample, it will be considered // out of order for the rest. - ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs) + ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp) if err != nil { h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp) } @@ -426,15 +426,15 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * } // Native Histograms. - if h.ingestCTZeroSample && len(ts.Histograms) > 0 { - // CT only needs to be ingested for the first histogram, it will be considered - // out of order for the rest. - ref, err = h.handleHistogramZeroSample(app, ref, ls, ts.Histograms[0], ts.CreatedTimestamp, rs) - if err != nil { - h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Histograms[0].Timestamp) - } - } for _, hp := range ts.Histograms { + if h.ingestCTZeroSample { + // Differently from samples, we need to handle CT for each histogram instead of just the first one. + // This is because histograms and float histograms are stored separately, even if they have the same labels. + ref, err = h.handleHistogramZeroSample(app, ref, ls, hp, ts.CreatedTimestamp) + if err != nil { + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", hp.Timestamp) + } + } if hp.IsFloatHistogram() { ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) } else { @@ -501,13 +501,10 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // handleCTZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. // It doens't return errors in case of out of order CT. -func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, sample writev2.Sample, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) { +func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, sample writev2.Sample, ct int64) (storage.SeriesRef, error) { var err error if sample.Timestamp != 0 && ct != 0 { ref, err = app.AppendCTZeroSample(ref, l, sample.Timestamp, ct) - if err == nil { - rs.Samples++ - } if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) { // Even for the first sample OOO is a common scenario because // we can't tell if a CT was already ingested in a previous request. @@ -520,7 +517,7 @@ func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.Seri // handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. // It doens't return errors in case of out of order CT. -func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) { +func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64) (storage.SeriesRef, error) { var err error if hist.Timestamp != 0 && ct != 0 { if hist.IsFloatHistogram() { @@ -528,9 +525,6 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora } else { ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil) } - if err == nil { - rs.Histograms++ - } if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) { // Even for the first sample OOO is a common scenario because // we can't tell if a CT was already ingested in a previous request. diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 035d35a839..084db9fcd8 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -482,11 +482,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { // Double check mandatory 2.0 stats. // writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each. - if tc.ingestCTZeroSample { - expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader)) - } else { - expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) - } + expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader)) expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader)) if tc.appendExemplarErr != nil { expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader)) @@ -513,9 +509,17 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := hp.ToFloatHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := hp.ToIntHistogram() + if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { + requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k]) + k++ + } requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ @@ -532,6 +536,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m]) m++ } + } }) } @@ -793,6 +798,7 @@ type mockAppendable struct { latestExemplar map[uint64]int64 exemplars []mockExemplar latestHistogram map[uint64]int64 + latestFloatHist map[uint64]int64 histograms []mockHistogram metadata []mockMetadata @@ -846,6 +852,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender { if m.latestHistogram == nil { m.latestHistogram = map[uint64]int64{} } + if m.latestFloatHist == nil { + m.latestFloatHist = map[uint64]int64{} + } if m.latestExemplar == nil { m.latestExemplar = map[uint64]int64{} } @@ -919,7 +928,12 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, m.appendHistogramErr } - latestTs := m.latestHistogram[l.Hash()] + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } if t < latestTs { return 0, storage.ErrOutOfOrderSample } @@ -934,15 +948,53 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, tsdb.ErrInvalidSample } - m.latestHistogram[l.Hash()] = t + if h != nil { + m.latestHistogram[l.Hash()] = t + } else { + m.latestFloatHist[l.Hash()] = t + } m.histograms = append(m.histograms, mockHistogram{l, t, h, fh}) return 0, nil } func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - // AppendCTZeroSample is no-op for remote-write for now. - // TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might - // replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218). + if m.appendCTZeroSampleErr != nil { + return 0, m.appendCTZeroSampleErr + } + + // Created Timestamp can't be higher than the original sample's timestamp. + if ct > t { + return 0, storage.ErrOutOfOrderSample + } + + var latestTs int64 + if h != nil { + latestTs = m.latestHistogram[l.Hash()] + } else { + latestTs = m.latestFloatHist[l.Hash()] + } + if ct < latestTs { + return 0, storage.ErrOutOfOrderSample + } + if ct == latestTs { + return 0, storage.ErrDuplicateSampleForTimestamp + } + + if l.IsEmpty() { + return 0, tsdb.ErrInvalidSample + } + + if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates { + return 0, tsdb.ErrInvalidSample + } + + if h != nil { + m.latestHistogram[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil}) + } else { + m.latestFloatHist[l.Hash()] = ct + m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}}) + } return 0, nil }