Stop metrics that are 10mins ahead from now

Fixes #2893

Signed-off-by: Goutham Veeramachaneni <goutham@boomerangcommerce.com>
This commit is contained in:
Goutham Veeramachaneni 2017-07-04 14:55:33 +02:00
parent 3069bd3996
commit 643c5837a0
3 changed files with 85 additions and 14 deletions

View file

@ -113,7 +113,8 @@ type scrapePool struct {
// Constructor for new scrape loops. This is settable for testing convenience. // Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
logger log.Logger logger log.Logger
maxAheadTime time.Duration
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
@ -133,14 +134,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
} }
return &scrapePool{ return &scrapePool{
appendable: app, appendable: app,
config: cfg, config: cfg,
ctx: ctx, ctx: ctx,
client: client, client: client,
targets: map[uint64]*Target{}, targets: map[uint64]*Target{},
loops: map[uint64]loop{}, loops: map[uint64]loop{},
newLoop: newLoop, newLoop: newLoop,
logger: logger, logger: logger,
maxAheadTime: 10 * time.Minute,
} }
} }
@ -310,6 +312,13 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
panic(err) panic(err)
} }
if sp.maxAheadTime > 0 {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)),
}
}
// The limit is applied after metrics are potentially dropped via relabeling. // The limit is applied after metrics are potentially dropped via relabeling.
if sp.config.SampleLimit > 0 { if sp.config.SampleLimit > 0 {
app = &limitAppender{ app = &limitAppender{
@ -810,6 +819,7 @@ loop:
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
continue continue
case storage.ErrOutOfBounds: case storage.ErrOutOfBounds:
err = nil
numOutOfBounds++ numOutOfBounds++
sl.l.With("timeseries", string(met)).Debug("Out of bounds metric") sl.l.With("timeseries", string(met)).Debug("Out of bounds metric")
continue continue

View file

@ -273,6 +273,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
app := &nopAppendable{} app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, log.Base()) sp := newScrapePool(context.Background(), cfg, app, log.Base())
sp.maxAheadTime = 0
cfg.HonorLabels = false cfg.HonorLabels = false
wrapped := sp.sampleAppender(target) wrapped := sp.sampleAppender(target)
@ -872,19 +873,23 @@ type errorAppender struct {
} }
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if lset.Get(model.MetricNameLabel) == "out_of_order" { switch lset.Get(model.MetricNameLabel) {
case "out_of_order":
return "", storage.ErrOutOfOrderSample return "", storage.ErrOutOfOrderSample
} else if lset.Get(model.MetricNameLabel) == "amend" { case "amend":
return "", storage.ErrDuplicateSampleForTimestamp return "", storage.ErrDuplicateSampleForTimestamp
case "out_of_bounds":
return "", storage.ErrOutOfBounds
default:
return app.collectResultAppender.Add(lset, t, v)
} }
return app.collectResultAppender.Add(lset, t, v)
} }
func (app *errorAppender) AddFast(ref string, t int64, v float64) error { func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
return app.collectResultAppender.AddFast(ref, t, v) return app.collectResultAppender.AddFast(ref, t, v)
} }
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
app := &errorAppender{} app := &errorAppender{}
sl := newScrapeLoop(context.Background(), nil, sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app }, func() storage.Appender { return app },
@ -893,7 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
) )
now := time.Unix(1, 0) now := time.Unix(1, 0)
_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now) _, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), now)
if err != nil { if err != nil {
t.Fatalf("Unexpected append error: %s", err) t.Fatalf("Unexpected append error: %s", err)
} }
@ -907,7 +912,35 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
if !reflect.DeepEqual(want, app.result) { if !reflect.DeepEqual(want, app.result) {
t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result) t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result)
} }
}
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender {
return &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
}
},
func() storage.Appender { return nopAppender{} },
nil,
)
now := time.Now().Add(20 * time.Minute)
total, added, err := sl.append([]byte("normal 1\n"), now)
if total != 1 {
t.Error("expected 1 metric")
return
}
if added != 0 {
t.Error("no metric should be added")
}
if err != nil {
t.Errorf("expect no error, got %s", err.Error())
}
} }
func TestTargetScraperScrapeOK(t *testing.T) { func TestTargetScraperScrapeOK(t *testing.T) {

View file

@ -225,6 +225,34 @@ func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
return nil return nil
} }
type timeLimitAppender struct {
storage.Appender
maxTime int64
}
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
if t > app.maxTime {
return "", storage.ErrOutOfBounds
}
ref, err := app.Appender.Add(lset, t, v)
if err != nil {
return "", err
}
return ref, nil
}
func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error {
if t > app.maxTime {
return storage.ErrOutOfBounds
}
if err := app.Appender.AddFast(ref, t, v); err != nil {
return err
}
return nil
}
// Merges the ingested sample's metric with the label set. On a collision the // Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'. // value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct { type ruleLabelsAppender struct {