Separate scrape add error checking out into its own function. (#6930)

* Separate scrape add error checking out into its own function.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Pass sampleLimitErr to checkAddError instead of returning an error

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Return bool, error from checkAddError so we can properly handle
ErrNotFound for AddFast. This should in theory never happen, but the
previous code path handled this case. Adds a test for this, which master
passes and the previous commit fails.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Move sampleAdded inside the loop iteration within append, since that's
the only block the variable is used in.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
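
In outline, the commit collapses two near-identical switch blocks in scrapeLoop.append into a single helper, checkAddError, which tallies recoverable ingestion errors in an appendErrors struct and remembers the sample-limit error through an *error so parsing can continue. The toy program below is not part of the commit; the error values and the simplified signature are stand-ins, and it only sketches the pattern the commit message describes:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for storage.ErrOutOfOrderSample and errSampleLimit.
var (
	errOutOfOrder  = errors.New("out of order sample")
	errSampleLimit = errors.New("sample limit exceeded")
)

type appendErrors struct {
	numOutOfOrder int
}

// checkAddError mirrors the helper's contract: it returns whether the
// sample was added, and a non-nil error only when the caller must stop.
func checkAddError(err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
	switch err {
	case nil:
		return true, nil
	case errOutOfOrder:
		appErrs.numOutOfOrder++ // counted and logged once per scrape, not fatal
		return false, nil
	case errSampleLimit:
		*sampleLimitErr = err // remembered so the scrape still reports a correct total
		return false, nil
	default:
		return false, err // unexpected: abort the append loop
	}
}

func main() {
	var (
		appErrs        appendErrors
		sampleLimitErr error
		total, added   int
	)
	for _, addErr := range []error{nil, errOutOfOrder, errSampleLimit, nil} {
		total++
		sampleAdded, err := checkAddError(addErr, &sampleLimitErr, &appErrs)
		if err != nil {
			break
		}
		// Count the sample even once the limit is hit, as the commit intends.
		if sampleAdded || sampleLimitErr != nil {
			added++
		}
	}
	fmt.Println(total, added, appErrs.numOutOfOrder, sampleLimitErr)
	// Output: 4 3 1 sample limit exceeded
}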
Author: Callum Styan <callumstyan@gmail.com>
Date:   2020-03-25 19:31:48 -07:00 (committed by GitHub)
Commit: c453def8c5
Parent: b1fcfcf9c4
2 changed files with 116 additions and 68 deletions

--- a/scrape/scrape.go
+++ b/scrape/scrape.go

@@ -1068,16 +1068,20 @@ func (sl *scrapeLoop) getCache() *scrapeCache {
 	return sl.cache
 }
 
+type appendErrors struct {
+	numOutOfOrder  int
+	numDuplicates  int
+	numOutOfBounds int
+}
+
 func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
 	var (
 		app            = sl.appender()
 		p              = textparse.New(b, contentType)
 		defTime        = timestamp.FromTime(ts)
-		numOutOfOrder  = 0
-		numDuplicates  = 0
-		numOutOfBounds = 0
+		appErrs        = appendErrors{}
+		sampleLimitErr error
 	)
-	var sampleLimitErr error
 
 	defer func() {
 		if err != nil {
@@ -1094,7 +1098,10 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total,
 loop:
 	for {
-		var et textparse.Entry
+		var (
+			et          textparse.Entry
+			sampleAdded bool
+		)
 		if et, err = p.Next(); err != nil {
 			if err == io.EOF {
 				err = nil
@@ -1130,37 +1137,13 @@ loop:
 			continue
 		}
 
 		ce, ok := sl.cache.get(yoloString(met))
 		if ok {
-			switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) {
-			case nil:
-				if tp == nil {
-					sl.cache.trackStaleness(ce.hash, ce.lset)
-				}
-			case storage.ErrNotFound:
+			err = app.AddFast(ce.ref, t, v)
+			sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
+			// In theory this should never happen.
+			if err == storage.ErrNotFound {
 				ok = false
-			case storage.ErrOutOfOrderSample:
-				numOutOfOrder++
-				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
-				targetScrapeSampleOutOfOrder.Inc()
-				continue
-			case storage.ErrDuplicateSampleForTimestamp:
-				numDuplicates++
-				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
-				targetScrapeSampleDuplicate.Inc()
-				continue
-			case storage.ErrOutOfBounds:
-				numOutOfBounds++
-				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
-				targetScrapeSampleOutOfBounds.Inc()
-				continue
-			case errSampleLimit:
-				// Keep on parsing output if we hit the limit, so we report the correct
-				// total number of samples scraped.
-				sampleLimitErr = err
-				added++
-				continue
-			default:
-				break loop
 			}
 		}
 		if !ok {
@@ -1186,43 +1169,29 @@ loop:
 			var ref uint64
 			ref, err = app.Add(lset, t, v)
-			switch errors.Cause(err) {
-			case nil:
-			case storage.ErrOutOfOrderSample:
-				err = nil
-				numOutOfOrder++
-				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
-				targetScrapeSampleOutOfOrder.Inc()
-				continue
-			case storage.ErrDuplicateSampleForTimestamp:
-				err = nil
-				numDuplicates++
-				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
-				targetScrapeSampleDuplicate.Inc()
-				continue
-			case storage.ErrOutOfBounds:
-				err = nil
-				numOutOfBounds++
-				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
-				targetScrapeSampleOutOfBounds.Inc()
-				continue
-			case errSampleLimit:
-				sampleLimitErr = err
-				added++
-				continue
-			default:
-				level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
+			sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, &appErrs)
+			if err != nil {
+				if err != storage.ErrNotFound {
+					level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
+				}
 				break loop
 			}
 			if tp == nil {
 				// Bypass staleness logic if there is an explicit timestamp.
 				sl.cache.trackStaleness(hash, lset)
 			}
 			sl.cache.addRef(mets, ref, lset, hash)
-			seriesAdded++
-		}
-		added++
+			if sampleAdded && sampleLimitErr == nil {
+				seriesAdded++
+			}
+		}
+
+		// Increment added even if there's a sampleLimitErr so we correctly report the number of samples scraped.
+		if sampleAdded || sampleLimitErr != nil {
+			added++
+		}
 	}
 
 	if sampleLimitErr != nil {
 		if err == nil {
 			err = sampleLimitErr
 		}
@@ -1230,14 +1199,14 @@ loop:
 		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
 		targetScrapeSampleLimit.Inc()
 	}
-	if numOutOfOrder > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
+	if appErrs.numOutOfOrder > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
 	}
-	if numDuplicates > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
+	if appErrs.numDuplicates > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
 	}
-	if numOutOfBounds > 0 {
-		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
+	if appErrs.numOutOfBounds > 0 {
+		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
 	}
 	if err == nil {
 		sl.cache.forEachStale(func(lset labels.Labels) bool {
@@ -1259,6 +1228,43 @@ func yoloString(b []byte) string {
 	return *((*string)(unsafe.Pointer(&b)))
 }
 
+// Adds samples to the appender, checking the error, and then returns the # of samples added,
+// whether the caller should continue to process more samples, and any sample limit errors.
+func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
+	switch errors.Cause(err) {
+	case nil:
+		if tp == nil && ce != nil {
+			sl.cache.trackStaleness(ce.hash, ce.lset)
+		}
+		return true, nil
+	case storage.ErrNotFound:
+		return false, storage.ErrNotFound
+	case storage.ErrOutOfOrderSample:
+		appErrs.numOutOfOrder++
+		level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
+		targetScrapeSampleOutOfOrder.Inc()
+		return false, nil
+	case storage.ErrDuplicateSampleForTimestamp:
+		appErrs.numDuplicates++
+		level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
+		targetScrapeSampleDuplicate.Inc()
+		return false, nil
+	case storage.ErrOutOfBounds:
+		appErrs.numOutOfBounds++
+		level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
+		targetScrapeSampleOutOfBounds.Inc()
+		return false, nil
+	case errSampleLimit:
+		// Keep on parsing output if we hit the limit, so we report the correct
+		// total number of samples scraped.
+		*sampleLimitErr = err
+		return false, nil
+	default:
+		return false, err
+	}
+}
+
 // The constants are suffixed with the invalid \xff unicode rune to avoid collisions
 // with scraped metrics in the cache.
 const (
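
One detail worth noting in the helper above: it switches on errors.Cause(err) rather than on err itself. Prometheus uses github.com/pkg/errors at this point in its history, so a sentinel storage error may arrive wrapped, and Cause unwraps it before comparison. A self-contained illustration (the sentinel here is a stand-in, not the real storage error):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// errOutOfOrder stands in for storage.ErrOutOfOrderSample.
var errOutOfOrder = errors.New("out of order sample")

func main() {
	wrapped := errors.Wrap(errOutOfOrder, "add failed")
	fmt.Println(wrapped == errOutOfOrder)               // false: direct comparison misses the wrapped sentinel
	fmt.Println(errors.Cause(wrapped) == errOutOfOrder) // true: Cause unwraps to the original value
}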

--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go

@@ -1014,6 +1014,48 @@ func TestScrapeLoopAppend(t *testing.T) {
 	}
 }
 
+func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
+	// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
+	app := &collectResultAppender{}
+	sl := newScrapeLoop(context.Background(),
+		nil, nil, nil,
+		nopMutator,
+		nopMutator,
+		func() storage.Appender { return app },
+		nil,
+		0,
+		true,
+	)
+
+	fakeRef := uint64(1)
+	expValue := float64(1)
+	metric := `metric{n="1"} 1`
+	p := textparse.New([]byte(metric), "")
+
+	var lset labels.Labels
+	p.Next()
+	mets := p.Metric(&lset)
+	hash := lset.Hash()
+
+	// Create a fake entry in the cache
+	sl.cache.addRef(mets, fakeRef, lset, hash)
+	now := time.Now()
+
+	_, _, _, err := sl.append([]byte(metric), "", now)
+	testutil.Ok(t, err)
+
+	expected := []sample{
+		{
+			metric: lset,
+			t:      timestamp.FromTime(now),
+			v:      expValue,
+		},
+	}
+	testutil.Equals(t, expected, app.result)
+}
+
 func TestScrapeLoopAppendSampleLimit(t *testing.T) {
 	resApp := &collectResultAppender{}
 	app := &limitAppender{Appender: resApp, limit: 1}
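
Assuming the standard Prometheus repository layout, where both files live under the scrape package, the new test can be run on its own with:

go test ./scrape -run TestScrapeLoopAppendCacheEntryButErrNotFound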