Merge pull request #7204 from prometheus/release-2.18

[Merge Without Squash] Merge release-2.18 back to master.
This commit is contained in:
Bartlomiej Plotka 2020-05-05 18:58:45 +01:00 committed by GitHub
commit 532f7bbac9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 288 additions and 173 deletions

View file

@ -1,3 +1,16 @@
## 2.18.0 / 2020-05-05
* [CHANGE] Federation: Only use local TSDB for federation (ignore remote read). #7077
* [CHANGE] Rules: `rule_evaluations_total` and `rule_evaluation_failures_total` have a `rule_group` label now. #7094
* [FEATURE] Tracing: Added experimental Jaeger support #7148
* [ENHANCEMENT] TSDB: Significantly reduce WAL size kept around after a block cut. #7098
* [ENHANCEMENT] Discovery: Add `architecture` meta label for EC2. #7000
* [BUGFIX] UI: Fixed wrong MinTime reported by /status. #7182
* [BUGFIX] React UI: Fixed multiselect legend on OSX. #6880
* [BUGFIX] Remote Write: Fixed blocked resharding edge case. #7122
* [BUGFIX] Remote Write: Fixed remote write not updating on relabel configs change. #7073
## 2.17.2 / 2020-04-20 ## 2.17.2 / 2020-04-20
* [BUGFIX] Federation: Register federation metrics #7081 * [BUGFIX] Federation: Register federation metrics #7081

View file

@ -1 +1 @@
2.17.2 2.18.0

View file

@ -387,9 +387,9 @@ func main() {
) )
cfg.web.Context = ctxWeb cfg.web.Context = ctxWeb
cfg.web.TSDB = localStorage.Get
cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration
cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes
cfg.web.LocalStorage = localStorage
cfg.web.Storage = fanoutStorage cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine cfg.web.QueryEngine = queryEngine
cfg.web.ScrapeManager = scrapeManager cfg.web.ScrapeManager = scrapeManager
@ -921,14 +921,7 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.startTimeMargin = startTimeMargin s.startTimeMargin = startTimeMargin
} }
// Get the storage. // get is internal, you should use readyStorage as the front implementation layer.
func (s *readyStorage) Get() *tsdb.DB {
if x := s.get(); x != nil {
return x
}
return nil
}
func (s *readyStorage) get() *tsdb.DB { func (s *readyStorage) get() *tsdb.DB {
s.mtx.RLock() s.mtx.RLock()
x := s.db x := s.db
@ -983,12 +976,44 @@ func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
// Close implements the Storage interface. // Close implements the Storage interface.
func (s *readyStorage) Close() error { func (s *readyStorage) Close() error {
if x := s.Get(); x != nil { if x := s.get(); x != nil {
return x.Close() return x.Close()
} }
return nil return nil
} }
// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
// The call is forwarded to the underlying DB; tsdb.ErrNotReady is returned
// when get yields no DB.
func (s *readyStorage) CleanTombstones() error {
	db := s.get()
	if db == nil {
		return tsdb.ErrNotReady
	}
	return db.CleanTombstones()
}
// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
// The time range and matchers are passed through unchanged to the underlying
// DB; tsdb.ErrNotReady is returned when get yields no DB.
func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
	db := s.get()
	if db == nil {
		return tsdb.ErrNotReady
	}
	return db.Delete(mint, maxt, ms...)
}
// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
// dir and withHead are passed through unchanged to the underlying DB;
// tsdb.ErrNotReady is returned when get yields no DB.
func (s *readyStorage) Snapshot(dir string, withHead bool) error {
	db := s.get()
	if db == nil {
		return tsdb.ErrNotReady
	}
	return db.Snapshot(dir, withHead)
}
// Stats implements the api_v1.TSDBAdminStats interface.
// It returns the statistics of the underlying DB's head for statsByLabelName,
// or a nil result together with tsdb.ErrNotReady when get yields no DB.
func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
	db := s.get()
	if db == nil {
		return nil, tsdb.ErrNotReady
	}
	return db.Head().Stats(statsByLabelName), nil
}
// tsdbOptions is tsdb.Option version with defined units. // tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time). // This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct { type tsdbOptions struct {

View file

@ -25,10 +25,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -54,7 +54,7 @@ type Test struct {
cmds []testCommand cmds []testCommand
storage storage.Storage storage *teststorage.TestStorage
queryEngine *Engine queryEngine *Engine
context context.Context context context.Context
@ -101,6 +101,11 @@ func (t *Test) Storage() storage.Storage {
return t.storage return t.storage
} }
// TSDB exposes the *tsdb.DB held by the test's storage.
func (t *Test) TSDB() *tsdb.DB {
	db := t.storage.DB
	return db
}
func raise(line int, format string, v ...interface{}) error { func raise(line int, format string, v ...interface{}) error {
return &parser.ParseErr{ return &parser.ParseErr{
LineOffset: line, LineOffset: line,

View file

@ -39,6 +39,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
// Load the package into main to make sure minimum Go version is met. // Load the package into main to make sure minimum Go version is met.
_ "github.com/prometheus/prometheus/tsdb/goversion" _ "github.com/prometheus/prometheus/tsdb/goversion"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wal"

View file

@ -751,6 +751,23 @@ func (h *Head) initTime(t int64) (initialized bool) {
return true return true
} }
// Stats holds the HEAD statistics returned by (*Head).Stats.
type Stats struct {
// NumSeries is the value reported by Head.NumSeries.
NumSeries uint64
// MinTime and MaxTime are the values reported by Head.MinTime and Head.MaxTime.
MinTime, MaxTime int64
// IndexPostingStats is the result of Head.PostingsCardinalityStats for the
// label name the statistics were requested for.
IndexPostingStats *index.PostingsStats
}
// Stats collects the current HEAD statistics: the series count, the min and
// max time, and the postings cardinality statistics for statsByLabelName.
// Note that it is expensive to calculate these.
func (h *Head) Stats(statsByLabelName string) *Stats {
	s := new(Stats)
	s.NumSeries = h.NumSeries()
	s.MaxTime = h.MaxTime()
	s.MinTime = h.MinTime()
	s.IndexPostingStats = h.PostingsCardinalityStats(statsByLabelName)
	return s
}
type RangeHead struct { type RangeHead struct {
head *Head head *Head
mint, maxt int64 mint, maxt int64

View file

@ -18,14 +18,13 @@ import (
"os" "os"
"time" "time"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
// New returns a new storage for testing purposes // New returns a new TestStorage for testing purposes
// that removes all associated files on closing. // that removes all associated files on closing.
func New(t testutil.T) storage.Storage { func New(t testutil.T) *TestStorage {
dir, err := ioutil.TempDir("", "test_storage") dir, err := ioutil.TempDir("", "test_storage")
if err != nil { if err != nil {
t.Fatalf("Opening test dir failed: %s", err) t.Fatalf("Opening test dir failed: %s", err)
@ -40,16 +39,16 @@ func New(t testutil.T) storage.Storage {
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)
} }
return testStorage{Storage: db, dir: dir} return &TestStorage{DB: db, dir: dir}
} }
type testStorage struct { type TestStorage struct {
storage.Storage *tsdb.DB
dir string dir string
} }
func (s testStorage) Close() error { func (s TestStorage) Close() error {
if err := s.Storage.Close(); err != nil { if err := s.DB.Close(); err != nil {
return err return err
} }
return os.RemoveAll(s.dir) return os.RemoveAll(s.dir)

View file

@ -37,7 +37,6 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/common/route" "github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/gate" "github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -159,13 +158,13 @@ type apiFuncResult struct {
type apiFunc func(r *http.Request) apiFuncResult type apiFunc func(r *http.Request) apiFuncResult
// TSDBAdmin defines the tsdb interfaces used by the v1 API for admin operations. // TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics.
type TSDBAdmin interface { type TSDBAdminStats interface {
CleanTombstones() error CleanTombstones() error
Delete(mint, maxt int64, ms ...*labels.Matcher) error Delete(mint, maxt int64, ms ...*labels.Matcher) error
Dir() string
Snapshot(dir string, withHead bool) error Snapshot(dir string, withHead bool) error
Head() *tsdb.Head
Stats(statsByLabelName string) (*tsdb.Stats, error)
} }
// API can register a set of endpoints in a router and handle // API can register a set of endpoints in a router and handle
@ -183,7 +182,8 @@ type API struct {
ready func(http.HandlerFunc) http.HandlerFunc ready func(http.HandlerFunc) http.HandlerFunc
globalURLOptions GlobalURLOptions globalURLOptions GlobalURLOptions
db func() TSDBAdmin db TSDBAdminStats
dbDir string
enableAdmin bool enableAdmin bool
logger log.Logger logger log.Logger
remoteReadSampleLimit int remoteReadSampleLimit int
@ -209,7 +209,8 @@ func NewAPI(
flagsMap map[string]string, flagsMap map[string]string,
globalURLOptions GlobalURLOptions, globalURLOptions GlobalURLOptions,
readyFunc func(http.HandlerFunc) http.HandlerFunc, readyFunc func(http.HandlerFunc) http.HandlerFunc,
db func() TSDBAdmin, db TSDBAdminStats,
dbDir string,
enableAdmin bool, enableAdmin bool,
logger log.Logger, logger log.Logger,
rr rulesRetriever, rr rulesRetriever,
@ -232,6 +233,7 @@ func NewAPI(
ready: readyFunc, ready: readyFunc,
globalURLOptions: globalURLOptions, globalURLOptions: globalURLOptions,
db: db, db: db,
dbDir: dbDir,
enableAdmin: enableAdmin, enableAdmin: enableAdmin,
rulesRetriever: rr, rulesRetriever: rr,
remoteReadSampleLimit: remoteReadSampleLimit, remoteReadSampleLimit: remoteReadSampleLimit,
@ -244,22 +246,32 @@ func NewAPI(
} }
} }
// setUnavailStatusOnTSDBNotReady changes the error type of r to
// errorUnavailable when the root cause of its error is tsdb.ErrNotReady.
// Results without an error, or with any other cause, are returned unchanged.
func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult {
	if r.err == nil {
		return r
	}
	if errors.Cause(r.err.err) != tsdb.ErrNotReady {
		return r
	}
	r.err.typ = errorUnavailable
	return r
}
// Register the API's endpoints in the given router. // Register the API's endpoints in the given router.
func (api *API) Register(r *route.Router) { func (api *API) Register(r *route.Router) {
wrap := func(f apiFunc) http.HandlerFunc { wrap := func(f apiFunc) http.HandlerFunc {
hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httputil.SetCORS(w, api.CORSOrigin, r) httputil.SetCORS(w, api.CORSOrigin, r)
result := f(r) result := setUnavailStatusOnTSDBNotReady(f(r))
if result.finalizer != nil { if result.finalizer != nil {
defer result.finalizer() defer result.finalizer()
} }
if result.err != nil { if result.err != nil {
api.respondError(w, result.err, result.data) api.respondError(w, result.err, result.data)
} else if result.data != nil { return
api.respond(w, result.data, result.warnings)
} else {
w.WriteHeader(http.StatusNoContent)
} }
if result.data != nil {
api.respond(w, result.data, result.warnings)
return
}
w.WriteHeader(http.StatusNoContent)
}) })
return api.ready(httputil.CompressionHandler{ return api.ready(httputil.CompressionHandler{
Handler: hf, Handler: hf,
@ -1124,29 +1136,27 @@ type tsdbStatus struct {
SeriesCountByLabelValuePair []stat `json:"seriesCountByLabelValuePair"` SeriesCountByLabelValuePair []stat `json:"seriesCountByLabelValuePair"`
} }
func (api *API) serveTSDBStatus(r *http.Request) apiFuncResult { func convertStats(stats []index.Stat) []stat {
db := api.db() result := make([]stat, 0, len(stats))
if db == nil { for _, item := range stats {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil} item := stat{Name: item.Name, Value: item.Count}
result = append(result, item)
} }
convert := func(stats []index.Stat) []stat { return result
result := make([]stat, 0, len(stats)) }
for _, item := range stats {
item := stat{Name: item.Name, Value: item.Count} func (api *API) serveTSDBStatus(*http.Request) apiFuncResult {
result = append(result, item) s, err := api.db.Stats("__name__")
} if err != nil {
return result return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
posting := db.Head().PostingsCardinalityStats(model.MetricNameLabel) return apiFuncResult{tsdbStatus{
response := tsdbStatus{ SeriesCountByMetricName: convertStats(s.IndexPostingStats.CardinalityMetricsStats),
SeriesCountByMetricName: convert(posting.CardinalityMetricsStats), LabelValueCountByLabelName: convertStats(s.IndexPostingStats.CardinalityLabelStats),
LabelValueCountByLabelName: convert(posting.CardinalityLabelStats), MemoryInBytesByLabelName: convertStats(s.IndexPostingStats.LabelValueStats),
MemoryInBytesByLabelName: convert(posting.LabelValueStats), SeriesCountByLabelValuePair: convertStats(s.IndexPostingStats.LabelValuePairsStats),
SeriesCountByLabelValuePair: convert(posting.LabelValuePairsStats), }, nil, nil, nil}
}
return apiFuncResult{response, nil, nil, nil}
} }
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
@ -1322,11 +1332,6 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
} }
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
return apiFuncResult{nil, &apiError{errorBadData, errors.Wrap(err, "error parsing form values")}, nil, nil} return apiFuncResult{nil, &apiError{errorBadData, errors.Wrap(err, "error parsing form values")}, nil, nil}
} }
@ -1348,8 +1353,7 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if err != nil { if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
} }
if err := api.db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil {
if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
} }
@ -1372,13 +1376,8 @@ func (api *API) snapshot(r *http.Request) apiFuncResult {
} }
} }
db := api.db()
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
var ( var (
snapdir = filepath.Join(db.Dir(), "snapshots") snapdir = filepath.Join(api.dbDir, "snapshots")
name = fmt.Sprintf("%s-%x", name = fmt.Sprintf("%s-%x",
time.Now().UTC().Format("20060102T150405Z0700"), time.Now().UTC().Format("20060102T150405Z0700"),
rand.Int()) rand.Int())
@ -1387,7 +1386,7 @@ func (api *API) snapshot(r *http.Request) apiFuncResult {
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot directory")}, nil, nil} return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot directory")}, nil, nil}
} }
if err := db.Snapshot(dir, !skipHead); err != nil { if err := api.db.Snapshot(dir, !skipHead); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot")}, nil, nil} return apiFuncResult{nil, &apiError{errorInternal, errors.Wrap(err, "create snapshot")}, nil, nil}
} }
@ -1400,12 +1399,7 @@ func (api *API) cleanTombstones(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
} }
db := api.db() if err := api.db.CleanTombstones(); err != nil {
if db == nil {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil, nil}
}
if err := db.CleanTombstones(); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }

View file

@ -1892,32 +1892,24 @@ func TestStreamReadEndpoint(t *testing.T) {
} }
type fakeDB struct { type fakeDB struct {
err error err error
closer func()
} }
func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) CleanTombstones() error { return f.err }
func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err } func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err }
func (f *fakeDB) Dir() string { func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
dir, _ := ioutil.TempDir("", "fakeDB") func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) {
f.closer = func() {
os.RemoveAll(dir)
}
return dir
}
func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
func (f *fakeDB) Head() *tsdb.Head {
h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize)
return h return h.Stats(statsByLabelName), nil
} }
func TestAdminEndpoints(t *testing.T) { func TestAdminEndpoints(t *testing.T) {
tsdb, tsdbWithError := &fakeDB{}, &fakeDB{err: errors.New("some error")} tsdb, tsdbWithError, tsdbNotReady := &fakeDB{}, &fakeDB{err: errors.New("some error")}, &fakeDB{err: errors.Wrap(tsdb.ErrNotReady, "wrap")}
snapshotAPI := func(api *API) apiFunc { return api.snapshot } snapshotAPI := func(api *API) apiFunc { return api.snapshot }
cleanAPI := func(api *API) apiFunc { return api.cleanTombstones } cleanAPI := func(api *API) apiFunc { return api.cleanTombstones }
deleteAPI := func(api *API) apiFunc { return api.deleteSeries } deleteAPI := func(api *API) apiFunc { return api.deleteSeries }
for i, tc := range []struct { for _, tc := range []struct {
db *fakeDB db *fakeDB
enableAdmin bool enableAdmin bool
endpoint func(api *API) apiFunc endpoint func(api *API) apiFunc
@ -1965,7 +1957,7 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal, errType: errorInternal,
}, },
{ {
db: nil, db: tsdbNotReady,
enableAdmin: true, enableAdmin: true,
endpoint: snapshotAPI, endpoint: snapshotAPI,
@ -1994,7 +1986,7 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal, errType: errorInternal,
}, },
{ {
db: nil, db: tsdbNotReady,
enableAdmin: true, enableAdmin: true,
endpoint: cleanAPI, endpoint: cleanAPI,
@ -2064,37 +2056,31 @@ func TestAdminEndpoints(t *testing.T) {
errType: errorInternal, errType: errorInternal,
}, },
{ {
db: nil, db: tsdbNotReady,
enableAdmin: true, enableAdmin: true,
endpoint: deleteAPI, endpoint: deleteAPI,
values: map[string][]string{"match[]": {"up"}},
errType: errorUnavailable, errType: errorUnavailable,
}, },
} { } {
tc := tc tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { t.Run("", func(t *testing.T) {
dir, err := ioutil.TempDir("", "fakeDB")
testutil.Ok(t, err)
// The cleanup must live inside a closure: arguments of a deferred call are
// evaluated when the defer statement executes, so deferring
// testutil.Ok(t, os.RemoveAll(dir)) directly would delete dir right here,
// before the endpoint under test runs, and leave nothing to clean up at exit.
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
api := &API{ api := &API{
db: func() TSDBAdmin { db: tc.db,
if tc.db != nil { dbDir: dir,
return tc.db
}
return nil
},
ready: func(f http.HandlerFunc) http.HandlerFunc { return f }, ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
enableAdmin: tc.enableAdmin, enableAdmin: tc.enableAdmin,
} }
defer func() {
if tc.db != nil && tc.db.closer != nil {
tc.db.closer()
}
}()
endpoint := tc.endpoint(api) endpoint := tc.endpoint(api)
req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil) req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil)
if err != nil { testutil.Ok(t, err)
t.Fatalf("Error when creating test request: %s", err)
} res := setUnavailStatusOnTSDBNotReady(endpoint(req))
res := endpoint(req)
assertAPIError(t, res.err, tc.errType) assertAPIError(t, res.err, tc.errType)
}) })
} }
@ -2504,14 +2490,7 @@ func TestTSDBStatus(t *testing.T) {
} { } {
tc := tc tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := &API{ api := &API{db: tc.db}
db: func() TSDBAdmin {
if tc.db != nil {
return tc.db
}
return nil
},
}
endpoint := tc.endpoint(api) endpoint := tc.endpoint(api)
req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil) req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil)
if err != nil { if err != nil {

View file

@ -26,7 +26,6 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -40,16 +39,19 @@ import (
// API encapsulates all API services. // API encapsulates all API services.
type API struct { type API struct {
enableAdmin bool enableAdmin bool
db func() *tsdb.DB db TSDBAdmin
dbDir string
} }
// New returns a new API object. // New returns a new API object.
func New( func New(
db func() *tsdb.DB, db TSDBAdmin,
dbDir string,
enableAdmin bool, enableAdmin bool,
) *API { ) *API {
return &API{ return &API{
db: db, db: db,
dbDir: dbDir,
enableAdmin: enableAdmin, enableAdmin: enableAdmin,
} }
} }
@ -57,7 +59,7 @@ func New(
// RegisterGRPC registers all API services with the given server. // RegisterGRPC registers all API services with the given server.
func (api *API) RegisterGRPC(srv *grpc.Server) { func (api *API) RegisterGRPC(srv *grpc.Server) {
if api.enableAdmin { if api.enableAdmin {
pb.RegisterAdminServer(srv, NewAdmin(api.db)) pb.RegisterAdminServer(srv, NewAdmin(api.db, api.dbDir))
} else { } else {
pb.RegisterAdminServer(srv, &AdminDisabled{}) pb.RegisterAdminServer(srv, &AdminDisabled{})
} }
@ -133,13 +135,21 @@ func (s *AdminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques
return nil, errAdminDisabled return nil, errAdminDisabled
} }
// TSDBAdmin defines the tsdb interfaces used by the v2 API for admin operations.
// When a method returns an error whose cause is tsdb.ErrNotReady, Admin
// reports errTSDBNotReady to the caller instead of an internal error.
type TSDBAdmin interface {
// CleanTombstones is called by Admin.TSDBCleanTombstones.
CleanTombstones() error
// Delete is called by Admin.DeleteSeries with the requested time range and matchers.
Delete(mint, maxt int64, ms ...*labels.Matcher) error
// Snapshot is called by Admin.TSDBSnapshot with the target snapshot directory.
Snapshot(dir string, withHead bool) error
}
// Admin provides an administration interface to Prometheus. // Admin provides an administration interface to Prometheus.
type Admin struct { type Admin struct {
db func() *tsdb.DB db TSDBAdmin
dbDir string
} }
// NewAdmin returns an Admin server. // NewAdmin returns an Admin server.
func NewAdmin(db func() *tsdb.DB) *Admin { func NewAdmin(db TSDBAdmin, dbDir string) *Admin {
return &Admin{ return &Admin{
db: db, db: db,
} }
@ -147,12 +157,8 @@ func NewAdmin(db func() *tsdb.DB) *Admin {
// TSDBSnapshot implements pb.AdminServer. // TSDBSnapshot implements pb.AdminServer.
func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) { func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
db := s.db()
if db == nil {
return nil, errTSDBNotReady
}
var ( var (
snapdir = filepath.Join(db.Dir(), "snapshots") snapdir = filepath.Join(s.dbDir, "snapshots")
name = fmt.Sprintf("%s-%x", name = fmt.Sprintf("%s-%x",
time.Now().UTC().Format("20060102T150405Z0700"), time.Now().UTC().Format("20060102T150405Z0700"),
rand.Int()) rand.Int())
@ -161,7 +167,11 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err) return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
} }
if err := db.Snapshot(dir, !req.SkipHead); err != nil { if err := s.db.Snapshot(dir, !req.SkipHead); err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady
}
return nil, status.Errorf(codes.Internal, "create snapshot: %s", err) return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
} }
return &pb.TSDBSnapshotResponse{Name: name}, nil return &pb.TSDBSnapshotResponse{Name: name}, nil
@ -169,12 +179,10 @@ func (s *Admin) TSDBSnapshot(_ context.Context, req *pb.TSDBSnapshotRequest) (*p
// TSDBCleanTombstones implements pb.AdminServer. // TSDBCleanTombstones implements pb.AdminServer.
func (s *Admin) TSDBCleanTombstones(_ context.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) { func (s *Admin) TSDBCleanTombstones(_ context.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) {
db := s.db() if err := s.db.CleanTombstones(); err != nil {
if db == nil { if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady return nil, errTSDBNotReady
} }
if err := db.CleanTombstones(); err != nil {
return nil, status.Errorf(codes.Internal, "clean tombstones: %s", err) return nil, status.Errorf(codes.Internal, "clean tombstones: %s", err)
} }
@ -212,11 +220,10 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb.
matchers = append(matchers, lm) matchers = append(matchers, lm)
} }
db := s.db() if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
if db == nil { if errors.Cause(err) == tsdb.ErrNotReady {
return nil, errTSDBNotReady return nil, errTSDBNotReady
} }
if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
return &pb.SeriesDeleteResponse{}, nil return &pb.SeriesDeleteResponse{}, nil

View file

@ -20,10 +20,12 @@ import (
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/timestamp"
@ -78,6 +80,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
q, err := h.localStorage.Querier(req.Context(), mint, maxt) q, err := h.localStorage.Querier(req.Context(), mint, maxt)
if err != nil { if err != nil {
federationErrors.Inc() federationErrors.Inc()
if errors.Cause(err) == tsdb.ErrNotReady {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }

View file

@ -15,16 +15,22 @@ package web
import ( import (
"bytes" "bytes"
"context"
"net/http"
"net/http/httptest" "net/http/httptest"
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/pkg/errors"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/testutil"
) )
var scenarios = map[string]struct { var scenarios = map[string]struct {
@ -199,7 +205,7 @@ func TestFederation(t *testing.T) {
} }
h := &Handler{ h := &Handler{
localStorage: suite.Storage(), localStorage: &dbAdapter{suite.TSDB()},
lookbackDelta: 5 * time.Minute, lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch. now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{ config: &config.Config{
@ -208,16 +214,59 @@ func TestFederation(t *testing.T) {
} }
for name, scenario := range scenarios { for name, scenario := range scenarios {
h.config.GlobalConfig.ExternalLabels = scenario.externalLabels t.Run(name, func(t *testing.T) {
req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil) h.config.GlobalConfig.ExternalLabels = scenario.externalLabels
res := httptest.NewRecorder() req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
h.federation(res, req) res := httptest.NewRecorder()
if got, want := res.Code, scenario.code; got != want {
t.Errorf("Scenario %q: got code %d, want %d", name, got, want) h.federation(res, req)
} testutil.Equals(t, scenario.code, res.Code)
if got, want := normalizeBody(res.Body), scenario.body; got != want { testutil.Equals(t, scenario.body, normalizeBody(res.Body))
t.Errorf("Scenario %q: got body\n%s\n, want\n%s\n", name, got, want) })
} }
}
// notReadyReadStorage is a LocalStorage stub whose Querier, StartTime and
// Stats methods always fail with a wrapped tsdb.ErrNotReady. Methods not
// overridden here are dispatched to the embedded LocalStorage.
type notReadyReadStorage struct {
	LocalStorage
}

// Querier always fails with a wrapped tsdb.ErrNotReady.
func (notReadyReadStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) {
	err := errors.Wrap(tsdb.ErrNotReady, "wrap")
	return nil, err
}

// StartTime always fails with a wrapped tsdb.ErrNotReady.
func (notReadyReadStorage) StartTime() (int64, error) {
	err := errors.Wrap(tsdb.ErrNotReady, "wrap")
	return 0, err
}

// Stats always fails with a wrapped tsdb.ErrNotReady.
func (notReadyReadStorage) Stats(_ string) (*tsdb.Stats, error) {
	err := errors.Wrap(tsdb.ErrNotReady, "wrap")
	return nil, err
}
// Regression test for https://github.com/prometheus/prometheus/issues/7181.
func TestFederation_NotReady(t *testing.T) {
h := &Handler{
localStorage: notReadyReadStorage{},
lookbackDelta: 5 * time.Minute,
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
config: &config.Config{
GlobalConfig: config.GlobalConfig{},
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
h.config.GlobalConfig.ExternalLabels = scenario.externalLabels
req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
res := httptest.NewRecorder()
h.federation(res, req)
if scenario.code == http.StatusBadRequest {
// Request are expected to be checked before DB readiness.
testutil.Equals(t, http.StatusBadRequest, res.Code)
return
}
testutil.Equals(t, http.StatusServiceUnavailable, res.Code)
})
} }
} }

View file

@ -166,6 +166,11 @@ func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc
// PrometheusVersion contains build information about Prometheus. // PrometheusVersion contains build information about Prometheus.
type PrometheusVersion = api_v1.PrometheusVersion type PrometheusVersion = api_v1.PrometheusVersion
// LocalStorage is the interface the web Handler requires from the local
// storage: it combines storage.Storage with the admin and statistics methods
// of api_v1.TSDBAdminStats.
type LocalStorage interface {
storage.Storage
api_v1.TSDBAdminStats
}
// Handler serves various HTTP endpoints of the Prometheus server // Handler serves various HTTP endpoints of the Prometheus server
type Handler struct { type Handler struct {
logger log.Logger logger log.Logger
@ -178,9 +183,8 @@ type Handler struct {
queryEngine *promql.Engine queryEngine *promql.Engine
lookbackDelta time.Duration lookbackDelta time.Duration
context context.Context context context.Context
tsdb func() *tsdb.DB
storage storage.Storage storage storage.Storage
localStorage storage.Storage localStorage LocalStorage
notifier *notifier.Manager notifier *notifier.Manager
apiV1 *api_v1.API apiV1 *api_v1.API
@ -214,9 +218,10 @@ func (h *Handler) ApplyConfig(conf *config.Config) error {
// Options for the web Handler. // Options for the web Handler.
type Options struct { type Options struct {
Context context.Context Context context.Context
TSDB func() *tsdb.DB
TSDBRetentionDuration model.Duration TSDBRetentionDuration model.Duration
TSDBDir string
TSDBMaxBytes units.Base2Bytes TSDBMaxBytes units.Base2Bytes
LocalStorage LocalStorage
Storage storage.Storage Storage storage.Storage
QueryEngine *promql.Engine QueryEngine *promql.Engine
LookbackDelta time.Duration LookbackDelta time.Duration
@ -283,9 +288,8 @@ func New(logger log.Logger, o *Options) *Handler {
ruleManager: o.RuleManager, ruleManager: o.RuleManager,
queryEngine: o.QueryEngine, queryEngine: o.QueryEngine,
lookbackDelta: o.LookbackDelta, lookbackDelta: o.LookbackDelta,
tsdb: o.TSDB,
storage: o.Storage, storage: o.Storage,
localStorage: o.TSDB(), localStorage: o.LocalStorage,
notifier: o.Notifier, notifier: o.Notifier,
now: model.Now, now: model.Now,
@ -309,9 +313,8 @@ func New(logger log.Logger, o *Options) *Handler {
Scheme: o.ExternalURL.Scheme, Scheme: o.ExternalURL.Scheme,
}, },
h.testReady, h.testReady,
func() api_v1.TSDBAdmin { h.options.LocalStorage,
return h.options.TSDB() h.options.TSDBDir,
},
h.options.EnableAdminAPI, h.options.EnableAdminAPI,
logger, logger,
h.ruleManager, h.ruleManager,
@ -538,7 +541,8 @@ func (h *Handler) Run(ctx context.Context) error {
grpcSrv = grpc.NewServer() grpcSrv = grpc.NewServer()
) )
av2 := api_v2.New( av2 := api_v2.New(
h.options.TSDB, h.options.LocalStorage,
h.options.TSDBDir,
h.options.EnableAdminAPI, h.options.EnableAdminAPI,
) )
av2.RegisterGRPC(grpcSrv) av2.RegisterGRPC(grpcSrv)
@ -789,13 +793,22 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) {
status.LastConfigTime = time.Unix(int64(toFloat64(mF)), 0).UTC() status.LastConfigTime = time.Unix(int64(toFloat64(mF)), 0).UTC()
} }
} }
db := h.tsdb()
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
status.Stats = db.Head().PostingsCardinalityStats("__name__") s, err := h.localStorage.Stats("__name__")
if err != nil {
if errors.Cause(err) == tsdb.ErrNotReady {
http.Error(w, tsdb.ErrNotReady.Error(), http.StatusServiceUnavailable)
return
}
http.Error(w, fmt.Sprintf("error gathering local storage statistics: %s", err), http.StatusInternalServerError)
return
}
status.Duration = fmt.Sprintf("%.3f", float64(time.Now().UnixNano()-startTime)/float64(1e9)) status.Duration = fmt.Sprintf("%.3f", float64(time.Now().UnixNano()-startTime)/float64(1e9))
status.NumSeries = db.Head().NumSeries() status.Stats = s.IndexPostingStats
status.MaxTime = db.Head().MaxTime() status.NumSeries = s.NumSeries
status.MinTime = db.Head().MaxTime() status.MaxTime = s.MaxTime
status.MinTime = s.MinTime
h.executeTemplate(w, "status.html", status) h.executeTemplate(w, "status.html", status)
} }

View file

@ -41,6 +41,7 @@ func TestMain(m *testing.M) {
os.Setenv("no_proxy", "localhost,127.0.0.1,0.0.0.0,:") os.Setenv("no_proxy", "localhost,127.0.0.1,0.0.0.0,:")
os.Exit(m.Run()) os.Exit(m.Run())
} }
func TestGlobalURL(t *testing.T) { func TestGlobalURL(t *testing.T) {
opts := &Options{ opts := &Options{
ListenAddress: ":9090", ListenAddress: ":9090",
@ -89,15 +90,21 @@ func TestGlobalURL(t *testing.T) {
} }
} }
// dbAdapter wraps a *tsdb.DB for use in tests, embedding it so that the DB's
// own methods are promoted; a Stats method is added on top of it.
// The tests pass &dbAdapter{db} as Options.LocalStorage.
type dbAdapter struct {
*tsdb.DB
}
// Stats returns the statistics of the wrapped DB's head block for the given
// label name. The returned error is always nil.
func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) {
	head := a.Head()
	headStats := head.Stats(statsByLabelName)
	return headStats, nil
}
func TestReadyAndHealthy(t *testing.T) { func TestReadyAndHealthy(t *testing.T) {
t.Parallel() t.Parallel()
dbDir, err := ioutil.TempDir("", "tsdb-ready") dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err) testutil.Ok(t, err)
defer testutil.Ok(t, os.RemoveAll(dbDir))
defer os.RemoveAll(dbDir)
db, err := tsdb.Open(dbDir, nil, nil, nil) db, err := tsdb.Open(dbDir, nil, nil, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
opts := &Options{ opts := &Options{
@ -106,13 +113,14 @@ func TestReadyAndHealthy(t *testing.T) {
MaxConnections: 512, MaxConnections: 512,
Context: nil, Context: nil,
Storage: nil, Storage: nil,
LocalStorage: &dbAdapter{db},
TSDBDir: dbDir,
QueryEngine: nil, QueryEngine: nil,
ScrapeManager: &scrape.Manager{}, ScrapeManager: &scrape.Manager{},
RuleManager: &rules.Manager{}, RuleManager: &rules.Manager{},
Notifier: nil, Notifier: nil,
RoutePrefix: "/", RoutePrefix: "/",
EnableAdminAPI: true, EnableAdminAPI: true,
TSDB: func() *tsdb.DB { return db },
ExternalURL: &url.URL{ ExternalURL: &url.URL{
Scheme: "http", Scheme: "http",
Host: "localhost:9090", Host: "localhost:9090",
@ -136,6 +144,9 @@ func TestReadyAndHealthy(t *testing.T) {
} }
}() }()
// TODO(bwplotka): These tests create tons of new connections and memory that are never cleaned up.
// Close and exhaust all response bodies.
// Give some time for the web goroutine to run since we need the server // Give some time for the web goroutine to run since we need the server
// to be up before starting tests. // to be up before starting tests.
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
@ -282,13 +293,10 @@ func TestReadyAndHealthy(t *testing.T) {
func TestRoutePrefix(t *testing.T) { func TestRoutePrefix(t *testing.T) {
t.Parallel() t.Parallel()
dbDir, err := ioutil.TempDir("", "tsdb-ready") dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err) testutil.Ok(t, err)
defer testutil.Ok(t, os.RemoveAll(dbDir))
defer os.RemoveAll(dbDir)
db, err := tsdb.Open(dbDir, nil, nil, nil) db, err := tsdb.Open(dbDir, nil, nil, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
opts := &Options{ opts := &Options{
@ -296,6 +304,8 @@ func TestRoutePrefix(t *testing.T) {
ReadTimeout: 30 * time.Second, ReadTimeout: 30 * time.Second,
MaxConnections: 512, MaxConnections: 512,
Context: nil, Context: nil,
TSDBDir: dbDir,
LocalStorage: &dbAdapter{db},
Storage: nil, Storage: nil,
QueryEngine: nil, QueryEngine: nil,
ScrapeManager: nil, ScrapeManager: nil,
@ -307,7 +317,6 @@ func TestRoutePrefix(t *testing.T) {
Host: "localhost.localdomain:9090", Host: "localhost.localdomain:9090",
Scheme: "http", Scheme: "http",
}, },
TSDB: func() *tsdb.DB { return db },
} }
opts.Flags = map[string]string{} opts.Flags = map[string]string{}
@ -399,7 +408,6 @@ func TestDebugHandler(t *testing.T) {
Host: "localhost.localdomain:9090", Host: "localhost.localdomain:9090",
Scheme: "http", Scheme: "http",
}, },
TSDB: func() *tsdb.DB { return nil },
} }
handler := New(nil, opts) handler := New(nil, opts)
handler.Ready() handler.Ready()
@ -426,7 +434,6 @@ func TestHTTPMetrics(t *testing.T) {
Host: "localhost.localdomain:9090", Host: "localhost.localdomain:9090",
Scheme: "http", Scheme: "http",
}, },
TSDB: func() *tsdb.DB { return nil },
}) })
getReady := func() int { getReady := func() int {
t.Helper() t.Helper()