From 1a8ea78207af44d2e26c53a55341687b859340d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?=
Date: Thu, 16 Nov 2023 13:16:47 +0000
Subject: [PATCH 1/5] Fix BenchmarkScrapeLoopAppendOM
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

OpenMetrics requires an EOF comment at the end of the metrics body, but the
makeTestMetrics() function doesn't append one. This means this benchmark
tests a response with errors, which I don't think was the intention.

Signed-off-by: Łukasz Mierzwa
---
 scrape/scrape_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index bcaeb460e..e37d091ae 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -1068,6 +1068,7 @@ func makeTestMetrics(n int) []byte {
         fmt.Fprintf(&sb, "# HELP metric_a help text\n")
         fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
     }
+    fmt.Fprintf(&sb, "# EOF\n")
     return sb.Bytes()
 }
 

From 50c81bed86327fa4c954d5cab35142f84bfce532 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?=
Date: Thu, 16 Nov 2023 13:22:28 +0000
Subject: [PATCH 2/5] Check for duplicated series on a scrape
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When Prometheus scrapes a target and sees the same time series repeated
multiple times, it currently ignores that silently. This change adds a test
for that case and fixes the scrape loop so that:

- Only the first sample for each unique time series is appended
- Duplicated samples increment the
  prometheus_target_scrapes_sample_duplicate_timestamp_total metric

This allows one to identify such scrape jobs and targets.

Benchmark results:

```
name                            old time/op    new time/op    delta
ScrapeLoopAppend-8                64.8µs ± 2%    71.1µs ±20%   +9.75%  (p=0.000 n=10+10)
ScrapeLoopAppendOM-8              64.2µs ± 1%    68.5µs ± 7%   +6.71%  (p=0.000 n=9+10)
TargetsFromGroup/1_targets-8      14.2µs ± 1%    14.5µs ± 1%   +1.99%  (p=0.000 n=10+10)
TargetsFromGroup/10_targets-8      149µs ± 1%     152µs ± 1%   +2.05%  (p=0.000 n=9+10)
TargetsFromGroup/100_targets-8    1.49ms ± 4%    1.48ms ± 1%     ~     (p=0.796 n=10+10)

name                            old alloc/op   new alloc/op   delta
ScrapeLoopAppend-8                19.9kB ± 1%    17.8kB ± 3%  -10.23%  (p=0.000 n=8+10)
ScrapeLoopAppendOM-8              19.9kB ± 1%    18.3kB ±10%   -8.14%  (p=0.001 n=9+10)
TargetsFromGroup/1_targets-8      2.43kB ± 0%    2.43kB ± 0%   -0.15%  (p=0.045 n=10+10)
TargetsFromGroup/10_targets-8     24.3kB ± 0%    24.3kB ± 0%     ~     (p=0.083 n=10+9)
TargetsFromGroup/100_targets-8     243kB ± 0%     243kB ± 0%     ~     (p=0.720 n=9+10)

name                            old allocs/op  new allocs/op  delta
ScrapeLoopAppend-8                  9.00 ± 0%      9.00 ± 0%     ~     (all equal)
ScrapeLoopAppendOM-8                10.0 ± 0%      10.0 ± 0%     ~     (all equal)
TargetsFromGroup/1_targets-8        40.0 ± 0%      40.0 ± 0%     ~     (all equal)
TargetsFromGroup/10_targets-8        400 ± 0%       400 ± 0%     ~     (all equal)
TargetsFromGroup/100_targets-8     4.00k ± 0%     4.00k ± 0%     ~     (all equal)
```

Signed-off-by: Łukasz Mierzwa
---
 scrape/scrape.go      | 53 +++++++++++++++++++++++++------------------
 scrape/scrape_test.go | 28 +++++++++++++++++++++++
 2 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index aa2d5538b..9bff47eeb 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1512,13 +1512,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
 loop:
     for {
         var (
-            et                       textparse.Entry
-            sampleAdded, isHistogram bool
-            met                      []byte
-            parsedTimestamp          *int64
-            val                      float64
-            h                        *histogram.Histogram
-            fh                       *histogram.FloatHistogram
+            et                                             textparse.Entry
+            sampleAdded, isHistogram, seriesAlreadyScraped bool
+            met                                            []byte
+            parsedTimestamp                                *int64
+            val                                            float64
+            h                                              *histogram.Histogram
+            fh                                             *histogram.FloatHistogram
         )
         if et, err = p.Next(); err != nil {
             if errors.Is(err, io.EOF) {
@@ -1573,6 +1573,7 @@ loop:
             if ok {
                 ref = ce.ref
                 lset = ce.lset
+                hash = ce.hash
 
                 // Update metadata only if it changed in the current iteration.
                 updateMetadata(lset, false)
@@ -1609,24 +1610,30 @@ loop:
                 updateMetadata(lset, true)
             }
 
-            if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
-                ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
-                if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
-                    // CT is an experimental feature. For now, we don't need to fail the
-                    // scrape on errors updating the created timestamp, log debug.
-                    level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
-                }
+            _, seriesAlreadyScraped = sl.cache.seriesCur[hash]
+            if seriesAlreadyScraped {
+                err = storage.ErrDuplicateSampleForTimestamp
+            } else {
+                if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
+                    ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
+                    if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
+                        // CT is an experimental feature. For now, we don't need to fail the
+                        // scrape on errors updating the created timestamp, log debug.
+                        level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
+                    }
+                }
+
+                if isHistogram {
+                    if h != nil {
+                        ref, err = app.AppendHistogram(ref, lset, t, h, nil)
+                    } else {
+                        ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
+                    }
+                } else {
+                    ref, err = app.Append(ref, lset, t, val)
+                }
             }
 
