Create less garbage when parsing metrics (#9299)

* Refactor: extract function to make scrapeLoop for testing

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Add benchmarks for ScrapeLoopAppend

For Prometheus and OpenMetrics

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Create less garbage when parsing metrics

Exemplar escapes to heap due to being passed through text-parser
interface, but we can reduce the impact by hoisting it out of the loop
and resetting it after every use.

(Note the cost was paid on every line even when exemplars were disabled)

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Create less garbage when parsing OpenMetrics

After calling parseLVals() we always append the return value, so pass in
what we want to append it to and save garbage.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2021-09-08 09:09:21 +01:00 committed by GitHub
parent ad642a85c0
commit 92a3eeac55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 10 deletions

View file

@@ -306,11 +306,10 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
t2 := p.nextToken() t2 := p.nextToken()
if t2 == tBraceOpen { if t2 == tBraceOpen {
offsets, err := p.parseLVals() p.offsets, err = p.parseLVals(p.offsets)
if err != nil { if err != nil {
return EntryInvalid, err return EntryInvalid, err
} }
p.offsets = append(p.offsets, offsets...)
p.series = p.l.b[p.start:p.l.i] p.series = p.l.b[p.start:p.l.i]
t2 = p.nextToken() t2 = p.nextToken()
} }
@@ -367,12 +366,12 @@ func (p *OpenMetricsParser) parseComment() error {
return err return err
} }
var err error
// Parse the labels. // Parse the labels.
offsets, err := p.parseLVals() p.eOffsets, err = p.parseLVals(p.eOffsets)
if err != nil { if err != nil {
return err return err
} }
p.eOffsets = append(p.eOffsets, offsets...)
p.exemplar = p.l.b[p.start:p.l.i] p.exemplar = p.l.b[p.start:p.l.i]
// Get the value. // Get the value.
@@ -410,8 +409,7 @@ func (p *OpenMetricsParser) parseComment() error {
return nil return nil
} }
func (p *OpenMetricsParser) parseLVals() ([]int, error) { func (p *OpenMetricsParser) parseLVals(offsets []int) ([]int, error) {
var offsets []int
first := true first := true
for { for {
t := p.nextToken() t := p.nextToken()

View file

@@ -1390,6 +1390,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
defTime = timestamp.FromTime(ts) defTime = timestamp.FromTime(ts)
appErrs = appendErrors{} appErrs = appendErrors{}
sampleLimitErr error sampleLimitErr error
e exemplar.Exemplar // escapes to heap so hoisted out of loop
) )
defer func() { defer func() {
@@ -1406,7 +1407,6 @@ loop:
var ( var (
et textparse.Entry et textparse.Entry
sampleAdded bool sampleAdded bool
e exemplar.Exemplar
) )
if et, err = p.Next(); err != nil { if et, err = p.Next(); err != nil {
if err == io.EOF { if err == io.EOF {
@@ -1513,6 +1513,7 @@ loop:
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors. // Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr) level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
} }
e = exemplar.Exemplar{} // reset for next time round loop
} }
} }

View file

@@ -929,10 +929,10 @@ test_metric 1
require.Equal(t, "", md.Unit) require.Equal(t, "", md.Unit)
} }
func TestScrapeLoopSeriesAdded(t *testing.T) { func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
// Need a full storage for correct Add/AddFast semantics. // Need a full storage for correct Add/AddFast semantics.
s := teststorage.New(t) s := teststorage.New(t)
defer s.Close() t.Cleanup(func() { s.Close() })
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, sl := newScrapeLoop(ctx,
@@ -950,7 +950,13 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
0, 0,
false, false,
) )
defer cancel() t.Cleanup(func() { cancel() })
return ctx, sl
}
func TestScrapeLoopSeriesAdded(t *testing.T) {
ctx, sl := simpleTestScrapeLoop(t)
slApp := sl.appender(ctx) slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{}) total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
@@ -969,6 +975,46 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
require.Equal(t, 0, seriesAdded) require.Equal(t, 0, seriesAdded)
} }
func makeTestMetrics(n int) []byte {
// Construct a metrics string to parse
sb := bytes.Buffer{}
for i := 0; i < n; i++ {
fmt.Fprintf(&sb, "# TYPE metric_a gauge\n")
fmt.Fprintf(&sb, "# HELP metric_a help text\n")
fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
}
return sb.Bytes()
}
func BenchmarkScrapeLoopAppend(b *testing.B) {
ctx, sl := simpleTestScrapeLoop(b)
slApp := sl.appender(ctx)
metrics := makeTestMetrics(100)
ts := time.Time{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second)
_, _, _, _ = sl.append(slApp, metrics, "", ts)
}
}
func BenchmarkScrapeLoopAppendOM(b *testing.B) {
ctx, sl := simpleTestScrapeLoop(b)
slApp := sl.appender(ctx)
metrics := makeTestMetrics(100)
ts := time.Time{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ts = ts.Add(time.Second)
_, _, _, _ = sl.append(slApp, metrics, "application/openmetrics-text", ts)
}
}
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{} appender := &collectResultAppender{}
var ( var (