Inject a stale NaN when sample disappears between scrapes.

This commit is contained in:
Brian Brazil 2017-04-13 18:07:23 +01:00
parent beaa7d5a43
commit 4f35952cf3
2 changed files with 43 additions and 11 deletions

View file

@ -19,6 +19,7 @@ import (
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"io" "io"
"math"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -35,6 +36,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
@ -410,13 +412,20 @@ type loop interface {
stop() stop()
} }
type lsetCacheEntry struct {
lset labels.Labels
str string
}
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
appender func() storage.Appender appender func() storage.Appender
reportAppender func() storage.Appender reportAppender func() storage.Appender
cache map[string]uint64 refCache map[string]uint64 // Parsed string to ref.
lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string
samplesInPreviousScrape map[string]labels.Labels
done chan struct{} done chan struct{}
ctx context.Context ctx context.Context
@ -428,7 +437,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
scraper: sc, scraper: sc,
appender: app, appender: app,
reportAppender: reportApp, reportAppender: reportApp,
cache: map[string]uint64{}, refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
done: make(chan struct{}), done: make(chan struct{}),
} }
sl.ctx, sl.cancel = context.WithCancel(ctx) sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -528,6 +538,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
app = sl.appender() app = sl.appender()
p = textparse.New(b) p = textparse.New(b)
defTime = timestamp.FromTime(ts) defTime = timestamp.FromTime(ts)
samplesScraped = map[string]labels.Labels{}
) )
loop: loop:
@ -541,10 +552,11 @@ loop:
} }
mets := yoloString(met) mets := yoloString(met)
ref, ok := sl.cache[mets] ref, ok := sl.refCache[mets]
if ok { if ok {
switch err = app.AddFast(ref, t, v); err { switch err = app.AddFast(ref, t, v); err {
case nil: case nil:
samplesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
case storage.ErrNotFound: case storage.ErrNotFound:
ok = false ok = false
case errSeriesDropped: case errSeriesDropped:
@ -568,13 +580,31 @@ loop:
} }
// Allocate a real string. // Allocate a real string.
mets = string(met) mets = string(met)
sl.cache[mets] = ref sl.refCache[mets] = ref
str := lset.String()
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
samplesScraped[str] = lset
} }
added++ added++
} }
if err == nil { if err == nil {
err = p.Err() err = p.Err()
} }
if err == nil {
for metric, lset := range sl.samplesInPreviousScrape {
if _, ok := samplesScraped[metric]; !ok {
// Sample no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case errSeriesDropped:
continue
default:
break
}
}
}
}
if err != nil { if err != nil {
app.Rollback() app.Rollback()
return total, 0, err return total, 0, err
@ -582,6 +612,7 @@ loop:
if err := app.Commit(); err != nil { if err := app.Commit(); err != nil {
return total, 0, err return total, 0, err
} }
sl.samplesInPreviousScrape = samplesScraped
return total, added, nil return total, added, nil
} }
@ -621,7 +652,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
} }
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
ref, ok := sl.cache[s] ref, ok := sl.refCache[s]
if ok { if ok {
if err := app.AddFast(ref, t, v); err == nil { if err := app.AddFast(ref, t, v); err == nil {
@ -637,7 +668,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
if err != nil { if err != nil {
return err return err
} }
sl.cache[s] = ref sl.refCache[s] = ref
return nil return nil
} }

View file

@ -18,10 +18,10 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"math"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
@ -442,7 +442,8 @@ func TestScrapeLoopAppend(t *testing.T) {
sl := &scrapeLoop{ sl := &scrapeLoop{
appender: func() storage.Appender { return app }, appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
cache: map[string]uint64{}, refCache: map[string]uint64{},
lsetCache: map[uint64]lsetCacheEntry{},
} }
now := time.Now() now := time.Now()