-            if isHistogram {
-                if h != nil {
-                    ref, err = app.AppendHistogram(ref, lset, t, h, nil)
-                } else {
-                    ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
-                }
-            } else {
-                ref, err = app.Append(ref, lset, t, val)
-            }
             sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
             if err != nil {
                 if !errors.Is(err, storage.ErrNotFound) {
@@ -1648,6 +1655,8 @@ loop:
 
             // Increment added even if there's an error so we correctly report the
             // number of samples remaining after relabeling.
+            // We still report duplicated samples here since this number should be the exact number
+            // of time series exposed on a scrape after relabeling.
             added++
             exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
             for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index e37d091ae..4732fbe0d 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -3600,3 +3600,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
         })
     }
 }
+
+// When a scrape contains multiple instances of the same time series we should increment the
+// prometheus_target_scrapes_sample_duplicate_timestamp_total metric.
+func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
+    ctx, sl := simpleTestScrapeLoop(t)
+
+    slApp := sl.appender(ctx)
+    total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{})
+    require.NoError(t, err)
+    require.NoError(t, slApp.Commit())
+    require.Equal(t, 3, total)
+    require.Equal(t, 3, added)
+    require.Equal(t, 1, seriesAdded)
+
+    slApp = sl.appender(ctx)
+    total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{})
+    require.NoError(t, err)
+    require.NoError(t, slApp.Commit())
+    require.Equal(t, 3, total)
+    require.Equal(t, 3, added)
+    require.Equal(t, 0, seriesAdded)
+
+    metric := dto.Metric{}
+    err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric)
+    require.NoError(t, err)
+    value := metric.GetCounter().GetValue()
+    require.Equal(t, 4.0, value)
+}

From 55dcaab41bedc995460927a2c4f3e977d23c374c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?=
Date: Thu, 16 Nov 2023 14:35:44 +0000
Subject: [PATCH 3/5] Fix TestScrapeLoopDiscardDuplicateLabels test
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This test calls Rollback(), which is normally called from within the append
code. Doing so means that the staleness tracking data is outdated and needs
to be cycled manually.

Signed-off-by: Łukasz Mierzwa
---
 scrape/scrape_test.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 4732fbe0d..dbe9f0bb2 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -2636,6 +2636,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
     _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
     require.Error(t, err)
     require.NoError(t, slApp.Rollback())
+    // We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
+    // which would cause ErrDuplicateSampleForTimestamp errors on the next append.
+    sl.cache.iterDone(true)
 
     q, err := s.Querier(time.Time{}.UnixNano(), 0)
     require.NoError(t, err)

From 21f8b35f5bdef290c9e1254ab2e0b86d8f68237a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?=
Date: Tue, 28 Nov 2023 17:42:29 +0000
Subject: [PATCH 4/5] Move staleness tracking out of checkAddError() calls
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This call bloats the checkAddError() signature and logic; we can and should
make it from the main scrape logic instead.

Signed-off-by: Łukasz Mierzwa
---
 scrape/scrape.go      | 13 ++++++++-----
 scrape/scrape_test.go |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 9bff47eeb..701aa609f 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1634,7 +1634,13 @@ loop:
                 }
             }
 
-            sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
+            if err == nil {
+                if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
+                    sl.cache.trackStaleness(ce.hash, ce.lset)
+                }
+            }
+
+            sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
             if err != nil {
                 if !errors.Is(err, storage.ErrNotFound) {
                     level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@@ -1751,12 +1757,9 @@ loop:
 
 // Adds samples to the appender, checking the error, and then returns the # of samples added,
 // whether the caller should continue to process more samples, and any sample or bucket limit errors.
-func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
+func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
     switch {
     case err == nil:
-        if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
-            sl.cache.trackStaleness(ce.hash, ce.lset)
-        }
         return true, nil
     case errors.Is(err, storage.ErrNotFound):
         return false, storage.ErrNotFound
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index dbe9f0bb2..0dfca538d 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -2975,7 +2975,7 @@ func TestReuseCacheRace(t *testing.T) {
 func TestCheckAddError(t *testing.T) {
     var appErrs appendErrors
     sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)}
-    sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
+    sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
     require.Equal(t, 1, appErrs.numOutOfOrder)
 }
 

From c013a3c1b572fb7a11683c49590822c46d91a917 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?=
Date: Tue, 27 Feb 2024 12:09:32 +0000
Subject: [PATCH 5/5] Check for duplicated samples directly in scrapeCache.get()
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Avoid an extra map lookup by hooking the check into scrapeCache.get().

```
goos: linux
goarch: amd64
pkg: github.com/prometheus/prometheus/scrape
cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz
                     │   main.txt   │             new.txt              │
                     │    sec/op    │    sec/op      vs base           │
ScrapeLoopAppend-8     66.72µ ± 0%    66.89µ ± 0%   ~ (p=0.879 n=50)
ScrapeLoopAppendOM-8   66.61µ ± 0%    66.89µ ± 1%   ~ (p=0.115 n=50)
geomean                66.66µ         66.89µ        +0.34%

                     │   main.txt   │             new.txt              │
                     │     B/op     │     B/op       vs base           │
ScrapeLoopAppend-8     20.17Ki ± 1%   20.12Ki ± 1%   ~ (p=0.343 n=50)
ScrapeLoopAppendOM-8   20.38Ki ±10%   17.99Ki ± 2%   -11.69% (p=0.017 n=50)
geomean                20.27Ki        19.03Ki        -6.14%

                     │   main.txt   │             new.txt              │
                     │  allocs/op   │  allocs/op     vs base           │
ScrapeLoopAppend-8     11.00 ± 0%     11.00 ± 0%    ~ (p=1.000 n=50) ¹
ScrapeLoopAppendOM-8   12.00 ± 0%     12.00 ± 0%    ~ (p=1.000 n=50) ¹
geomean                11.49          11.49         +0.00%

¹ all samples are equal
```

Signed-off-by: Łukasz Mierzwa
---
 scrape/scrape.go | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/scrape/scrape.go b/scrape/scrape.go
index 701aa609f..6be858c68 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -954,13 +954,14 @@ func (c *scrapeCache) iterDone(flushCache bool) {
     }
 }
 
-func (c *scrapeCache) get(met []byte) (*cacheEntry, bool) {
+func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
     e, ok := c.series[string(met)]
     if !ok {
-        return nil, false
+        return nil, false, false
     }
+    alreadyScraped := e.lastIter == c.iter
     e.lastIter = c.iter
-    return e, true
+    return e, true, alreadyScraped
 }
 
 func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
@@ -1512,13 +1512,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
 loop:
     for {
         var (
-            et                                             textparse.Entry
-            sampleAdded, isHistogram, seriesAlreadyScraped bool
-            met                                            []byte
-            parsedTimestamp                                *int64
-            val                                            float64
-            h                                              *histogram.Histogram
-            fh                                             *histogram.FloatHistogram
+            et                       textparse.Entry
+            sampleAdded, isHistogram bool
+            met                      []byte
+            parsedTimestamp          *int64
+            val                      float64
+            h                        *histogram.Histogram
+            fh                       *histogram.FloatHistogram
         )
         if et, err = p.Next(); err != nil {
             if errors.Is(err, io.EOF) {
@@ -1564,7 +1565,7 @@ loop:
             if sl.cache.getDropped(met) {
                 continue
             }
-            ce, ok := sl.cache.get(met)
+            ce, ok, seriesAlreadyScraped := sl.cache.get(met)
             var (
                 ref  storage.SeriesRef
                 hash uint64
@@ -1610,7 +1611,6 @@ loop:
                 updateMetadata(lset, true)
             }
 
-            _, seriesAlreadyScraped = sl.cache.seriesCur[hash]
             if seriesAlreadyScraped {
                 err = storage.ErrDuplicateSampleForTimestamp
             } else {
@@ -1882,7 +1882,7 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
 }
 
 func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64, b *labels.Builder) error {
-    ce, ok := sl.cache.get(s)
+    ce, ok, _ := sl.cache.get(s)
     var ref storage.SeriesRef
     var lset labels.Labels
     if ok {