From 6571d97e704564596c065e78052f1adb24ca6c76 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 4 Dec 2024 17:15:13 -0300 Subject: [PATCH] Handle histogram's created timestamp Signed-off-by: Arthur Silva Sens --- storage/remote/write_handler.go | 36 ++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 79a627a53..53571e74a 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -397,7 +397,8 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * allSamplesSoFar := rs.AllSamples() var ref storage.SeriesRef - if h.ingestCTZeroSample { + // Samples. + if h.ingestCTZeroSample && len(ts.Samples) > 0 { // CT only needs to be ingested for the first sample, it will be considered // out of order for the rest. ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs) @@ -405,8 +406,6 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp) } } - - // Samples. for _, s := range ts.Samples { ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue()) if err == nil { @@ -427,6 +426,14 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * } // Native Histograms. + if h.ingestCTZeroSample && len(ts.Histograms) > 0 { + // CT only needs to be ingested for the first histogram, it will be considered + // out of order for the rest. + ref, err = h.handleHistogramZeroSample(app, ref, ls, ts.Histograms[0], ts.CreatedTimestamp, rs) + if err != nil { + h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Histograms[0].Timestamp) + } + } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram()) @@ -511,6 +518,29 @@ func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.Seri return ref, err } +// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp. +// It doens't return errors in case of out of order CT. +func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) { + var err error + if hist.Timestamp != 0 && ct != 0 { + if hist.IsFloatHistogram() { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, nil, hist.ToFloatHistogram()) + } else { + ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil) + } + if err == nil { + rs.Histograms++ + } + if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) { + // Even for the first sample OOO is a common scenario because + // we can't tell if a CT was already ingested in a previous request. + // We ignore the error. + err = nil + } + } + return ref, err +} + // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {