diff --git a/CHANGELOG.md b/CHANGELOG.md index 51fb9048eb..faf9ade792 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 2.17.2 / 2020-04-20 + +* [BUGFIX] Federation: Register federation metrics #7081 +* [BUGFIX] PromQL: Fix panic in parser error handling #7132 +* [BUGFIX] Rules: Fix reloads hanging when deleting a rule group that is being evaluated #7138 +* [BUGFIX] TSDB: Fix a memory leak when prometheus starts with an empty TSDB WAL #7135 +* [BUGFIX] TSDB: Make isolation more robust to panics in web handlers #7129 #7136 + ## 2.17.1 / 2020-03-26 * [BUGFIX] TSDB: Fix query performance regression that increased memory and CPU usage #7051 diff --git a/VERSION b/VERSION index 3f8eb714d0..94dc0ec910 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.17.1 +2.17.2 diff --git a/rules/manager.go b/rules/manager.go index e480e3be15..1e549fef67 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -236,7 +236,8 @@ type Group struct { shouldRestore bool - done chan bool + markStale bool + done chan struct{} terminated chan struct{} managerDone chan struct{} @@ -277,7 +278,7 @@ func NewGroup(o GroupOptions) *Group { shouldRestore: o.ShouldRestore, opts: o.Opts, seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan bool), + done: make(chan struct{}), managerDone: o.done, terminated: make(chan struct{}), logger: log.With(o.Opts.Logger, "group", o.Name), @@ -333,8 +334,8 @@ func (g *Group) run(ctx context.Context) { tick := time.NewTicker(g.interval) defer tick.Stop() - makeStale := func(s bool) { - if !s { + defer func() { + if !g.markStale { return } go func(now time.Time) { @@ -354,7 +355,7 @@ func (g *Group) run(ctx context.Context) { g.cleanupStaleSeries(now) } }(time.Now()) - } + }() iter() if g.shouldRestore { @@ -363,8 +364,7 @@ func (g *Group) run(ctx context.Context) { // we might not have enough data scraped, and recording rules would not // have updated the latest values, on which some alerts might depend. select { - case stale := <-g.done: - makeStale(stale) + case <-g.done: return case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 @@ -382,13 +382,11 @@ func (g *Group) run(ctx context.Context) { for { select { - case stale := <-g.done: - makeStale(stale) + case <-g.done: return default: select { - case stale := <-g.done: - makeStale(stale) + case <-g.done: return case <-tick.C: missed := (time.Since(evalTimestamp) / g.interval) - 1 @@ -403,11 +401,6 @@ func (g *Group) run(ctx context.Context) { } } -func (g *Group) stopAndMakeStale() { - g.done <- true - <-g.terminated -} - func (g *Group) stop() { close(g.done) <-g.terminated @@ -950,7 +943,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels wg.Add(len(m.groups)) for n, oldg := range m.groups { go func(n string, g *Group) { - g.stopAndMakeStale() + g.markStale = true + g.stop() if m := g.metrics; m != nil { m.evalTotal.DeleteLabelValues(n) m.evalFailures.DeleteLabelValues(n) diff --git a/tsdb/head.go b/tsdb/head.go index fa5946e487..8da6fc2be0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -805,8 +805,6 @@ func (h *RangeHead) Meta() BlockMeta { type initAppender struct { app storage.Appender head *Head - - appendID, cleanupAppendIDsBelow uint64 } func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -814,7 +812,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow) + a.app = a.head.appender() return a.app.Add(lset, t, v) } @@ -844,22 +842,20 @@ func (a *initAppender) Rollback() error { func (h *Head) Appender() storage.Appender { h.metrics.activeAppenders.Inc() - appendID := h.iso.newAppendID() - cleanupAppendIDsBelow := h.iso.lowWatermark() - // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. if h.MinTime() == math.MaxInt64 { return &initAppender{ - head: h, - appendID: appendID, - cleanupAppendIDsBelow: cleanupAppendIDsBelow, + head: h, } } - return h.appender(appendID, cleanupAppendIDsBelow) + return h.appender() } -func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender { +func (h *Head) appender() *headAppender { + appendID := h.iso.newAppendID() + cleanupAppendIDsBelow := h.iso.lowWatermark() + return &headAppender{ head: h, // Set the minimum valid time to whichever is greater the head min valid time or the compaction window. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d6eb4b3180..ed4a4e51dc 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1078,7 +1078,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { h.initTime(0) - app := h.appender(0, 0) + app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1106,7 +1106,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { h.initTime(0) - app := h.appender(0, 0) + app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1368,14 +1368,16 @@ func TestMemSeriesIsolation(t *testing.T) { return -1 } - i := 0 + i := 1 for ; i <= 1000; i++ { var app storage.Appender // To initialize bounds. if hb.MinTime() == math.MaxInt64 { - app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0} + app = &initAppender{head: hb} } else { - app = hb.appender(uint64(i), 0) + a := hb.appender() + a.cleanupAppendIDsBelow = 0 + app = a } _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) @@ -1394,7 +1396,8 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Equals(t, 999, lastValue(999)) // Cleanup appendIDs below 500. - app := hb.appender(uint64(i), 500) + app := hb.appender() + app.cleanupAppendIDsBelow = 500 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1412,7 +1415,8 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. - app = hb.appender(uint64(i), 1000) + app = hb.appender() + app.cleanupAppendIDsBelow = 1000 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1425,7 +1429,8 @@ func TestMemSeriesIsolation(t *testing.T) { i++ // Cleanup appendIDs below 1001, but with a rollback. - app = hb.appender(uint64(i), 1001) + app = hb.appender() + app.cleanupAppendIDsBelow = 1001 _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) @@ -1515,6 +1520,22 @@ func TestHeadSeriesChunkRace(t *testing.T) { } } +func TestIsolationWithoutAdd(t *testing.T) { + hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + testutil.Ok(t, err) + defer hb.Close() + + app := hb.Appender() + testutil.Ok(t, app.Commit()) + + app = hb.Appender() + _, err = app.Add(labels.FromStrings("foo", "baz"), 1, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") +} + func testHeadSeriesChunkRace(t *testing.T) { h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) testutil.Ok(t, err) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5d6222ad1c..b94c919449 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -318,7 +318,7 @@ func (api *API) options(r *http.Request) apiFuncResult { return apiFuncResult{nil, nil, nil, nil} } -func (api *API) query(r *http.Request) apiFuncResult { +func (api *API) query(r *http.Request) (result apiFuncResult) { ts, err := parseTimeParam(r, "time", api.now()) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -341,6 +341,14 @@ func (api *API) query(r *http.Request) apiFuncResult { err = errors.Wrapf(err, "invalid parameter 'query'") return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() + } + }() ctx = httputil.ContextFromRequest(ctx, r) @@ -362,7 +370,7 @@ func (api *API) query(r *http.Request) apiFuncResult { }, nil, res.Warnings, qry.Close} } -func (api *API) queryRange(r *http.Request) apiFuncResult { +func (api *API) queryRange(r *http.Request) (result apiFuncResult) { start, err := parseTime(r.FormValue("start")) if err != nil { err = errors.Wrapf(err, "invalid parameter 'start'") @@ -413,6 +421,14 @@ func (api *API) queryRange(r *http.Request) apiFuncResult { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() + } + }() ctx = httputil.ContextFromRequest(ctx, r) @@ -465,7 +481,7 @@ func (api *API) labelNames(r *http.Request) apiFuncResult { return apiFuncResult{names, nil, warnings, nil} } -func (api *API) labelValues(r *http.Request) apiFuncResult { +func (api *API) labelValues(r *http.Request) (result apiFuncResult) { ctx := r.Context() name := route.Param(ctx, "name") @@ -476,7 +492,14 @@ func (api *API) labelValues(r *http.Request) apiFuncResult { if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } - + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call q.Close ourselves (which is required + // in the case of a panic). + defer func() { + if result.finalizer == nil { + q.Close() + } + }() closer := func() { q.Close() } @@ -497,7 +520,7 @@ var ( maxTimeFormatted = maxTime.Format(time.RFC3339Nano) ) -func (api *API) series(r *http.Request) apiFuncResult { +func (api *API) series(r *http.Request) (result apiFuncResult) { if err := r.ParseForm(); err != nil { return apiFuncResult{nil, &apiError{errorBadData, errors.Wrapf(err, "error parsing form values")}, nil, nil} } @@ -527,7 +550,17 @@ func (api *API) series(r *http.Request) apiFuncResult { if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } - defer q.Close() + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call q.Close ourselves (which is required + // in the case of a panic). + defer func() { + if result.finalizer == nil { + q.Close() + } + }() + closer := func() { + q.Close() + } var sets []storage.SeriesSet var warnings storage.Warnings @@ -535,7 +568,7 @@ func (api *API) series(r *http.Request) apiFuncResult { s, wrn, err := q.Select(false, nil, mset...) warnings = append(warnings, wrn...) if err != nil { - return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} + return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer} } sets = append(sets, s) } @@ -546,10 +579,10 @@ func (api *API) series(r *http.Request) apiFuncResult { metrics = append(metrics, set.At().Labels()) } if set.Err() != nil { - return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, nil} + return apiFuncResult{nil, &apiError{errorExec, set.Err()}, warnings, closer} } - return apiFuncResult{metrics, nil, warnings, nil} + return apiFuncResult{metrics, nil, warnings, closer} } func (api *API) dropSeries(r *http.Request) apiFuncResult {