mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
handle histogram CT
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
This commit is contained in:
parent
6571d97e70
commit
3ffc3bf6a3
|
@ -401,7 +401,7 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
|||
if h.ingestCTZeroSample && len(ts.Samples) > 0 {
|
||||
// CT only needs to be ingested for the first sample, it will be considered
|
||||
// out of order for the rest.
|
||||
ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs)
|
||||
ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp)
|
||||
if err != nil {
|
||||
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp)
|
||||
}
|
||||
|
@ -426,15 +426,15 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
|||
}
|
||||
|
||||
// Native Histograms.
|
||||
if h.ingestCTZeroSample && len(ts.Histograms) > 0 {
|
||||
// CT only needs to be ingested for the first histogram, it will be considered
|
||||
// out of order for the rest.
|
||||
ref, err = h.handleHistogramZeroSample(app, ref, ls, ts.Histograms[0], ts.CreatedTimestamp, rs)
|
||||
if err != nil {
|
||||
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Histograms[0].Timestamp)
|
||||
}
|
||||
}
|
||||
for _, hp := range ts.Histograms {
|
||||
if h.ingestCTZeroSample {
|
||||
// Differently from samples, we need to handle CT for each histogram instead of just the first one.
|
||||
// This is because histograms and float histograms are stored separately, even if they have the same labels.
|
||||
ref, err = h.handleHistogramZeroSample(app, ref, ls, hp, ts.CreatedTimestamp)
|
||||
if err != nil {
|
||||
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", hp.Timestamp)
|
||||
}
|
||||
}
|
||||
if hp.IsFloatHistogram() {
|
||||
ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram())
|
||||
} else {
|
||||
|
@ -501,13 +501,10 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
|||
|
||||
// handleCTZeroSample appends CT as a zero-value sample with CT value as the sample timestamp.
|
||||
// It doens't return errors in case of out of order CT.
|
||||
func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, sample writev2.Sample, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) {
|
||||
func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, sample writev2.Sample, ct int64) (storage.SeriesRef, error) {
|
||||
var err error
|
||||
if sample.Timestamp != 0 && ct != 0 {
|
||||
ref, err = app.AppendCTZeroSample(ref, l, sample.Timestamp, ct)
|
||||
if err == nil {
|
||||
rs.Samples++
|
||||
}
|
||||
if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) {
|
||||
// Even for the first sample OOO is a common scenario because
|
||||
// we can't tell if a CT was already ingested in a previous request.
|
||||
|
@ -520,7 +517,7 @@ func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.Seri
|
|||
|
||||
// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp.
|
||||
// It doens't return errors in case of out of order CT.
|
||||
func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) {
|
||||
func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64) (storage.SeriesRef, error) {
|
||||
var err error
|
||||
if hist.Timestamp != 0 && ct != 0 {
|
||||
if hist.IsFloatHistogram() {
|
||||
|
@ -528,9 +525,6 @@ func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref stora
|
|||
} else {
|
||||
ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil)
|
||||
}
|
||||
if err == nil {
|
||||
rs.Histograms++
|
||||
}
|
||||
if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) {
|
||||
// Even for the first sample OOO is a common scenario because
|
||||
// we can't tell if a CT was already ingested in a previous request.
|
||||
|
|
|
@ -482,11 +482,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
|||
|
||||
// Double check mandatory 2.0 stats.
|
||||
// writeV2RequestFixture has 2 series with 1 sample, 2 histograms, 1 exemplar each.
|
||||
if tc.ingestCTZeroSample {
|
||||
expectHeaderValue(t, 3, resp.Header.Get(rw20WrittenSamplesHeader))
|
||||
} else {
|
||||
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader))
|
||||
}
|
||||
expectHeaderValue(t, 2, resp.Header.Get(rw20WrittenSamplesHeader))
|
||||
expectHeaderValue(t, 4, resp.Header.Get(rw20WrittenHistogramsHeader))
|
||||
if tc.appendExemplarErr != nil {
|
||||
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader))
|
||||
|
@ -513,9 +509,17 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
|||
for _, hp := range ts.Histograms {
|
||||
if hp.IsFloatHistogram() {
|
||||
fh := hp.ToFloatHistogram()
|
||||
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
|
||||
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, nil, &histogram.FloatHistogram{}}, appendable.histograms[k])
|
||||
k++
|
||||
}
|
||||
requireEqual(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
|
||||
} else {
|
||||
h := hp.ToIntHistogram()
|
||||
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
|
||||
requireEqual(t, mockHistogram{ls, ts.CreatedTimestamp, &histogram.Histogram{}, nil}, appendable.histograms[k])
|
||||
k++
|
||||
}
|
||||
requireEqual(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
|
||||
}
|
||||
k++
|
||||
|
@ -532,6 +536,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
|||
requireEqual(t, mockMetadata{ls, expectedMeta}, appendable.metadata[m])
|
||||
m++
|
||||
}
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -793,6 +798,7 @@ type mockAppendable struct {
|
|||
latestExemplar map[uint64]int64
|
||||
exemplars []mockExemplar
|
||||
latestHistogram map[uint64]int64
|
||||
latestFloatHist map[uint64]int64
|
||||
histograms []mockHistogram
|
||||
metadata []mockMetadata
|
||||
|
||||
|
@ -846,6 +852,9 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
|
|||
if m.latestHistogram == nil {
|
||||
m.latestHistogram = map[uint64]int64{}
|
||||
}
|
||||
if m.latestFloatHist == nil {
|
||||
m.latestFloatHist = map[uint64]int64{}
|
||||
}
|
||||
if m.latestExemplar == nil {
|
||||
m.latestExemplar = map[uint64]int64{}
|
||||
}
|
||||
|
@ -919,7 +928,12 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
|
|||
return 0, m.appendHistogramErr
|
||||
}
|
||||
|
||||
latestTs := m.latestHistogram[l.Hash()]
|
||||
var latestTs int64
|
||||
if h != nil {
|
||||
latestTs = m.latestHistogram[l.Hash()]
|
||||
} else {
|
||||
latestTs = m.latestFloatHist[l.Hash()]
|
||||
}
|
||||
if t < latestTs {
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
@ -934,15 +948,53 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
|
|||
return 0, tsdb.ErrInvalidSample
|
||||
}
|
||||
|
||||
m.latestHistogram[l.Hash()] = t
|
||||
if h != nil {
|
||||
m.latestHistogram[l.Hash()] = t
|
||||
} else {
|
||||
m.latestFloatHist[l.Hash()] = t
|
||||
}
|
||||
m.histograms = append(m.histograms, mockHistogram{l, t, h, fh})
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
// AppendCTZeroSample is no-op for remote-write for now.
|
||||
// TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might
|
||||
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218).
|
||||
if m.appendCTZeroSampleErr != nil {
|
||||
return 0, m.appendCTZeroSampleErr
|
||||
}
|
||||
|
||||
// Created Timestamp can't be higher than the original sample's timestamp.
|
||||
if ct > t {
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
var latestTs int64
|
||||
if h != nil {
|
||||
latestTs = m.latestHistogram[l.Hash()]
|
||||
} else {
|
||||
latestTs = m.latestFloatHist[l.Hash()]
|
||||
}
|
||||
if ct < latestTs {
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
if ct == latestTs {
|
||||
return 0, storage.ErrDuplicateSampleForTimestamp
|
||||
}
|
||||
|
||||
if l.IsEmpty() {
|
||||
return 0, tsdb.ErrInvalidSample
|
||||
}
|
||||
|
||||
if _, hasDuplicates := l.HasDuplicateLabelNames(); hasDuplicates {
|
||||
return 0, tsdb.ErrInvalidSample
|
||||
}
|
||||
|
||||
if h != nil {
|
||||
m.latestHistogram[l.Hash()] = ct
|
||||
m.histograms = append(m.histograms, mockHistogram{l, ct, &histogram.Histogram{}, nil})
|
||||
} else {
|
||||
m.latestFloatHist[l.Hash()] = ct
|
||||
m.histograms = append(m.histograms, mockHistogram{l, ct, nil, &histogram.FloatHistogram{}})
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue