Merge pull request #12557 from prometheus/beorn7/histogram

scrape: Enable ingestion of multiple exemplars per sample
This commit is contained in:
Julien Pivotto 2023-07-20 15:19:28 +02:00 committed by GitHub
commit 7905594b52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 228 additions and 96 deletions

View file

@ -59,7 +59,9 @@ type Parser interface {
Metric(l *labels.Labels) string Metric(l *labels.Labels) string
// Exemplar writes the exemplar of the current sample into the passed // Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns if an exemplar exists or not. // exemplar. It can be called repeatedly to retrieve multiple exemplars
// for the same sample. It returns false once all exemplars are
// retrieved (including the case where no exemplars exist at all).
Exemplar(l *exemplar.Exemplar) bool Exemplar(l *exemplar.Exemplar) bool
// Next advances the parser to the next sample. It returns false if no // Next advances the parser to the next sample. It returns false if no

View file

@ -174,8 +174,10 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
return s return s
} }
// Exemplar writes the exemplar of the current sample into the passed // Exemplar writes the exemplar of the current sample into the passed exemplar.
// exemplar. It returns the whether an exemplar exists. // It returns whether an exemplar exists. As OpenMetrics only ever has one
// exemplar per sample, every call after the first (for the same sample) will
// always return false.
func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
if len(p.exemplar) == 0 { if len(p.exemplar) == 0 {
return false return false
@ -204,6 +206,8 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
p.builder.Sort() p.builder.Sort()
e.Labels = p.builder.Labels() e.Labels = p.builder.Labels()
// Wipe exemplar so that future calls return false.
p.exemplar = p.exemplar[:0]
return true return true
} }

View file

@ -53,10 +53,10 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M
func (a nopAppender) Commit() error { return nil } func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil } func (a nopAppender) Rollback() error { return nil }
type sample struct { type floatSample struct {
metric labels.Labels metric labels.Labels
t int64 t int64
v float64 f float64
} }
type histogramSample struct { type histogramSample struct {
@ -69,23 +69,23 @@ type histogramSample struct {
// It can be used as its zero value or be backed by another appender it writes samples through. // It can be used as its zero value or be backed by another appender it writes samples through.
type collectResultAppender struct { type collectResultAppender struct {
next storage.Appender next storage.Appender
result []sample resultFloats []floatSample
pendingResult []sample pendingFloats []floatSample
rolledbackResult []sample rolledbackFloats []floatSample
pendingExemplars []exemplar.Exemplar
resultExemplars []exemplar.Exemplar
resultHistograms []histogramSample resultHistograms []histogramSample
pendingHistograms []histogramSample pendingHistograms []histogramSample
rolledbackHistograms []histogramSample rolledbackHistograms []histogramSample
pendingMetadata []metadata.Metadata resultExemplars []exemplar.Exemplar
pendingExemplars []exemplar.Exemplar
resultMetadata []metadata.Metadata resultMetadata []metadata.Metadata
pendingMetadata []metadata.Metadata
} }
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.pendingResult = append(a.pendingResult, sample{ a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset, metric: lset,
t: t, t: t,
v: v, f: v,
}) })
if ref == 0 { if ref == 0 {
@ -133,11 +133,11 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
} }
func (a *collectResultAppender) Commit() error { func (a *collectResultAppender) Commit() error {
a.result = append(a.result, a.pendingResult...) a.resultFloats = append(a.resultFloats, a.pendingFloats...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...) a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
a.pendingResult = nil a.pendingFloats = nil
a.pendingExemplars = nil a.pendingExemplars = nil
a.pendingHistograms = nil a.pendingHistograms = nil
a.pendingMetadata = nil a.pendingMetadata = nil
@ -148,9 +148,9 @@ func (a *collectResultAppender) Commit() error {
} }
func (a *collectResultAppender) Rollback() error { func (a *collectResultAppender) Rollback() error {
a.rolledbackResult = a.pendingResult a.rolledbackFloats = a.pendingFloats
a.rolledbackHistograms = a.pendingHistograms a.rolledbackHistograms = a.pendingHistograms
a.pendingResult = nil a.pendingFloats = nil
a.pendingHistograms = nil a.pendingHistograms = nil
if a.next == nil { if a.next == nil {
return nil return nil
@ -160,14 +160,14 @@ func (a *collectResultAppender) Rollback() error {
func (a *collectResultAppender) String() string { func (a *collectResultAppender) String() string {
var sb strings.Builder var sb strings.Builder
for _, s := range a.result { for _, s := range a.resultFloats {
sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.v, s.t)) sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t))
} }
for _, s := range a.pendingResult { for _, s := range a.pendingFloats {
sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.v, s.t)) sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.f, s.t))
} }
for _, s := range a.rolledbackResult { for _, s := range a.rolledbackFloats {
sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.v, s.t)) sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.f, s.t))
} }
return sb.String() return sb.String()
} }

View file

@ -1685,7 +1685,7 @@ loop:
// number of samples remaining after relabeling. // number of samples remaining after relabeling.
added++ added++
if hasExemplar := p.Exemplar(&e); hasExemplar { for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
if !e.HasTs { if !e.HasTs {
e.Ts = t e.Ts = t
} }

View file

@ -17,6 +17,7 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"context" "context"
"encoding/binary"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -29,6 +30,7 @@ import (
"time" "time"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
@ -39,6 +41,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
@ -737,12 +740,12 @@ func TestScrapeLoopStop(t *testing.T) {
// We expected 1 actual sample for each scrape plus 5 for report samples. // We expected 1 actual sample for each scrape plus 5 for report samples.
// At least 2 scrapes were made, plus the final stale markers. // At least 2 scrapes were made, plus the final stale markers.
if len(appender.result) < 6*3 || len(appender.result)%6 != 0 { if len(appender.resultFloats) < 6*3 || len(appender.resultFloats)%6 != 0 {
t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result)) t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.resultFloats))
} }
// All samples in a scrape must have the same timestamp. // All samples in a scrape must have the same timestamp.
var ts int64 var ts int64
for i, s := range appender.result { for i, s := range appender.resultFloats {
switch { switch {
case i%6 == 0: case i%6 == 0:
ts = s.t ts = s.t
@ -751,9 +754,9 @@ func TestScrapeLoopStop(t *testing.T) {
} }
} }
// All samples from the last scrape must be stale markers. // All samples from the last scrape must be stale markers.
for _, s := range appender.result[len(appender.result)-5:] { for _, s := range appender.resultFloats[len(appender.resultFloats)-5:] {
if !value.IsStaleNaN(s.v) { if !value.IsStaleNaN(s.f) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v)) t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.f))
} }
} }
} }
@ -1189,10 +1192,10 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not. // each scrape successful or not.
require.Equal(t, 27, len(appender.result), "Appended samples not as expected:\n%s", appender) require.Equal(t, 27, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender)
require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected") require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
require.True(t, value.IsStaleNaN(appender.result[6].v), require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v)) "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
} }
func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
@ -1254,10 +1257,10 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not. // each scrape successful or not.
require.Equal(t, 17, len(appender.result), "Appended samples not as expected:\n%s", appender) require.Equal(t, 17, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender)
require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected") require.Equal(t, 42.0, appender.resultFloats[0].f, "Appended first sample not as expected")
require.True(t, value.IsStaleNaN(appender.result[6].v), require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v)) "Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
} }
func TestScrapeLoopCache(t *testing.T) { func TestScrapeLoopCache(t *testing.T) {
@ -1339,7 +1342,7 @@ func TestScrapeLoopCache(t *testing.T) {
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for // 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not. // each scrape successful or not.
require.Equal(t, 26, len(appender.result), "Appended samples not as expected:\n%s", appender) require.Equal(t, 26, len(appender.resultFloats), "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
@ -1498,11 +1501,11 @@ func TestScrapeLoopAppend(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
expected := []sample{ expected := []floatSample{
{ {
metric: test.expLset, metric: test.expLset,
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: test.expValue, f: test.expValue,
}, },
} }
@ -1510,11 +1513,11 @@ func TestScrapeLoopAppend(t *testing.T) {
// DeepEqual will report NaNs as being different, // DeepEqual will report NaNs as being different,
// so replace it with the expected one. // so replace it with the expected one.
if test.expValue == float64(value.NormalNaN) { if test.expValue == float64(value.NormalNaN) {
app.result[0].v = expected[0].v app.resultFloats[0].f = expected[0].f
} }
t.Logf("Test:%s", test.title) t.Logf("Test:%s", test.title)
require.Equal(t, expected, app.result) require.Equal(t, expected, app.resultFloats)
} }
} }
@ -1584,13 +1587,13 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
require.Equal(t, []sample{ require.Equal(t, []floatSample{
{ {
metric: labels.FromStrings(tc.expected...), metric: labels.FromStrings(tc.expected...),
t: timestamp.FromTime(time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)), t: timestamp.FromTime(time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)),
v: 0, f: 0,
}, },
}, app.result) }, app.resultFloats)
}) })
} }
} }
@ -1638,15 +1641,15 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
expected := []sample{ expected := []floatSample{
{ {
metric: lset, metric: lset,
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: expValue, f: expValue,
}, },
} }
require.Equal(t, expected, app.result) require.Equal(t, expected, app.resultFloats)
} }
func TestScrapeLoopAppendSampleLimit(t *testing.T) { func TestScrapeLoopAppendSampleLimit(t *testing.T) {
@ -1706,14 +1709,14 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change) require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change)
// And verify that we got the samples that fit under the limit. // And verify that we got the samples that fit under the limit.
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: 1, f: 1,
}, },
} }
require.Equal(t, want, resApp.rolledbackResult, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, resApp.rolledbackFloats, "Appended samples not as expected:\n%s", appender)
now = time.Now() now = time.Now()
slApp = sl.appender(context.Background()) slApp = sl.appender(context.Background())
@ -1866,19 +1869,19 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
// DeepEqual will report NaNs as being different, so replace with a different value. // DeepEqual will report NaNs as being different, so replace with a different value.
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: 1, f: 1,
}, },
{ {
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: timestamp.FromTime(now.Add(time.Minute)), t: timestamp.FromTime(now.Add(time.Minute)),
v: 2, f: 2,
}, },
} }
require.Equal(t, want, capp.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) {
@ -1914,24 +1917,24 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
ingestedNaN := math.Float64bits(app.result[1].v) ingestedNaN := math.Float64bits(app.resultFloats[1].f)
require.Equal(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected") require.Equal(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected")
// DeepEqual will report NaNs as being different, so replace with a different value. // DeepEqual will report NaNs as being different, so replace with a different value.
app.result[1].v = 42 app.resultFloats[1].f = 42
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: 1, f: 1,
}, },
{ {
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now.Add(time.Second)), t: timestamp.FromTime(now.Add(time.Second)),
v: 42, f: 42,
}, },
} }
require.Equal(t, want, app.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
@ -1966,40 +1969,44 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"), metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: 1000, t: 1000,
v: 1, f: 1,
}, },
} }
require.Equal(t, want, app.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopAppendExemplar(t *testing.T) { func TestScrapeLoopAppendExemplar(t *testing.T) {
tests := []struct { tests := []struct {
title string title string
scrapeText string scrapeText string
contentType string
discoveryLabels []string discoveryLabels []string
samples []sample floats []floatSample
histograms []histogramSample
exemplars []exemplar.Exemplar exemplars []exemplar.Exemplar
}{ }{
{ {
title: "Metric without exemplars", title: "Metric without exemplars",
scrapeText: "metric_total{n=\"1\"} 0\n# EOF", scrapeText: "metric_total{n=\"1\"} 0\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"}, discoveryLabels: []string{"n", "2"},
samples: []sample{{ floats: []floatSample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0, f: 0,
}}, }},
}, },
{ {
title: "Metric with exemplars", title: "Metric with exemplars",
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF", scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"}, discoveryLabels: []string{"n", "2"},
samples: []sample{{ floats: []floatSample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0, f: 0,
}}, }},
exemplars: []exemplar.Exemplar{ exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("a", "abc"), Value: 1}, {Labels: labels.FromStrings("a", "abc"), Value: 1},
@ -2008,10 +2015,11 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
{ {
title: "Metric with exemplars and TS", title: "Metric with exemplars and TS",
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF", scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
contentType: "application/openmetrics-text",
discoveryLabels: []string{"n", "2"}, discoveryLabels: []string{"n", "2"},
samples: []sample{{ floats: []floatSample{{
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"), metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
v: 0, f: 0,
}}, }},
exemplars: []exemplar.Exemplar{ exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true}, {Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true},
@ -2022,18 +2030,117 @@ func TestScrapeLoopAppendExemplar(t *testing.T) {
scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000 scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
metric_total{n="2"} 2 # {t="2"} 2.0 20000 metric_total{n="2"} 2 # {t="2"} 2.0 20000
# EOF`, # EOF`,
samples: []sample{{ contentType: "application/openmetrics-text",
floats: []floatSample{{
metric: labels.FromStrings("__name__", "metric_total", "n", "1"), metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
v: 1, f: 1,
}, { }, {
metric: labels.FromStrings("__name__", "metric_total", "n", "2"), metric: labels.FromStrings("__name__", "metric_total", "n", "2"),
v: 2, f: 2,
}}, }},
exemplars: []exemplar.Exemplar{ exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true}, {Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
}, },
}, },
{
title: "Native histogram with two exemplars",
scrapeText: `name: "test_histogram"
help: "Test histogram with many buckets removed to keep it manageable in size."
type: HISTOGRAM
metric: <
histogram: <
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <
cumulative_count: 2
upper_bound: -0.0004899999999999998
>
bucket: <
cumulative_count: 4
upper_bound: -0.0003899999999999998
exemplar: <
label: <
name: "dummyID"
value: "59727"
>
value: -0.00039
timestamp: <
seconds: 1625851155
nanos: 146848499
>
>
>
bucket: <
cumulative_count: 16
upper_bound: -0.0002899999999999998
exemplar: <
label: <
name: "dummyID"
value: "5617"
>
value: -0.00029
>
>
schema: 3
zero_threshold: 2.938735877055719e-39
zero_count: 2
negative_span: <
offset: -162
length: 1
>
negative_span: <
offset: 23
length: 4
>
negative_delta: 1
negative_delta: 3
negative_delta: -2
negative_delta: -1
negative_delta: 1
positive_span: <
offset: -161
length: 1
>
positive_span: <
offset: 8
length: 3
>
positive_delta: 1
positive_delta: 2
positive_delta: -1
positive_delta: -1
>
timestamp_ms: 1234568
>
`,
contentType: "application/vnd.google.protobuf",
histograms: []histogramSample{{
t: 1234568,
h: &histogram.Histogram{
Count: 175,
ZeroCount: 2,
Sum: 0.0008280461746287094,
ZeroThreshold: 2.938735877055719e-39,
Schema: 3,
PositiveSpans: []histogram.Span{
{Offset: -161, Length: 1},
{Offset: 8, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: -162, Length: 1},
{Offset: 23, Length: 4},
},
PositiveBuckets: []int64{1, 2, -1, -1},
NegativeBuckets: []int64{1, 3, -2, -1, 1},
},
}},
exemplars: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
},
},
} }
for _, test := range tests { for _, test := range tests {
@ -2069,8 +2176,8 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
now := time.Now() now := time.Now()
for i := range test.samples { for i := range test.floats {
test.samples[i].t = timestamp.FromTime(now) test.floats[i].t = timestamp.FromTime(now)
} }
// We need to set the timestamp for expected exemplars that does not have a timestamp. // We need to set the timestamp for expected exemplars that does not have a timestamp.
@ -2080,10 +2187,29 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
} }
} }
_, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now) buf := &bytes.Buffer{}
if test.contentType == "application/vnd.google.protobuf" {
// In case of protobuf, we have to create the binary representation.
pb := &dto.MetricFamily{}
// From text to proto message.
require.NoError(t, proto.UnmarshalText(test.scrapeText, pb))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(pb)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintBuf := binary.AppendUvarint(nil, uint64(len(protoBuf)))
buf.Write(varintBuf)
buf.Write(protoBuf)
} else {
buf.WriteString(test.scrapeText)
}
_, _, _, err := sl.append(app, buf.Bytes(), test.contentType, now)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.Equal(t, test.samples, app.result) require.Equal(t, test.floats, app.resultFloats)
require.Equal(t, test.histograms, app.resultHistograms)
require.Equal(t, test.exemplars, app.resultExemplars) require.Equal(t, test.exemplars, app.resultExemplars)
}) })
} }
@ -2093,12 +2219,12 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000 scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000 # EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000
# EOF`} # EOF`}
samples := []sample{{ samples := []floatSample{{
metric: labels.FromStrings("__name__", "metric_total", "n", "1"), metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
v: 1, f: 1,
}, { }, {
metric: labels.FromStrings("__name__", "metric_total", "n", "1"), metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
v: 2, f: 2,
}} }}
exemplars := []exemplar.Exemplar{ exemplars := []exemplar.Exemplar{
{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true}, {Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
@ -2154,7 +2280,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
} }
require.Equal(t, samples, app.result) require.Equal(t, samples, app.resultFloats)
require.Equal(t, exemplars, app.resultExemplars) require.Equal(t, exemplars, app.resultExemplars)
} }
@ -2192,7 +2318,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
} }
sl.run(nil) sl.run(nil)
require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value") require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
} }
func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
@ -2230,7 +2356,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
} }
sl.run(nil) sl.run(nil)
require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value") require.Equal(t, 0.0, appender.resultFloats[0].f, "bad 'up' value")
} }
type errorAppender struct { type errorAppender struct {
@ -2279,14 +2405,14 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings(model.MetricNameLabel, "normal"), metric: labels.FromStrings(model.MetricNameLabel, "normal"),
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: 1, f: 1,
}, },
} }
require.Equal(t, want, app.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, app.resultFloats, "Appended samples not as expected:\n%s", appender)
require.Equal(t, 4, total) require.Equal(t, 4, total)
require.Equal(t, 4, added) require.Equal(t, 4, added)
require.Equal(t, 1, seriesAdded) require.Equal(t, 1, seriesAdded)
@ -2598,14 +2724,14 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: 0, t: 0,
v: 1, f: 1,
}, },
} }
require.Equal(t, want, capp.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoop_DiscardTimestamps(t *testing.T) { func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
@ -2640,14 +2766,14 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, slApp.Commit()) require.NoError(t, slApp.Commit())
want := []sample{ want := []floatSample{
{ {
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: timestamp.FromTime(now), t: timestamp.FromTime(now),
v: 1, f: 1,
}, },
} }
require.Equal(t, want, capp.result, "Appended samples not as expected:\n%s", appender) require.Equal(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", appender)
} }
func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {

View file

@ -365,8 +365,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
if prev := ce.exemplars[ce.nextIndex]; prev == nil { if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{} ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else { } else {
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place // There exists an exemplar already on this ce.nextIndex entry,
// for others. // drop it, to make place for others.
var buf [1024]byte var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar { if prev.next == noExemplar {