From 33f880d123177377c03daf1d60019962b4b3361e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 9 Jan 2023 17:06:15 +0530 Subject: [PATCH] Add native histogram support in federation Signed-off-by: Ganesh Vernekar --- web/federate.go | 100 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/web/federate.go b/web/federate.go index 85472bb44..1589c893e 100644 --- a/web/federate.go +++ b/web/federate.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" @@ -103,6 +104,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) var chkIter chunkenc.Iterator +Loop: for set.Next() { s := set.At() @@ -111,18 +113,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { chkIter = s.Iterator(chkIter) it.Reset(chkIter) - var t int64 - var v float64 - var ok bool - + var ( + t int64 + v float64 + h *histogram.FloatHistogram + ok bool + ) valueType := it.Seek(maxt) - if valueType == chunkenc.ValFloat { + switch valueType { + case chunkenc.ValFloat: t, v = it.At() - } else { - // TODO(beorn7): Handle histograms. + case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: + t, h = it.AtFloatHistogram() + default: t, v, _, ok = it.PeekBack(1) if !ok { - continue + continue Loop } } // The exposition formats do not support stale markers, so drop them. This @@ -135,7 +141,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec = append(vec, promql.Sample{ Metric: s.Labels(), - Point: promql.Point{T: t, V: v}, + Point: promql.Point{T: t, V: v, H: h}, }) } if ws := set.Warnings(); len(ws) > 0 { @@ -161,15 +167,22 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sort.Strings(externalLabelNames) var ( - lastMetricName string - protMetricFam *dto.MetricFamily + lastMetricName string + lastWasHistogram, lastHistogramWasGauge bool + protMetricFam *dto.MetricFamily ) for _, s := range vec { + isHistogram := s.H != nil + if isHistogram && + format != expfmt.FmtProtoDelim && format != expfmt.FmtProtoText && format != expfmt.FmtProtoCompact { + // Can't serve the native histogram. + // TODO(codesome): Serve them when other protocols get the native histogram support. + continue + } + nameSeen := false globalUsed := map[string]struct{}{} - protMetric := &dto.Metric{ - Untyped: &dto.Untyped{}, - } + protMetric := &dto.Metric{} err := s.Metric.Validate(func(l labels.Label) error { if l.Value == "" { @@ -179,11 +192,18 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } if l.Name == labels.MetricName { nameSeen = true - if l.Value == lastMetricName { - // We already have the name in the current MetricFamily, - // and we ignore nameless metrics. + if l.Value == lastMetricName && // We already have the name in the current MetricFamily, and we ignore nameless metrics. + lastWasHistogram == isHistogram && // The sample type matches (float vs histogram). + // If it was a histogram, the histogram type (counter vs gauge) also matches. + (!isHistogram || lastHistogramWasGauge == (s.H.CounterResetHint == histogram.GaugeType)) { return nil } + + // Since we now check for the sample type and type of histogram above, we will end up + // creating multiple metric families for the same metric name. This would technically be + // an invalid exposition. But since the consumer of this is Prometheus, and Prometheus can + // parse it fine, we allow it and bend the rules to make federation possible in those cases. + // Need to start a new MetricFamily. Ship off the old one (if any) before // creating the new one. if protMetricFam != nil { @@ -195,6 +215,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), Name: proto.String(l.Value), } + if isHistogram { + if s.H.CounterResetHint == histogram.GaugeType { + protMetricFam.Type = dto.MetricType_GAUGE_HISTOGRAM.Enum() + } else { + protMetricFam.Type = dto.MetricType_HISTOGRAM.Enum() + } + } lastMetricName = l.Value return nil } @@ -228,9 +255,42 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } protMetric.TimestampMs = proto.Int64(s.T) - protMetric.Untyped.Value = proto.Float64(s.V) - // TODO(beorn7): Handle histograms. - + if !isHistogram { + lastHistogramWasGauge = false + protMetric.Untyped = &dto.Untyped{ + Value: proto.Float64(s.V), + } + } else { + lastHistogramWasGauge = s.H.CounterResetHint == histogram.GaugeType + protMetric.Histogram = &dto.Histogram{ + SampleCountFloat: proto.Float64(s.H.Count), + SampleSum: proto.Float64(s.H.Sum), + Schema: proto.Int32(s.H.Schema), + ZeroThreshold: proto.Float64(s.H.ZeroThreshold), + ZeroCountFloat: proto.Float64(s.H.ZeroCount), + NegativeCount: s.H.NegativeBuckets, + PositiveCount: s.H.PositiveBuckets, + } + if len(s.H.PositiveSpans) > 0 { + protMetric.Histogram.PositiveSpan = make([]*dto.BucketSpan, len(s.H.PositiveSpans)) + for i, sp := range s.H.PositiveSpans { + protMetric.Histogram.PositiveSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + if len(s.H.NegativeSpans) > 0 { + protMetric.Histogram.NegativeSpan = make([]*dto.BucketSpan, len(s.H.NegativeSpans)) + for i, sp := range s.H.NegativeSpans { + protMetric.Histogram.NegativeSpan[i] = &dto.BucketSpan{ + Offset: proto.Int32(sp.Offset), + Length: proto.Uint32(sp.Length), + } + } + } + } + lastWasHistogram = isHistogram protMetricFam.Metric = append(protMetricFam.Metric, protMetric) } // Still have to ship off the last MetricFamily, if any.