diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 4961a968dd..05aadb3ea4 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -27,16 +27,16 @@ func (a nopAppendable) Appender() (storage.Appender, error) { type nopAppender struct{} -func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil } -func (a nopAppender) AddFast(string, int64, float64) error { return nil } -func (a nopAppender) Commit() error { return nil } -func (a nopAppender) Rollback() error { return nil } +func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil } +func (a nopAppender) AddFast(labels.Labels, string, int64, float64) error { return nil } +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type collectResultAppender struct { result []sample } -func (a *collectResultAppender) AddFast(ref string, t int64, v float64) error { +func (a *collectResultAppender) AddFast(m labels.Labels, ref string, t int64, v float64) error { // Not implemented. return storage.ErrNotFound } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 73873e98c6..ebfc3b4c02 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -751,8 +751,10 @@ loop: } ref, ok := sl.cache.getRef(yoloString(met)) + var lset labels.Labels + p.Metric(&lset) if ok { - switch err = app.AddFast(ref, t, v); err { + switch err = app.AddFast(lset, ref, t, v); err { case nil: if tp == nil { e := sl.cache.lsets[ref] @@ -945,10 +947,12 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v // Suffix s with the invalid \xff unicode rune to avoid collisions // with scraped metrics. s2 := s + "\xff" - + met := labels.Labels{ + labels.Label{Name: labels.MetricName, Value: s}, + } ref, ok := sl.cache.getRef(s2) if ok { - err := app.AddFast(ref, t, v) + err := app.AddFast(met, ref, t, v) switch err { case nil: return nil @@ -962,9 +966,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v return err } } - met := labels.Labels{ - labels.Label{Name: labels.MetricName, Value: s}, - } + ref, err := app.Add(met, t, v) switch err { case nil: diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 7ef8f6a818..5b4e23b554 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -885,8 +885,8 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, e } } -func (app *errorAppender) AddFast(ref string, t int64, v float64) error { - return app.collectResultAppender.AddFast(ref, t, v) +func (app *errorAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { + return app.collectResultAppender.AddFast(lset, ref, t, v) } func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { diff --git a/retrieval/target.go b/retrieval/target.go index 2949074a65..f4f1e4609d 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -212,14 +212,14 @@ func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, e return ref, nil } -func (app *limitAppender) AddFast(ref string, t int64, v float64) error { +func (app *limitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { if !value.IsStaleNaN(v) { app.i++ if app.i > app.limit { return errSampleLimit } } - if err := app.Appender.AddFast(ref, t, v); err != nil { + if err := app.Appender.AddFast(lset, ref, t, v); err != nil { return err } return nil @@ -243,11 +243,11 @@ func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (strin return ref, nil } -func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error { +func (app *timeLimitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { if t > app.maxTime { return storage.ErrOutOfBounds } - if err := app.Appender.AddFast(ref, t, v); err != nil { + if err := app.Appender.AddFast(lset, ref, t, v); err != nil { return err } return nil diff --git a/storage/fanout.go b/storage/fanout.go index 56e6980739..9572f96d46 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -87,9 +87,13 @@ func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error return "", nil } -func (f *fanoutAppender) AddFast(ref string, t int64, v float64) error { - // TODO this is a cheat, and causes us to fall back to slow path even for local writes. - return ErrNotFound +func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error { + for _, appender := range f.appenders { + if err := appender.AddFast(l, ref, t, v); err != nil { + return err + } + } + return nil } func (f *fanoutAppender) Commit() error { diff --git a/storage/interface.go b/storage/interface.go index a1b5221a28..cb07728673 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -56,7 +56,7 @@ type Querier interface { type Appender interface { Add(l labels.Labels, t int64, v float64) (string, error) - AddFast(ref string, t int64, v float64) error + AddFast(l labels.Labels, ref string, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error diff --git a/storage/remote/write.go b/storage/remote/write.go index f69561bc74..087ef8ca49 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -44,8 +44,9 @@ func labelsToMetric(ls labels.Labels) model.Metric { return metric } -func (*Storage) AddFast(ref string, t int64, v float64) error { - return storage.ErrNotFound +func (s *Storage) AddFast(l labels.Labels, _ string, t int64, v float64) error { + _, err := s.Add(l, t, v) + return err } func (*Storage) Commit() error { diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 09021fc5c0..63c4d52190 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -135,7 +135,7 @@ func (a appender) Add(lset labels.Labels, t int64, v float64) (string, error) { return ref, err } -func (a appender) AddFast(ref string, t int64, v float64) error { +func (a appender) AddFast(_ labels.Labels, ref string, t int64, v float64) error { err := a.a.AddFast(ref, t, v) switch errors.Cause(err) {