retrieval: fix memory leak and consumption for caches

This commit is contained in:
Fabian Reinartz 2017-05-26 08:44:24 +02:00
parent ab0ce4a8d9
commit a83014f53c
3 changed files with 72 additions and 42 deletions

View file

@ -99,7 +99,7 @@ func (p *Parser) Err() error {
} }
// Metric writes the labels of the current sample into the passed labels. // Metric writes the labels of the current sample into the passed labels.
func (p *Parser) Metric(l *labels.Labels) { func (p *Parser) Metric(l *labels.Labels) string {
// Allocate the full immutable string immediately, so we just // Allocate the full immutable string immediately, so we just
// have to create references on it below. // have to create references on it below.
s := string(p.l.b[p.l.mstart:p.l.mend]) s := string(p.l.b[p.l.mstart:p.l.mend])
@ -118,6 +118,8 @@ func (p *Parser) Metric(l *labels.Labels) {
} }
sort.Sort((*l)[1:]) sort.Sort((*l)[1:])
return s
} }
func yoloString(b []byte) string { func yoloString(b []byte) string {

View file

@ -416,25 +416,31 @@ type loop interface {
type lsetCacheEntry struct { type lsetCacheEntry struct {
lset labels.Labels lset labels.Labels
str string hash uint64
}
type refEntry struct {
ref string
lastIter uint64
} }
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
l log.Logger l log.Logger
iter uint64 // scrape iteration
appender func() storage.Appender appender func() storage.Appender
reportAppender func() storage.Appender reportAppender func() storage.Appender
// TODO: Keep only the values from the last scrape to avoid a memory leak. refCache map[string]*refEntry // Parsed string to ref.
refCache map[string]string // Parsed string to ref. lsetCache map[string]*lsetCacheEntry // Ref to labelset and string
lsetCache map[string]lsetCacheEntry // Ref to labelset and string
// seriesCur and seriesPrev store the labels of series that were seen // seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape. // in the current and previous scrape.
// We hold two maps and swap them out to save allocations. // We hold two maps and swap them out to save allocations.
seriesCur map[string]labels.Labels seriesCur map[uint64]labels.Labels
seriesPrev map[string]labels.Labels seriesPrev map[uint64]labels.Labels
ctx context.Context ctx context.Context
scrapeCtx context.Context scrapeCtx context.Context
@ -450,10 +456,10 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
scraper: sc, scraper: sc,
appender: app, appender: app,
reportAppender: reportApp, reportAppender: reportApp,
refCache: map[string]string{}, refCache: map[string]*refEntry{},
lsetCache: map[string]lsetCacheEntry{}, lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[string]labels.Labels{}, seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[string]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{},
stopped: make(chan struct{}), stopped: make(chan struct{}),
ctx: ctx, ctx: ctx,
l: l, l: l,
@ -481,6 +487,8 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
mainLoop: mainLoop:
for { for {
sl.iter++
buf.Reset() buf.Reset()
select { select {
case <-sl.ctx.Done(): case <-sl.ctx.Done():
@ -526,6 +534,16 @@ mainLoop:
sl.report(start, time.Since(start), total, added, err) sl.report(start, time.Since(start), total, added, err)
last = start last = start
// refCache and lsetCache may grow over time through series churn
// or multiple string representations of the same metric. Clean up entries
// that haven't appeared in the last scrape.
for s, e := range sl.refCache {
if e.lastIter < sl.iter {
delete(sl.refCache, s)
delete(sl.lsetCache, e.ref)
}
}
select { select {
case <-sl.ctx.Done(): case <-sl.ctx.Done():
close(sl.stopped) close(sl.stopped)
@ -637,13 +655,17 @@ loop:
} }
mets := yoloString(met) mets := yoloString(met)
ref, ok := sl.refCache[mets] re, ok := sl.refCache[mets]
if ok { if ok {
switch err = app.AddFast(ref, t, v); err { re.lastIter = sl.iter
switch err = app.AddFast(re.ref, t, v); err {
case nil: case nil:
if tp == nil { if tp == nil {
e := sl.lsetCache[re.ref]
// Bypass staleness logic if there is an explicit timestamp. // Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset sl.seriesCur[e.hash] = e.lset
} }
case storage.ErrNotFound: case storage.ErrNotFound:
ok = false ok = false
@ -652,10 +674,10 @@ loop:
continue continue
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
sl.l.With("timeseries", string(met)).Debug("Out of order sample") sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1 numOutOfOrder++
continue continue
case storage.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
numDuplicates += 1 numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue continue
default: default:
@ -664,8 +686,9 @@ loop:
} }
if !ok { if !ok {
var lset labels.Labels var lset labels.Labels
p.Metric(&lset) mets = p.Metric(&lset)
var ref string
ref, err = app.Add(lset, t, v) ref, err = app.Add(lset, t, v)
// TODO(fabxc): also add a dropped-cache? // TODO(fabxc): also add a dropped-cache?
switch err { switch err {
@ -676,24 +699,27 @@ loop:
case storage.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
err = nil err = nil
sl.l.With("timeseries", string(met)).Debug("Out of order sample") sl.l.With("timeseries", string(met)).Debug("Out of order sample")
numOutOfOrder += 1 numOutOfOrder++
continue continue
case storage.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
err = nil err = nil
numDuplicates += 1 numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue continue
default: default:
break loop break loop
} }
// Allocate a real string.
mets = string(met) sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter}
sl.refCache[mets] = ref
str := lset.String() // mets is the raw string the metric was ingested as and ambiguous as it might
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str} // not be sorted. Construct the authoritative string for the label set.
h := lset.Hash()
sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h}
if tp == nil { if tp == nil {
// Bypass staleness logic if there is an explicit timestamp. // Bypass staleness logic if there is an explicit timestamp.
sl.seriesCur[str] = lset sl.seriesCur[h] = lset
} }
} }
added++ added++
@ -807,10 +833,12 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
} }
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
ref, ok := sl.refCache[s] re, ok := sl.refCache[s]
if ok { if ok {
err := app.AddFast(ref, t, v) re.lastIter = sl.iter
err := app.AddFast(re.ref, t, v)
switch err { switch err {
case nil: case nil:
return nil return nil
@ -830,7 +858,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
ref, err := app.Add(met, t, v) ref, err := app.Add(met, t, v)
switch err { switch err {
case nil: case nil:
sl.refCache[s] = ref sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter}
return nil return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil return nil

View file

@ -604,10 +604,10 @@ func TestScrapeLoopAppend(t *testing.T) {
sl := &scrapeLoop{ sl := &scrapeLoop{
appender: func() storage.Appender { return app }, appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{}, refCache: map[string]*refEntry{},
lsetCache: map[string]lsetCacheEntry{}, lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[string]labels.Labels{}, seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[string]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{},
} }
now := time.Now() now := time.Now()
@ -645,10 +645,10 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
sl := &scrapeLoop{ sl := &scrapeLoop{
appender: func() storage.Appender { return app }, appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{}, refCache: map[string]*refEntry{},
lsetCache: map[string]lsetCacheEntry{}, lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[string]labels.Labels{}, seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[string]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{},
} }
now := time.Now() now := time.Now()
@ -691,8 +691,8 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
sl := &scrapeLoop{ sl := &scrapeLoop{
appender: func() storage.Appender { return app }, appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{}, refCache: map[string]*refEntry{},
lsetCache: map[string]lsetCacheEntry{}, lsetCache: map[string]*lsetCacheEntry{},
} }
now := time.Now() now := time.Now()
@ -740,10 +740,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
sl := &scrapeLoop{ sl := &scrapeLoop{
appender: func() storage.Appender { return app }, appender: func() storage.Appender { return app },
reportAppender: func() storage.Appender { return nopAppender{} }, reportAppender: func() storage.Appender { return nopAppender{} },
refCache: map[string]string{}, refCache: map[string]*refEntry{},
lsetCache: map[string]lsetCacheEntry{}, lsetCache: map[string]*lsetCacheEntry{},
seriesCur: map[string]labels.Labels{}, seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[string]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{},
l: log.Base(), l: log.Base(),
} }