From bf0055536d8463a27725ce9a35ccba55e97d6e5b Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Thu, 12 Mar 2020 07:43:24 +0100 Subject: [PATCH 01/10] Release 2.17.0-rc.0 (#6966) * Release 2.17.0-rc.0 Signed-off-by: Julien Pivotto --- CHANGELOG.md | 24 ++++++++++++++++++++++++ VERSION | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f8745c22..6d5640f4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,27 @@ +## 2.17.0-rc.0 / 2020-03-12 + +This release implements isolation in TSDB. API queries and recording rules are +guaranteed to only see full scrapes and full recording rules. This comes with a +certain overhead in resource usage. Depending on the situation, there might be +some increase in memory usage, CPU usage, or query latency. + +* [FEATURE] TSDB: Support isolation #6841 +* [ENHANCEMENT] PromQL: Allow more keywords as metric names #6933 +* [ENHANCEMENT] React UI: Add normalization of localhost URLs in targets page #6794 +* [ENHANCEMENT] Remote read: Read from remote storage concurrently #6770 +* [ENHANCEMENT] Rules: Mark deleted rule series as stale after a reload #6745 +* [ENHANCEMENT] Scrape: Log scrape append failures as debug rather than warn #6852 +* [ENHANCEMENT] TSDB: Improve query performance for queries that partially hit the head #6676 +* [ENHANCEMENT] Consul SD: Expose service health as meta label #5313 +* [ENHANCEMENT] EC2 SD: Expose EC2 instance lifecycle as meta label #6914 +* [ENHANCEMENT] Kubernetes SD: Expose service type as meta label for K8s service role #6684 +* [ENHANCEMENT] Kubernetes SD: Expose label_selector and field_selector #6807 +* [ENHANCEMENT] Openstack SD: Expose hypervisor id as meta label #6962 +* [BUGFIX] PromQL: Do not escape HTML-like chars in query log #6834 #6795 +* [BUGFIX] React UI: Fix data table matrix values #6896 +* [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892 +* [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891 + ## 2.16.0 / 2020-02-13 * [FEATURE] React UI: Support local timezone on /graph #6692 diff --git a/VERSION b/VERSION index 752490696..bf5e7d97b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.16.0 +2.17.0-rc.0 From d80b0810c1360d05904357386baee584f3b071bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?= Date: Fri, 13 Mar 2020 20:54:47 +0100 Subject: [PATCH 02/10] Move crucial actions to defer (#6918) With defer having less of a performance penalty, there is no reason not to do those crucial operations via defer. Context: With isolation in place, if we forget to Commit/Rollback, the low watermark will get stuck forever. The current code should not have any bugs, but moving to defer helps to avoid future bugs. This is also moving the `closeAppend` in the `Commit` implementation itself to defer. If logging to the WAL fails, we would have missed the `closeAppend`. Signed-off-by: beorn7 --- rules/manager.go | 12 +++--- scrape/scrape.go | 103 +++++++++++++++++++++++++---------------------- tsdb/head.go | 7 ++-- 3 files changed, 65 insertions(+), 57 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 50c4c8b7f..33b10d9cb 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -590,6 +590,13 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { app := g.opts.Appendable.Appender() seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { + level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) + return + } + g.seriesInPreviousEval[i] = seriesReturned + }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { switch err { @@ -627,11 +634,6 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { } } } - if err := app.Commit(); err != nil { - level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err) - } else { - g.seriesInPreviousEval[i] = seriesReturned - } }(i, rule) } g.cleanupStaleSeries(ts) diff --git a/scrape/scrape.go b/scrape/scrape.go index e0077e9fa..601fca553 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1069,6 +1069,19 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, ) var sampleLimitErr error + defer func() { + if err != nil { + app.Rollback() + return + } + if err = app.Commit(); err != nil { + return + } + // Only perform cache cleaning if the scrape was not empty. + // An empty scrape (usually) is used to indicate a failed scrape. + sl.cache.iterDone(len(b) > 0) + }() + loop: for { var et textparse.Entry @@ -1229,19 +1242,7 @@ loop: return err == nil }) } - if err != nil { - app.Rollback() - return total, added, seriesAdded, err - } - if err := app.Commit(); err != nil { - return total, added, seriesAdded, err - } - - // Only perform cache cleaning if the scrape was not empty. - // An empty scrape (usually) is used to indicate a failed scrape. - sl.cache.iterDone(len(b) > 0) - - return total, added, seriesAdded, nil + return } func yoloString(b []byte) string { @@ -1258,67 +1259,71 @@ const ( scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" ) -func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { - sl.scraper.Report(start, duration, err) +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, scrapeErr error) (err error) { + sl.scraper.Report(start, duration, scrapeErr) ts := timestamp.FromTime(start) var health float64 - if err == nil { + if scrapeErr == nil { health = 1 } app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { + return } - return app.Commit() + return } -func (sl *scrapeLoop) reportStale(start time.Time) error { +func (sl *scrapeLoop) reportStale(start time.Time) (err error) { ts := timestamp.FromTime(start) app := sl.appender() + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() stale := math.Float64frombits(value.StaleNaN) - if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { + return } - if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { - app.Rollback() - return err + if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { + return } - return app.Commit() + return } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { diff --git a/tsdb/head.go b/tsdb/head.go index 0de043000..197fbe287 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1079,7 +1079,10 @@ func (a *headAppender) Commit() error { } func (a *headAppender) Rollback() error { - a.head.metrics.activeAppenders.Dec() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.iso.closeAppend(a.appendID) + defer a.head.putSeriesBuffer(a.sampleSeries) + var series *memSeries for i := range a.samples { series = a.sampleSeries[i] @@ -1090,8 +1093,6 @@ func (a *headAppender) Rollback() error { } a.head.putAppendBuffer(a.samples) a.samples = nil - a.head.putSeriesBuffer(a.sampleSeries) - a.head.iso.closeAppend(a.appendID) // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. From 3128875ff4de14527502be787e3f3eb480a11fc9 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 13 Mar 2020 13:57:29 -0600 Subject: [PATCH 03/10] Fix panic when a remote read store errors (#6975) Signed-off-by: Chris Marchbanks --- storage/fanout.go | 1 + storage/fanout/fanout_test.go | 93 ++++++++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index bad6a8df1..f55f4c8dc 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -262,6 +262,7 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma } else { priErr = qryResult.selectError } + continue } seriesSets = append(seriesSets, qryResult.set) } diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index cede30cca..7afb04202 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -15,6 +15,7 @@ package storage import ( "context" + "errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -26,7 +27,6 @@ import ( ) func TestSelectSorted(t *testing.T) { - inputLabel := labels.FromStrings(model.MetricNameLabel, "a") outputLabel := labels.FromStrings(model.MetricNameLabel, "a") @@ -97,5 +97,94 @@ func TestSelectSorted(t *testing.T) { testutil.Equals(t, labelsResult, outputLabel) testutil.Equals(t, inputTotalSize, len(result)) - +} + +func TestFanoutErrors(t *testing.T) { + workingStorage := teststorage.New(t) + defer workingStorage.Close() + + cases := []struct { + primary storage.Storage + secondary storage.Storage + warnings storage.Warnings + err error + }{ + { + primary: workingStorage, + secondary: errStorage{}, + warnings: storage.Warnings{errSelect}, + err: nil, + }, + { + primary: errStorage{}, + secondary: workingStorage, + warnings: nil, + err: errSelect, + }, + } + + for _, tc := range cases { + fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary) + + querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() + + matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") + ss, warnings, err := querier.SelectSorted(nil, matcher) + testutil.Equals(t, tc.err, err) + testutil.Equals(t, tc.warnings, warnings) + + // Only test series iteration if there are no errors. + if err != nil { + continue + } + + for ss.Next() { + ss.At() + } + testutil.Ok(t, ss.Err()) + } +} + +var errSelect = errors.New("select error") + +type errStorage struct{} + +func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { + return errQuerier{}, nil +} + +func (errStorage) Appender() storage.Appender { + return nil +} + +func (errStorage) StartTime() (int64, error) { + return 0, nil +} + +func (errStorage) Close() error { + return nil +} + +type errQuerier struct{} + +func (errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return nil, nil, errSelect +} + +func (errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return nil, nil, errSelect +} + +func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + return nil, nil, errors.New("label values error") +} + +func (errQuerier) LabelNames() ([]string, storage.Warnings, error) { + return nil, nil, errors.New("label names error") +} + +func (errQuerier) Close() error { + return nil } From a28fa010eecffb946f762213b0e2d528959908cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Rabenstein?= Date: Sat, 14 Mar 2020 09:03:40 +0100 Subject: [PATCH 04/10] TSDB: Extract parts out of populateSeries (#6983) This addresses fabxc's TODO. More importantly, it now properly defers the querier.Close(). Previously, if a panic happened after creation of the querier within the populateSeries function, querier.Close() was never called. The latter was responsible for #6977. Signed-off-by: beorn7 --- promql/engine.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2d35b90b8..ba52501b4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -502,15 +502,16 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) { prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) - querier, warnings, err := ng.populateSeries(ctxPrepare, query.queryable, s) - prepareSpanTimer.Finish() - - // XXX(fabxc): the querier returned by populateSeries might be instantiated - // we must not return without closing irrespective of the error. - // TODO: make this semantically saner. - if querier != nil { - defer querier.Close() + mint := ng.findMinTime(s) + querier, err := query.queryable.Querier(ctxPrepare, timestamp.FromTime(mint), timestamp.FromTime(s.End)) + if err != nil { + prepareSpanTimer.Finish() + return nil, nil, err } + defer querier.Close() + + warnings, err := ng.populateSeries(ctxPrepare, querier, s) + prepareSpanTimer.Finish() if err != nil { return nil, warnings, err @@ -616,7 +617,7 @@ func (ng *Engine) cumulativeSubqueryOffset(path []parser.Node) time.Duration { return subqOffset } -func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *parser.EvalStmt) (storage.Querier, storage.Warnings, error) { +func (ng *Engine) findMinTime(s *parser.EvalStmt) time.Time { var maxOffset time.Duration parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { subqOffset := ng.cumulativeSubqueryOffset(path) @@ -638,20 +639,18 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa } return nil }) + return s.Start.Add(-maxOffset) +} - mint := s.Start.Add(-maxOffset) - - querier, err := q.Querier(ctx, timestamp.FromTime(mint), timestamp.FromTime(s.End)) - if err != nil { - return nil, nil, err - } - - var warnings storage.Warnings - - // Whenever a MatrixSelector is evaluated this variable is set to the corresponding range. - // The evaluation of the VectorSelector inside then evaluates the given range and unsets - // the variable. - var evalRange time.Duration +func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) (storage.Warnings, error) { + var ( + // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. + // The evaluation of the VectorSelector inside then evaluates the given range and unsets + // the variable. + evalRange time.Duration + warnings storage.Warnings + err error + ) parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { var set storage.SeriesSet @@ -703,7 +702,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa } return nil }) - return querier, warnings, err + return warnings, err } // extractFuncFromPath walks up the path and searches for the first instance of From ef138a11c86952fb17636cd22d57951de823f942 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Sun, 15 Mar 2020 00:42:14 +0100 Subject: [PATCH 05/10] Update client_golang to fix promhttp error handling (#6986) Fixes #6139 Signed-off-by: Julien Pivotto --- go.mod | 2 +- go.sum | 4 +-- .../prometheus/promhttp/delegator.go | 10 ++++--- .../client_golang/prometheus/promhttp/http.go | 27 +++++++++---------- vendor/modules.txt | 2 +- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 1d02853c3..437c7c5af 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/alertmanager v0.20.0 - github.com/prometheus/client_golang v1.5.0 + github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da diff --git a/go.sum b/go.sum index 3412ca796..efd013974 100644 --- a/go.sum +++ b/go.sum @@ -459,8 +459,8 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A= -github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= +github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= +github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go index d1354b101..5070e72e2 100644 --- a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go @@ -53,12 +53,16 @@ func (r *responseWriterDelegator) Written() int64 { } func (r *responseWriterDelegator) WriteHeader(code int) { + if r.observeWriteHeader != nil && !r.wroteHeader { + // Only call observeWriteHeader for the 1st time. It's a bug if + // WriteHeader is called more than once, but we want to protect + // against it here. Note that we still delegate the WriteHeader + // to the original ResponseWriter to not mask the bug from it. + r.observeWriteHeader(code) + } r.status = code r.wroteHeader = true r.ResponseWriter.WriteHeader(code) - if r.observeWriteHeader != nil { - r.observeWriteHeader(code) - } } func (r *responseWriterDelegator) Write(b []byte) (int, error) { diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go index b0ee4678e..5e1c4546c 100644 --- a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go @@ -167,15 +167,12 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { enc := expfmt.NewEncoder(w, contentType) - var lastErr error - // handleError handles the error according to opts.ErrorHandling // and returns true if we have to abort after the handling. handleError := func(err error) bool { if err == nil { return false } - lastErr = err if opts.ErrorLog != nil { opts.ErrorLog.Println("error encoding and sending metric family:", err) } @@ -184,7 +181,10 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { case PanicOnError: panic(err) case HTTPErrorOnError: - httpError(rsp, err) + // We cannot really send an HTTP error at this + // point because we most likely have written + // something to rsp already. But at least we can + // stop sending. return true } // Do nothing in all other cases, including ContinueOnError. @@ -202,10 +202,6 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - - if lastErr != nil { - httpError(rsp, lastErr) - } }) if opts.Timeout <= 0 { @@ -276,7 +272,12 @@ type HandlerErrorHandling int // errors are encountered. const ( // Serve an HTTP status code 500 upon the first error - // encountered. Report the error message in the body. + // encountered. Report the error message in the body. Note that HTTP + // errors cannot be served anymore once the beginning of a regular + // payload has been sent. Thus, in the (unlikely) case that encoding the + // payload into the negotiated wire format fails, serving the response + // will simply be aborted. Set an ErrorLog in HandlerOpts to detect + // those errors. HTTPErrorOnError HandlerErrorHandling = iota // Ignore errors and try to serve as many metrics as possible. However, // if no metrics can be served, serve an HTTP status code 500 and the @@ -365,11 +366,9 @@ func gzipAccepted(header http.Header) bool { } // httpError removes any content-encoding header and then calls http.Error with -// the provided error and http.StatusInternalServerErrer. Error contents is -// supposed to be uncompressed plain text. However, same as with a plain -// http.Error, any header settings will be void if the header has already been -// sent. The error message will still be written to the writer, but it will -// probably be of limited use. +// the provided error and http.StatusInternalServerError. Error contents is +// supposed to be uncompressed plain text. Same as with a plain http.Error, this +// must not be called if the header or any payload has already been sent. func httpError(rsp http.ResponseWriter, err error) { rsp.Header().Del(contentEncodingHeader) http.Error( diff --git a/vendor/modules.txt b/vendor/modules.txt index 4872d8522..e4ff4a4c9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -272,7 +272,7 @@ github.com/opentracing/opentracing-go/log github.com/pkg/errors # github.com/prometheus/alertmanager v0.20.0 github.com/prometheus/alertmanager/api/v2/models -# github.com/prometheus/client_golang v1.5.0 +# github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_golang/api github.com/prometheus/client_golang/api/prometheus/v1 github.com/prometheus/client_golang/prometheus From 0f9e78bd88660129f030037c0ea59aae31e016da Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 16 Mar 2020 13:59:22 +0100 Subject: [PATCH 06/10] tsdb: fix races around head chunks (#6985) * tsdb: fix races around head chunks Signed-off-by: Julien Pivotto --- tsdb/head.go | 6 +++++- tsdb/head_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/tsdb/head.go b/tsdb/head.go index 197fbe287..d4b8a7997 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1116,7 +1116,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { for p.Next() { series := h.series.getByID(p.At()) + series.RLock() t0, t1 := series.minTime(), series.maxTime() + series.RUnlock() if t0 == math.MinInt64 || t1 == math.MinInt64 { continue } @@ -1424,9 +1426,11 @@ func (h *headIndexReader) Postings(name string, values ...string) (index.Posting level.Debug(h.head.logger).Log("msg", "looked up series not found") continue } + s.RLock() if s.minTime() <= h.maxt && s.maxTime() >= h.mint { filtered = append(filtered, p.At()) } + s.RUnlock() } if p.Err() != nil { return nil, p.Err() @@ -1733,7 +1737,7 @@ func (s sample) V() float64 { // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. type memSeries struct { - sync.Mutex + sync.RWMutex ref uint64 lset labels.Labels diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 967f86665..def09fe2c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "testing" "github.com/pkg/errors" @@ -1336,6 +1337,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() + testutil.Ok(t, h.Init(0)) app := h.Appender() s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0) @@ -1594,3 +1596,42 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { testutil.Assert(t, ok, "Series append failed.") testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } + +func TestHeadSeriesChunkRace(t *testing.T) { + for i := 0; i < 1000; i++ { + testHeadSeriesChunkRace(t) + } +} + +func testHeadSeriesChunkRace(t *testing.T) { + h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) + testutil.Ok(t, err) + defer h.Close() + testutil.Ok(t, h.Init(0)) + app := h.Appender() + + s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) + testutil.Ok(t, err) + for ts := int64(6); ts < 11; ts++ { + err = app.AddFast(s2, ts, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + var wg sync.WaitGroup + matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") + q, err := NewBlockQuerier(h, 18, 22) + testutil.Ok(t, err) + defer q.Close() + + wg.Add(1) + go func() { + h.updateMinMaxTime(20, 25) + h.gc() + wg.Done() + }() + ss, _, err := q.Select(nil, matcher) + testutil.Ok(t, err) + testutil.Ok(t, ss.Err()) + wg.Wait() +} From ccc511456a7d533cd783ff41df092ef174e6e9d7 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 16 Mar 2020 16:59:12 +0100 Subject: [PATCH 07/10] Release 2.17.0-rc.1 (#6991) Signed-off-by: Julien Pivotto --- CHANGELOG.md | 2 +- VERSION | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d5640f4e..3ca69d3ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 2.17.0-rc.0 / 2020-03-12 +## 2.17.0-rc.1 / 2020-03-16 This release implements isolation in TSDB. API queries and recording rules are guaranteed to only see full scrapes and full recording rules. This comes with a diff --git a/VERSION b/VERSION index bf5e7d97b..f4ca5334e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.17.0-rc.0 +2.17.0-rc.1 From 8907ba6235cac3a533ef7dc93c909a4eb52f939c Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Mon, 16 Mar 2020 22:52:02 +0100 Subject: [PATCH 08/10] Make TSDB use storage errors This fixes #6992, which was introduced by #6777. There was an intermediate component which translated TSDB errors into storage errors, but that component was deleted and this bug went unnoticed, until we were watching at the Prombench results. Without this, scrape will fail instead of dropping samples or using "Add" when the series have been garbage collected. Signed-off-by: Julien Pivotto --- rules/manager.go | 6 +++--- scrape/scrape.go | 10 +++++----- scrape/scrape_test.go | 32 ++++++++++++++++++++++++++++++++ tsdb/block.go | 2 +- tsdb/cmd/tsdb/main.go | 3 ++- tsdb/db_test.go | 8 ++++---- tsdb/head.go | 33 +++++++++------------------------ tsdb/head_test.go | 6 +++--- tsdb/querier.go | 4 ++-- tsdb/querier_test.go | 2 +- 10 files changed, 62 insertions(+), 44 deletions(-) diff --git a/rules/manager.go b/rules/manager.go index 33b10d9cb..425fb01a8 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { }() for _, s := range vector { if _, err := app.Add(s.Metric, s.T, s.V); err != nil { - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample: numOutOfOrder++ level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) @@ -624,7 +624,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series @@ -647,7 +647,7 @@ func (g *Group) cleanupStaleSeries(ts time.Time) { for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if series diff --git a/scrape/scrape.go b/scrape/scrape.go index 601fca553..fbabba247 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1121,7 +1121,7 @@ loop: } ce, ok := sl.cache.get(yoloString(met)) if ok { - switch err = app.AddFast(ce.ref, t, v); err { + switch err = app.AddFast(ce.ref, t, v); errors.Cause(err) { case nil: if tp == nil { sl.cache.trackStaleness(ce.hash, ce.lset) @@ -1176,7 +1176,7 @@ loop: var ref uint64 ref, err = app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample: err = nil @@ -1233,7 +1233,7 @@ loop: sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) - switch err { + switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target // goes away and comes back again with a new scrape loop. @@ -1330,7 +1330,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ce, ok := sl.cache.get(s) if ok { err := app.AddFast(ce.ref, t, v) - switch err { + switch errors.Cause(err) { case nil: return nil case storage.ErrNotFound: @@ -1354,7 +1354,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v lset = sl.reportSampleMutator(lset) ref, err := app.Add(lset, t, v) - switch err { + switch errors.Cause(err) { case nil: sl.cache.addRef(s, ref, lset, hash) return nil diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index d05b3c400..608c956b8 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1807,3 +1807,35 @@ func TestReuseScrapeCache(t *testing.T) { } } } + +func TestScrapeAddFast(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + app := s.Appender() + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + _, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{}) + testutil.Ok(t, err) + + // Poison the cache. There is just one entry, and one series in the + // storage. Changing the ref will create a 'not found' error. + for _, v := range sl.getCache().series { + v.ref++ + } + + _, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second)) + testutil.Ok(t, err) +} diff --git a/tsdb/block.go b/tsdb/block.go index 536940ec9..bcdec5c88 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -77,7 +77,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - // Returns ErrNotFound if the ref does not resolve to a known series. + // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 1febb4004..458ed5351 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -309,7 +310,7 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { - if errors.Cause(err) != tsdb.ErrNotFound { + if errors.Cause(err) != storage.ErrNotFound { panic(err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 2493ffea6..e824b0c40 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -215,7 +215,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, ErrNotFound, errors.Cause(err)) + testutil.Equals(t, storage.ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -363,7 +363,7 @@ func TestAmendDatapointCausesError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) testutil.Ok(t, app.Rollback()) } @@ -398,7 +398,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { app = db.Appender() _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) - testutil.Equals(t, ErrAmendSample, err) + testutil.Equals(t, storage.ErrDuplicateSampleForTimestamp, err) } func TestEmptyLabelsetCausesError(t *testing.T) { @@ -1660,7 +1660,7 @@ func TestNoEmptyBlocks(t *testing.T) { app = db.Appender() _, err = app.Add(defaultLabel, 1, 0) - testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + testutil.Assert(t, err == storage.ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") // Adding new blocks. currentTime := db.Head().MaxTime() diff --git a/tsdb/head.go b/tsdb/head.go index d4b8a7997..bbdfead9d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -39,21 +39,6 @@ import ( ) var ( - // ErrNotFound is returned if a looked up resource was not found. - ErrNotFound = errors.Errorf("not found") - - // ErrOutOfOrderSample is returned if an appended sample has a - // timestamp smaller than the most recent sample. - ErrOutOfOrderSample = errors.New("out of order sample") - - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - - // ErrOutOfBounds is returned if an appended sample is out of the - // writable time range. - ErrOutOfBounds = errors.New("out of bounds") - // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") @@ -841,7 +826,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { if a.app == nil { - return ErrNotFound + return storage.ErrNotFound } return a.app.AddFast(ref, t, v) } @@ -954,7 +939,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { - return 0, ErrOutOfBounds + return 0, storage.ErrOutOfBounds } // Ensure no empty labels have gotten through. @@ -980,12 +965,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { - return ErrOutOfBounds + return storage.ErrOutOfBounds } s := a.head.series.getByID(ref) if s == nil { - return errors.Wrap(ErrNotFound, "unknown series") + return errors.Wrap(storage.ErrNotFound, "unknown series") } s.Lock() if err := s.appendable(t, v); err != nil { @@ -1318,7 +1303,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s := h.head.series.getByID(sid) // This means that the series has been garbage collected. if s == nil { - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Lock() @@ -1328,7 +1313,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { // the specified range. if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() - return nil, ErrNotFound + return nil, storage.ErrNotFound } s.Unlock() @@ -1474,7 +1459,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks if s == nil { h.head.metrics.seriesNotFound.Inc() - return ErrNotFound + return storage.ErrNotFound } *lbls = append((*lbls)[:0], s.lset...) @@ -1818,12 +1803,12 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } if t < c.maxTime { - return ErrOutOfOrderSample + return storage.ErrOutOfOrderSample } // We are allowing exact duplicates as we can encounter them in valid cases // like federation and erroring out at that time would be extremely noisy. if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { - return ErrAmendSample + return storage.ErrDuplicateSampleForTimestamp } return nil } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index def09fe2c..f7aa9975c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1012,7 +1012,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Ok(t, h.Truncate(1500)) // Remove a chunk. _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) } @@ -1066,9 +1066,9 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) _, err = cr.Chunk(chunks[0].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, storage.ErrNotFound, err) } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { diff --git a/tsdb/querier.go b/tsdb/querier.go index a5181c3f6..83f7b6ad6 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -739,7 +739,7 @@ func (s *baseChunkSeries) Next() bool { ref := s.p.At() if err := s.index.Series(ref, &lset, &chkMetas); err != nil { // Postings may be stale. Skip if no underlying series exists. - if errors.Cause(err) == ErrNotFound { + if errors.Cause(err) == storage.ErrNotFound { continue } s.err = err @@ -819,7 +819,7 @@ func (s *populatedChunkSeries) Next() bool { c.Chunk, s.err = s.chunks.Chunk(c.Ref) if s.err != nil { // This means that the chunk has be garbage collected. Remove it from the list. - if s.err == ErrNotFound { + if s.err == storage.ErrNotFound { s.err = nil // Delete in-place. s.chks = append(chks[:j], chks[j+1:]...) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 778081737..52e316051 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1491,7 +1491,7 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { - return ErrNotFound + return storage.ErrNotFound } *lset = append((*lset)[:0], s.l...) *chks = append((*chks)[:0], s.chunks...) From 7f860171265e6942a0b07c5d126130f37af041a6 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 18 Mar 2020 17:32:31 +0100 Subject: [PATCH 09/10] Release 2.17.0-rc.2 (#7003) Signed-off-by: Julien Pivotto --- CHANGELOG.md | 3 ++- VERSION | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ca69d3ba..48f75bbe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 2.17.0-rc.1 / 2020-03-16 +## 2.17.0-rc.2 / 2020-03-18 This release implements isolation in TSDB. API queries and recording rules are guaranteed to only see full scrapes and full recording rules. This comes with a @@ -21,6 +21,7 @@ some increase in memory usage, CPU usage, or query latency. * [BUGFIX] React UI: Fix data table matrix values #6896 * [BUGFIX] React UI: Fix new targets page not loading when using non-ASCII characters #6892 * [BUGFIX] Scrape: Prevent removal of metric names upon relabeling #6891 +* [BUGFIX] Scrape: Fix 'superfluous response.WriteHeader call' errors when scrape fails under some circonstances #6986 ## 2.16.0 / 2020-02-13 diff --git a/VERSION b/VERSION index f4ca5334e..4fd6e593b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.17.0-rc.1 +2.17.0-rc.2 From 7920305e4e5e05cc98c729ebaf4fbd92cc7afe19 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 18 Mar 2020 22:36:59 +0100 Subject: [PATCH 10/10] release 2.17.0-rc.3 (#7006) Signed-off-by: Julien Pivotto --- CHANGELOG.md | 2 +- VERSION | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48f75bbe8..5dcfcadfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 2.17.0-rc.2 / 2020-03-18 +## 2.17.0-rc.3 / 2020-03-18 This release implements isolation in TSDB. API queries and recording rules are guaranteed to only see full scrapes and full recording rules. This comes with a diff --git a/VERSION b/VERSION index 4fd6e593b..9dba67016 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.17.0-rc.2 +2.17.0-rc.3