Consolidate the duration params in CLI

* All CLI params moved to model.Duration

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-06-16 18:37:34 +05:30
parent f40a5990a0
commit d407bd150c
6 changed files with 60 additions and 41 deletions

View file

@ -48,10 +48,13 @@ var cfg = struct {
localStoragePath string localStoragePath string
localStorageEngine string localStorageEngine string
notifier notifier.Options notifier notifier.Options
notifierTimeout time.Duration notifierTimeout model.Duration
queryEngine promql.EngineOptions queryEngine promql.EngineOptions
web web.Options web web.Options
tsdb tsdb.Options tsdb tsdb.Options
lookbackDelta model.Duration
webTimeout model.Duration
queryTimeout model.Duration
alertmanagerURLs stringset alertmanagerURLs stringset
prometheusURL string prometheusURL string
@ -59,6 +62,16 @@ var cfg = struct {
logFormat string logFormat string
logLevel string logLevel string
}{ }{
// The defaults for model.Duration flag parsing.
notifierTimeout: model.Duration(10 * time.Second),
tsdb: tsdb.Options{
MinBlockDuration: model.Duration(2 * time.Hour),
Retention: model.Duration(15 * 24 * time.Hour),
},
lookbackDelta: model.Duration(5 * time.Minute),
webTimeout: model.Duration(30 * time.Second),
queryTimeout: model.Duration(2 * time.Minute),
alertmanagerURLs: stringset{}, alertmanagerURLs: stringset{},
notifier: notifier.Options{ notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer, Registerer: prometheus.DefaultRegisterer,
@ -77,13 +90,11 @@ func parse(args []string) error {
return err return err
} }
if promql.StalenessDelta < 0 {
return fmt.Errorf("negative staleness delta: %s", promql.StalenessDelta)
}
if err := parsePrometheusURL(); err != nil { if err := parsePrometheusURL(); err != nil {
return err return err
} }
cfg.web.ReadTimeout = time.Duration(cfg.webTimeout)
// Default -web.route-prefix to path of -web.external-url. // Default -web.route-prefix to path of -web.external-url.
if cfg.web.RoutePrefix == "" { if cfg.web.RoutePrefix == "" {
cfg.web.RoutePrefix = cfg.web.ExternalURL.Path cfg.web.RoutePrefix = cfg.web.ExternalURL.Path
@ -101,6 +112,12 @@ func parse(args []string) error {
cfg.tsdb.MaxBlockDuration = cfg.tsdb.Retention / 10 cfg.tsdb.MaxBlockDuration = cfg.tsdb.Retention / 10
} }
if cfg.lookbackDelta > 0 {
promql.LookbackDelta = time.Duration(cfg.lookbackDelta)
}
cfg.queryEngine.Timeout = time.Duration(cfg.queryTimeout)
return nil return nil
} }
@ -160,7 +177,7 @@ func parseAlertmanagerURLToConfig(us string) (*config.AlertmanagerConfig, error)
acfg := &config.AlertmanagerConfig{ acfg := &config.AlertmanagerConfig{
Scheme: u.Scheme, Scheme: u.Scheme,
PathPrefix: u.Path, PathPrefix: u.Path,
Timeout: cfg.notifierTimeout, Timeout: time.Duration(cfg.notifierTimeout),
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{ ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.TargetGroup{ StaticConfigs: []*config.TargetGroup{
{ {

View file

@ -83,8 +83,8 @@ func newRootCmd() *cobra.Command {
"Address to listen on for the web interface, API, and telemetry.", "Address to listen on for the web interface, API, and telemetry.",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.web.ReadTimeout, "web.read-timeout", 30*time.Second, &cfg.webTimeout, "web.read-timeout",
"Maximum duration before timing out read of the request, and closing idle connections.", "Maximum duration before timing out read of the request, and closing idle connections.",
) )
rootCmd.PersistentFlags().IntVar( rootCmd.PersistentFlags().IntVar(
@ -129,16 +129,16 @@ func newRootCmd() *cobra.Command {
&cfg.tsdb.NoLockfile, "storage.tsdb.no-lockfile", false, &cfg.tsdb.NoLockfile, "storage.tsdb.no-lockfile", false,
"Disable lock file usage.", "Disable lock file usage.",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.tsdb.MinBlockDuration, "storage.tsdb.min-block-duration", 2*time.Hour, &cfg.tsdb.MinBlockDuration, "storage.tsdb.min-block-duration",
"Minimum duration of a data block before being persisted.", "Minimum duration of a data block before being persisted.",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.tsdb.MaxBlockDuration, "storage.tsdb.max-block-duration", 0, &cfg.tsdb.MaxBlockDuration, "storage.tsdb.max-block-duration",
"Maximum duration compacted blocks may span. (Defaults to 10% of the retention period)", "Maximum duration compacted blocks may span. (Defaults to 10% of the retention period)",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.tsdb.Retention, "storage.tsdb.retention", 15*24*time.Hour, &cfg.tsdb.Retention, "storage.tsdb.retention",
"How long to retain samples in the storage.", "How long to retain samples in the storage.",
) )
rootCmd.PersistentFlags().StringVar( rootCmd.PersistentFlags().StringVar(
@ -151,18 +151,18 @@ func newRootCmd() *cobra.Command {
&cfg.notifier.QueueCapacity, "alertmanager.notification-queue-capacity", 10000, &cfg.notifier.QueueCapacity, "alertmanager.notification-queue-capacity", 10000,
"The capacity of the queue for pending alert manager notifications.", "The capacity of the queue for pending alert manager notifications.",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.notifierTimeout, "alertmanager.timeout", 10*time.Second, &cfg.notifierTimeout, "alertmanager.timeout",
"Alert manager HTTP API timeout.", "Alert manager HTTP API timeout.",
) )
// Query engine. // Query engine.
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&promql.StalenessDelta, "query.staleness-delta", promql.StalenessDelta, &cfg.lookbackDelta, "query.lookback-delta",
"Staleness delta allowance during expression evaluations.", "The delta difference allowed for retrieving metrics during expression evaluations.",
) )
rootCmd.PersistentFlags().DurationVar( rootCmd.PersistentFlags().Var(
&cfg.queryEngine.Timeout, "query.timeout", 2*time.Minute, &cfg.queryTimeout, "query.timeout",
"Maximum time a query may take before being aborted.", "Maximum time a query may take before being aborted.",
) )
rootCmd.PersistentFlags().IntVar( rootCmd.PersistentFlags().IntVar(

View file

@ -487,11 +487,11 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
Inspect(s.Expr, func(node Node) bool { Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
if maxOffset < StalenessDelta { if maxOffset < LookbackDelta {
maxOffset = StalenessDelta maxOffset = LookbackDelta
} }
if n.Offset+StalenessDelta > maxOffset { if n.Offset+LookbackDelta > maxOffset {
maxOffset = n.Offset + StalenessDelta maxOffset = n.Offset + LookbackDelta
} }
case *MatrixSelector: case *MatrixSelector:
if maxOffset < n.Range { if maxOffset < n.Range {
@ -521,7 +521,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
return false return false
} }
for _, s := range n.series { for _, s := range n.series {
it := storage.NewBuffer(s.Iterator(), durationMilliseconds(StalenessDelta)) it := storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta))
n.iterators = append(n.iterators, it) n.iterators = append(n.iterators, it)
} }
@ -763,8 +763,8 @@ func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
peek := 1 peek := 1
if !ok || t > refTime { if !ok || t > refTime {
t, v, ok = it.PeekBack(peek) t, v, ok = it.PeekBack(peek)
peek += 1 peek++
if !ok || t < refTime-durationMilliseconds(StalenessDelta) { if !ok || t < refTime-durationMilliseconds(LookbackDelta) {
continue continue
} }
} }
@ -773,7 +773,7 @@ func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
} }
// Find timestamp before this point, within the staleness delta. // Find timestamp before this point, within the staleness delta.
prevT, _, ok := it.PeekBack(peek) prevT, _, ok := it.PeekBack(peek)
if ok && prevT >= refTime-durationMilliseconds(StalenessDelta) { if ok && prevT >= refTime-durationMilliseconds(LookbackDelta) {
interval := t - prevT interval := t - prevT
if interval*4+interval/10 < refTime-t { if interval*4+interval/10 < refTime-t {
// It is more than 4 (+10% for safety) intervals // It is more than 4 (+10% for safety) intervals
@ -1460,9 +1460,9 @@ func shouldDropMetricName(op itemType) bool {
} }
} }
// StalenessDelta determines the time since the last sample after which a time // LookbackDelta determines the time since the last sample after which a time
// series is considered stale. // series is considered stale.
var StalenessDelta = 5 * time.Minute var LookbackDelta = 5 * time.Minute
// A queryGate controls the maximum number of concurrently running and waiting queries. // A queryGate controls the maximum number of concurrently running and waiting queries.
type queryGate struct { type queryGate struct {

View file

@ -19,6 +19,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/tsdb" "github.com/prometheus/tsdb"
@ -37,13 +38,13 @@ type Options struct {
// The timestamp range of head blocks after which they get persisted. // The timestamp range of head blocks after which they get persisted.
// It's the minimum duration of any persisted block. // It's the minimum duration of any persisted block.
MinBlockDuration time.Duration MinBlockDuration model.Duration
// The maximum timestamp range of compacted blocks. // The maximum timestamp range of compacted blocks.
MaxBlockDuration time.Duration MaxBlockDuration model.Duration
// Duration for how long to retain data. // Duration for how long to retain data.
Retention time.Duration Retention model.Duration
// Disable creation and consideration of lockfile. // Disable creation and consideration of lockfile.
NoLockfile bool NoLockfile bool
@ -53,9 +54,9 @@ type Options struct {
func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage, error) { func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage, error) {
db, err := tsdb.Open(path, nil, r, &tsdb.Options{ db, err := tsdb.Open(path, nil, r, &tsdb.Options{
WALFlushInterval: 10 * time.Second, WALFlushInterval: 10 * time.Second,
MinBlockDuration: uint64(opts.MinBlockDuration.Seconds() * 1000), MinBlockDuration: uint64(time.Duration(opts.MinBlockDuration).Seconds() * 1000),
MaxBlockDuration: uint64(opts.MaxBlockDuration.Seconds() * 1000), MaxBlockDuration: uint64(time.Duration(opts.MaxBlockDuration).Seconds() * 1000),
RetentionDuration: uint64(opts.Retention.Seconds() * 1000), RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
NoLockfile: opts.NoLockfile, NoLockfile: opts.NoLockfile,
}) })
if err != nil { if err != nil {

View file

@ -19,6 +19,7 @@ import (
"time" "time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/storage/tsdb"
) )
@ -36,8 +37,8 @@ func NewStorage(t T) storage.Storage {
// Tests just load data for a series sequentially. Thus we // Tests just load data for a series sequentially. Thus we
// need a long appendable window. // need a long appendable window.
db, err := tsdb.Open(dir, nil, &tsdb.Options{ db, err := tsdb.Open(dir, nil, &tsdb.Options{
MinBlockDuration: 24 * time.Hour, MinBlockDuration: model.Duration(24 * time.Hour),
MaxBlockDuration: 24 * time.Hour, MaxBlockDuration: model.Duration(24 * time.Hour),
}) })
if err != nil { if err != nil {
t.Fatalf("Opening test storage failed: %s", err) t.Fatalf("Opening test storage failed: %s", err)

View file

@ -55,7 +55,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
var ( var (
mint = timestamp.FromTime(h.now().Time().Add(-promql.StalenessDelta)) mint = timestamp.FromTime(h.now().Time().Add(-promql.LookbackDelta))
maxt = timestamp.FromTime(h.now().Time()) maxt = timestamp.FromTime(h.now().Time())
format = expfmt.Negotiate(req.Header) format = expfmt.Negotiate(req.Header)
enc = expfmt.NewEncoder(w, format) enc = expfmt.NewEncoder(w, format)
@ -86,7 +86,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
// TODO(fabxc): allow fast path for most recent sample either // TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus. // in the storage itself or caching layer in Prometheus.
it := storage.NewBuffer(s.Iterator(), int64(promql.StalenessDelta/1e6)) it := storage.NewBuffer(s.Iterator(), int64(promql.LookbackDelta/1e6))
var t int64 var t int64
var v float64 var v float64