Initilise scraped sample map, and rename to series map.

This commit is contained in:
Brian Brazil 2017-05-16 15:12:40 +01:00
parent bf38963118
commit 0920972f79

View file

@ -427,9 +427,9 @@ type scrapeLoop struct {
reportAppender func() storage.Appender
// TODO: Keep only the values from the last scrape to avoid a memory leak.
refCache map[string]uint64 // Parsed string to ref.
lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string
samplesInPreviousScrape map[string]labels.Labels
refCache map[string]uint64 // Parsed string to ref.
lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string
seriesInPreviousScrape map[string]labels.Labels
ctx context.Context
scrapeCtx context.Context
@ -503,8 +503,8 @@ mainLoop:
if err == nil {
b = buf.Bytes()
} else if errc != nil {
errc <- err
}
errc <- err
}
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
if total, added, err = sl.append(b, start); err != nil {
@ -612,12 +612,12 @@ func (s samples) Less(i, j int) bool {
func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) {
var (
app = sl.appender()
p = textparse.New(b)
defTime = timestamp.FromTime(ts)
samplesScraped = map[string]labels.Labels{}
numOutOfOrder = 0
numDuplicates = 0
app = sl.appender()
p = textparse.New(b)
defTime = timestamp.FromTime(ts)
seriesScraped = make(map[string]labels.Labels, len(sl.seriesInPreviousScrape))
numOutOfOrder = 0
numDuplicates = 0
)
loop:
@ -635,7 +635,7 @@ loop:
if ok {
switch err = app.AddFast(ref, t, v); err {
case nil:
samplesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
seriesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset
case storage.ErrNotFound:
ok = false
case errSeriesDropped:
@ -683,7 +683,7 @@ loop:
sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
if tp == nil {
// Bypass staleness logic if there is an explicit timestamp.
samplesScraped[str] = lset
seriesScraped[str] = lset
}
}
added++
@ -698,9 +698,9 @@ loop:
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
}
if err == nil {
for metric, lset := range sl.samplesInPreviousScrape {
if _, ok := samplesScraped[metric]; !ok {
// Sample no longer exposed, mark it stale.
for metric, lset := range sl.seriesInPreviousScrape {
if _, ok := seriesScraped[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case nil:
@ -725,7 +725,7 @@ loop:
if err := app.Commit(); err != nil {
return total, 0, err
}
sl.samplesInPreviousScrape = samplesScraped
sl.seriesInPreviousScrape = seriesScraped
return total, added, nil
}