Check for duplicated series on a scrape

When Prometheus scrapes a target and it sees the same time series repeated multiple times it currently silently ignores that.
This change adds a test for that and fixes the scrape loop so that:

- Only first sample for each unique time series is appended
- Duplicated samples increment the prometheus_target_scrapes_sample_duplicate_timestamp_total metric

This allows one to identify such scrape jobs and targets.

Benchmark results:

```
name                            old time/op    new time/op    delta
ScrapeLoopAppend-8                64.8µs ± 2%    71.1µs ±20%   +9.75%  (p=0.000 n=10+10)
ScrapeLoopAppendOM-8              64.2µs ± 1%    68.5µs ± 7%   +6.71%  (p=0.000 n=9+10)
TargetsFromGroup/1_targets-8      14.2µs ± 1%    14.5µs ± 1%   +1.99%  (p=0.000 n=10+10)
TargetsFromGroup/10_targets-8      149µs ± 1%     152µs ± 1%   +2.05%  (p=0.000 n=9+10)
TargetsFromGroup/100_targets-8    1.49ms ± 4%    1.48ms ± 1%     ~     (p=0.796 n=10+10)

name                            old alloc/op   new alloc/op   delta
ScrapeLoopAppend-8                19.9kB ± 1%    17.8kB ± 3%  -10.23%  (p=0.000 n=8+10)
ScrapeLoopAppendOM-8              19.9kB ± 1%    18.3kB ±10%   -8.14%  (p=0.001 n=9+10)
TargetsFromGroup/1_targets-8      2.43kB ± 0%    2.43kB ± 0%   -0.15%  (p=0.045 n=10+10)
TargetsFromGroup/10_targets-8     24.3kB ± 0%    24.3kB ± 0%     ~     (p=0.083 n=10+9)
TargetsFromGroup/100_targets-8     243kB ± 0%     243kB ± 0%     ~     (p=0.720 n=9+10)

name                            old allocs/op  new allocs/op  delta
ScrapeLoopAppend-8                  9.00 ± 0%      9.00 ± 0%     ~     (all equal)
ScrapeLoopAppendOM-8                10.0 ± 0%      10.0 ± 0%     ~     (all equal)
TargetsFromGroup/1_targets-8        40.0 ± 0%      40.0 ± 0%     ~     (all equal)
TargetsFromGroup/10_targets-8        400 ± 0%       400 ± 0%     ~     (all equal)
TargetsFromGroup/100_targets-8     4.00k ± 0%     4.00k ± 0%     ~     (all equal)
```

Signed-off-by: Łukasz Mierzwa <l.mierzwa@gmail.com>
This commit is contained in:
Łukasz Mierzwa 2023-11-16 13:22:28 +00:00
parent 1a8ea78207
commit 50c81bed86
2 changed files with 59 additions and 22 deletions

View file

@ -1512,13 +1512,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
loop:
for {
var (
et textparse.Entry
sampleAdded, isHistogram bool
met []byte
parsedTimestamp *int64
val float64
h *histogram.Histogram
fh *histogram.FloatHistogram
et textparse.Entry
sampleAdded, isHistogram, seriesAlreadyScraped bool
met []byte
parsedTimestamp *int64
val float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
if et, err = p.Next(); err != nil {
if errors.Is(err, io.EOF) {
@ -1573,6 +1573,7 @@ loop:
if ok {
ref = ce.ref
lset = ce.lset
hash = ce.hash
// Update metadata only if it changed in the current iteration.
updateMetadata(lset, false)
@ -1609,24 +1610,30 @@ loop:
updateMetadata(lset, true)
}
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
_, seriesAlreadyScraped = sl.cache.seriesCur[hash]
if seriesAlreadyScraped {
err = storage.ErrDuplicateSampleForTimestamp
} else {
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
}
if isHistogram {
if h != nil {
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
} else {
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
}
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
@ -1648,6 +1655,8 @@ loop:
// Increment added even if there's an error so we correctly report the
// number of samples remaining after relabeling.
// We still report duplicated samples here since this number should be the exact number
// of time series exposed on a scrape after relabelling.
added++
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {

View file

@ -3600,3 +3600,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
})
}
}
// When a scrape contains multiple instances for the same time series we should increment
// prometheus_target_scrapes_sample_duplicate_timestamp_total metric.
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
ctx, sl := simpleTestScrapeLoop(t)
slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 1, seriesAdded)
slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
metric := dto.Metric{}
err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric)
require.NoError(t, err)
value := metric.GetCounter().GetValue()
require.Equal(t, 4.0, value)
}