mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Handle histogram's created timestamp
Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
This commit is contained in:
parent
3b97a6397c
commit
6571d97e70
|
@ -397,7 +397,8 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
||||||
allSamplesSoFar := rs.AllSamples()
|
allSamplesSoFar := rs.AllSamples()
|
||||||
var ref storage.SeriesRef
|
var ref storage.SeriesRef
|
||||||
|
|
||||||
if h.ingestCTZeroSample {
|
// Samples.
|
||||||
|
if h.ingestCTZeroSample && len(ts.Samples) > 0 {
|
||||||
// CT only needs to be ingested for the first sample, it will be considered
|
// CT only needs to be ingested for the first sample, it will be considered
|
||||||
// out of order for the rest.
|
// out of order for the rest.
|
||||||
ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs)
|
ref, err = h.handleCTZeroSample(app, ref, ls, ts.Samples[0], ts.CreatedTimestamp, rs)
|
||||||
|
@ -405,8 +406,6 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
||||||
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp)
|
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Samples[0].Timestamp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Samples.
|
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
|
ref, err = app.Append(ref, ls, s.GetTimestamp(), s.GetValue())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -427,6 +426,14 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
||||||
}
|
}
|
||||||
|
|
||||||
// Native Histograms.
|
// Native Histograms.
|
||||||
|
if h.ingestCTZeroSample && len(ts.Histograms) > 0 {
|
||||||
|
// CT only needs to be ingested for the first histogram, it will be considered
|
||||||
|
// out of order for the rest.
|
||||||
|
ref, err = h.handleHistogramZeroSample(app, ref, ls, ts.Histograms[0], ts.CreatedTimestamp, rs)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Debug("Error when appending CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ts.CreatedTimestamp, "timestamp", ts.Histograms[0].Timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
for _, hp := range ts.Histograms {
|
for _, hp := range ts.Histograms {
|
||||||
if hp.IsFloatHistogram() {
|
if hp.IsFloatHistogram() {
|
||||||
ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram())
|
ref, err = app.AppendHistogram(ref, ls, hp.Timestamp, nil, hp.ToFloatHistogram())
|
||||||
|
@ -511,6 +518,29 @@ func (h *writeHandler) handleCTZeroSample(app storage.Appender, ref storage.Seri
|
||||||
return ref, err
|
return ref, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleHistogramZeroSample appends CT as a zero-value sample with CT value as the sample timestamp.
|
||||||
|
// It doens't return errors in case of out of order CT.
|
||||||
|
func (h *writeHandler) handleHistogramZeroSample(app storage.Appender, ref storage.SeriesRef, l labels.Labels, hist writev2.Histogram, ct int64, rs *WriteResponseStats) (storage.SeriesRef, error) {
|
||||||
|
var err error
|
||||||
|
if hist.Timestamp != 0 && ct != 0 {
|
||||||
|
if hist.IsFloatHistogram() {
|
||||||
|
ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, nil, hist.ToFloatHistogram())
|
||||||
|
} else {
|
||||||
|
ref, err = app.AppendHistogramCTZeroSample(ref, l, hist.Timestamp, ct, hist.ToIntHistogram(), nil)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
rs.Histograms++
|
||||||
|
}
|
||||||
|
if err != nil && errors.Is(err, storage.ErrOutOfOrderCT) {
|
||||||
|
// Even for the first sample OOO is a common scenario because
|
||||||
|
// we can't tell if a CT was already ingested in a previous request.
|
||||||
|
// We ignore the error.
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
|
||||||
// writes them to the provided appendable.
|
// writes them to the provided appendable.
|
||||||
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
|
func NewOTLPWriteHandler(logger *slog.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler {
|
||||||
|
|
Loading…
Reference in a new issue