scrape: Enable ingestion of multiple exemplars per sample

This has become a requirement for native histograms, as a single
histogram sample commonly has many buckets, so that providing many
exemplars makes sense.

Since OM text doesn't support native histograms yet, the test had to
be expanded to also support protobuf test cases.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2023-07-13 14:16:10 +02:00
parent cb93a0cbd2
commit 0e3f35324b
5 changed files with 147 additions and 15 deletions

View file

@ -59,7 +59,9 @@ type Parser interface {
Metric(l *labels.Labels) string
// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns if an exemplar exists or not.
// exemplar. It can be called repeatedly to retrieve multiple exemplars
// for the same sample. It returns false once all exemplars are
// retrieved (including the case where no exemplars exist at all).
Exemplar(l *exemplar.Exemplar) bool
// Next advances the parser to the next sample. It returns false if no

View file

@ -174,8 +174,10 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
return s
}
// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns the whether an exemplar exists.
// Exemplar writes the exemplar of the current sample into the passed exemplar.
// It returns whether an exemplar exists. As OpenMetrics only ever has one
// exemplar per sample, every call after the first (for the same sample) will
// always return false.
func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
if len(p.exemplar) == 0 {
return false
@ -204,6 +206,8 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
p.builder.Sort()
e.Labels = p.builder.Labels()
// Wipe exemplar so that future calls return false.
p.exemplar = p.exemplar[:0]
return true
}

View file

@ -1685,7 +1685,7 @@ loop:
// number of samples remaining after relabeling.
added++
if hasExemplar := p.Exemplar(&e); hasExemplar {
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
if !e.HasTs {
e.Ts = t
}

View file

@ -17,6 +17,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"math"
@ -29,6 +30,7 @@ import (
"time"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
@ -39,6 +41,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/textparse"
@ -1980,15 +1983,18 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
tests := []struct {
title string
scrapeText string
contentType string
discoveryLabels []string
samples []sample
floats []sample
histograms []histogramSample
exemplars []exemplar.Exemplar
}{
{
title: "Metric without exemplars",
scrapeText: "metric_total{n=\"1\"} 0\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"},
samples: []sample{{
floats: []sample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0,
}},
@ -1996,8 +2002,9 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
{
title: "Metric with exemplars",
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"},
samples: []sample{{
floats: []sample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0,
}},
@ -2008,8 +2015,9 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
{
title: "Metric with exemplars and TS",
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"},
samples: []sample{{
floats: []sample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0,
}},
@ -2022,7 +2030,8 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
metric_total{n="2"} 2 # {t="2"} 2.0 20000
# EOF`,
samples: []sample{{
contentType: "application/openmetrics-text",
floats: []sample{{
metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
v: 1,
}, {
@ -2034,6 +2043,104 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
},
},
{
title: "Native histogram with two exemplars",
scrapeText: `name: "test_histogram"
help: "Test histogram with many buckets removed to keep it manageable in size."
type: HISTOGRAM
metric: <
histogram: <
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <
cumulative_count: 2
upper_bound: -0.0004899999999999998
>
bucket: <
cumulative_count: 4
upper_bound: -0.0003899999999999998
exemplar: <
label: <
name: "dummyID"
value: "59727"
>
value: -0.00039
timestamp: <
seconds: 1625851155
nanos: 146848499
>
>
>
bucket: <
cumulative_count: 16
upper_bound: -0.0002899999999999998
exemplar: <
label: <
name: "dummyID"
value: "5617"
>
value: -0.00029
>
>
schema: 3
zero_threshold: 2.938735877055719e-39
zero_count: 2
negative_span: <
offset: -162
length: 1
>
negative_span: <
offset: 23
length: 4
>
negative_delta: 1
negative_delta: 3
negative_delta: -2
negative_delta: -1
negative_delta: 1
positive_span: <
offset: -161
length: 1
>
positive_span: <
offset: 8
length: 3
>
positive_delta: 1
positive_delta: 2
positive_delta: -1
positive_delta: -1
>
timestamp_ms: 1234568
>
`,
contentType: "application/vnd.google.protobuf",
histograms: []histogramSample{{
t: 1234568,
h: &histogram.Histogram{
Count: 175,
ZeroCount: 2,
Sum: 0.0008280461746287094,
ZeroThreshold: 2.938735877055719e-39,
Schema: 3,
PositiveSpans: []histogram.Span{
{Offset: -161, Length: 1},
{Offset: 8, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: -162, Length: 1},
{Offset: 23, Length: 4},
},
PositiveBuckets: []int64{1, 2, -1, -1},
NegativeBuckets: []int64{1, 3, -2, -1, 1},
},
}},
exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
},
},
}
for _, test := range tests {
@ -2069,8 +2176,8 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
now := time.Now()
for i := range test.samples {
test.samples[i].t = timestamp.FromTime(now)
for i := range test.floats {
test.floats[i].t = timestamp.FromTime(now)
}
// We need to set the timestamp for expected exemplars that does not have a timestamp.
@ -2080,10 +2187,29 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
}
}
_, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now)
buf := &bytes.Buffer{}
if test.contentType == "application/vnd.google.protobuf" {
// In case of protobuf, we have to create the binary representation.
pb := &dto.MetricFamily{}
// From text to proto message.
require.NoError(t, proto.UnmarshalText(test.scrapeText, pb))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(pb)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
buf.Write(varintBuf)
buf.Write(protoBuf)
} else {
buf.WriteString(test.scrapeText)
}
_, _, _, err := sl.append(app, buf.Bytes(), test.contentType, now)
require.NoError(t, err)
require.NoError(t, app.Commit())
require.Equal(t, test.samples, app.result)
require.Equal(t, test.floats, app.result)
require.Equal(t, test.histograms, app.resultHistograms)
require.Equal(t, test.exemplars, app.resultExemplars)
})
}

View file

@ -365,8 +365,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else {
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place
// for others.
// There exists an exemplar already on this ce.nextIndex entry,
// drop it, to make place for others.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar {