Merge pull request #12933 from prymitive/duplicated_samples

When Prometheus scrapes a target and sees the same time series repeated multiple times, it currently ignores the duplicates silently. This change adds a test for that case and fixes the scrape loop so that:

* Only the first sample for each unique time series is appended
* Duplicated samples increment the prometheus_target_scrapes_sample_duplicate_timestamp_total metric
This allows one to identify such scrape jobs and targets.

Also fix some tests and a benchmark.
This commit is contained in:
Bryan Boreham 2024-03-16 09:18:46 +00:00 committed by GitHub
commit 5ed21c0d76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 68 additions and 24 deletions

View file

@ -954,13 +954,14 @@ func (c *scrapeCache) iterDone(flushCache bool) {
}
}
// get looks up the cache entry stored in c.series under the metric bytes met.
//
// NOTE: this view interleaves the pre- and post-change lines of the diff, so
// the signature and every return statement appear twice: the old form first,
// the new form second.
//
// In the new form it returns the entry, whether an entry was found, and
// whether the entry's lastIter already equalled c.iter before this call
// (i.e. the entry had already been marked for the current iteration).
// On a miss it returns nil with both flags false.
func (c *scrapeCache) get(met []byte) (*cacheEntry, bool) {
func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
e, ok := c.series[string(met)]
if !ok {
return nil, false
return nil, false, false
}
// Capture the comparison before lastIter is overwritten on the next line.
alreadyScraped := e.lastIter == c.iter
// Mark the entry as seen in the current iteration.
e.lastIter = c.iter
return e, true
return e, true, alreadyScraped
}
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
@ -1566,7 +1567,7 @@ loop:
if sl.cache.getDropped(met) {
continue
}
ce, ok := sl.cache.get(met)
ce, ok, seriesAlreadyScraped := sl.cache.get(met)
var (
ref storage.SeriesRef
hash uint64
@ -1575,6 +1576,7 @@ loop:
if ok {
ref = ce.ref
lset = ce.lset
hash = ce.hash
// Update metadata only if it changed in the current iteration.
updateMetadata(lset, false)
@ -1611,25 +1613,36 @@ loop:
updateMetadata(lset, true)
}
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
if seriesAlreadyScraped {
err = storage.ErrDuplicateSampleForTimestamp
} else {
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
if err == nil {
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@ -1650,6 +1663,8 @@ loop:
// Increment added even if there's an error so we correctly report the
// number of samples remaining after relabeling.
// We still report duplicated samples here since this number should be the exact number
// of time series exposed on a scrape after relabelling.
added++
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
@ -1744,12 +1759,9 @@ loop:
// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
switch {
case err == nil:
if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
}
return true, nil
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
@ -1872,7 +1884,7 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64, b *labels.Builder) error {
ce, ok := sl.cache.get(s)
ce, ok, _ := sl.cache.get(s)
var ref storage.SeriesRef
var lset labels.Labels
if ok {

View file

@ -1068,6 +1068,7 @@ func makeTestMetrics(n int) []byte {
fmt.Fprintf(&sb, "# HELP metric_a help text\n")
fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
}
fmt.Fprintf(&sb, "# EOF\n")
return sb.Bytes()
}
@ -2635,6 +2636,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
require.Error(t, err)
require.NoError(t, slApp.Rollback())
// We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
// which would cause ErrDuplicateSampleForTimestamp errors on the next append.
sl.cache.iterDone(true)
q, err := s.Querier(time.Time{}.UnixNano(), 0)
require.NoError(t, err)
@ -2971,7 +2975,7 @@ func TestReuseCacheRace(t *testing.T) {
// TestCheckAddError checks that handing storage.ErrOutOfOrderSample to
// checkAddError results in appErrs.numOutOfOrder being 1.
//
// NOTE: the two checkAddError calls below are the pre- and post-change lines
// of the diff; the second one matches the new signature, which no longer takes
// the cache entry and parsed-timestamp arguments.
func TestCheckAddError(t *testing.T) {
var appErrs appendErrors
sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)}
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
require.Equal(t, 1, appErrs.numOutOfOrder)
}
@ -3599,3 +3603,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
})
}
}
// TestScrapeLoopSeriesAddedDuplicates feeds the scrape loop two payloads that
// each repeat the same series three times and verifies that:
//   - every sample is still counted in total and added,
//   - the series is reported as newly added only on the first scrape,
//   - the targetScrapeSampleDuplicate counter ends up at 4
//     (two repeated samples per scrape, over two scrapes).
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
	ctx, sl := simpleTestScrapeLoop(t)

	scrapes := []struct {
		payload         string
		wantSeriesAdded int
	}{
		{payload: "test_metric 1\ntest_metric 2\ntest_metric 3\n", wantSeriesAdded: 1},
		{payload: "test_metric 1\ntest_metric 1\ntest_metric 1\n", wantSeriesAdded: 0},
	}

	for _, sc := range scrapes {
		app := sl.appender(ctx)
		total, added, seriesAdded, err := sl.append(app, []byte(sc.payload), "", time.Time{})
		require.NoError(t, err)
		require.NoError(t, app.Commit())
		require.Equal(t, 3, total)
		require.Equal(t, 3, added)
		require.Equal(t, sc.wantSeriesAdded, seriesAdded)
	}

	// Read the duplicate-sample counter back through its protobuf form.
	var metric dto.Metric
	require.NoError(t, sl.metrics.targetScrapeSampleDuplicate.Write(&metric))
	require.Equal(t, 4.0, metric.GetCounter().GetValue())
}