Merge branch 'master' into release-0.17

Fabian Reinartz 2016-02-05 13:30:56 +01:00
commit e048816316
28 changed files with 410 additions and 331 deletions

@ -52,7 +52,7 @@ You can also clone the repository yourself and build using `make`:
$ cd $GOPATH/src/github.com/prometheus
$ git clone https://github.com/prometheus/prometheus.git
$ cd prometheus
$ make
$ make build
$ ./prometheus -config.file=your_config.yml
The Makefile provides several targets:

@ -107,7 +107,7 @@ func init() {
)
cfg.fs.IntVar(
&cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024,
"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.",
"How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.",
)
cfg.fs.DurationVar(
&cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour,
@ -115,7 +115,7 @@ func init() {
)
cfg.fs.IntVar(
&cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024,
"How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.",
"How many chunks can be waiting for persistence before sample ingestion will be throttled. Many chunks waiting to be persisted will increase the checkpoint size.",
)
cfg.fs.DurationVar(
&cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute,

@ -25,8 +25,6 @@ import (
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/util/strutil"
)
var (
@ -75,9 +73,9 @@ var (
// DefaultGlobalConfig is the default global configuration.
DefaultGlobalConfig = GlobalConfig{
ScrapeInterval: Duration(1 * time.Minute),
ScrapeTimeout: Duration(10 * time.Second),
EvaluationInterval: Duration(1 * time.Minute),
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(1 * time.Minute),
}
// DefaultScrapeConfig is the default scrape configuration.
@ -99,13 +97,13 @@ var (
// DefaultDNSSDConfig is the default DNS SD configuration.
DefaultDNSSDConfig = DNSSDConfig{
RefreshInterval: Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second),
Type: "SRV",
}
// DefaultFileSDConfig is the default file SD configuration.
DefaultFileSDConfig = FileSDConfig{
RefreshInterval: Duration(5 * time.Minute),
RefreshInterval: model.Duration(5 * time.Minute),
}
// DefaultConsulSDConfig is the default Consul SD configuration.
@ -116,30 +114,30 @@ var (
// DefaultServersetSDConfig is the default Serverset SD configuration.
DefaultServersetSDConfig = ServersetSDConfig{
Timeout: Duration(10 * time.Second),
Timeout: model.Duration(10 * time.Second),
}
// DefaultNerveSDConfig is the default Nerve SD configuration.
DefaultNerveSDConfig = NerveSDConfig{
Timeout: Duration(10 * time.Second),
Timeout: model.Duration(10 * time.Second),
}
// DefaultMarathonSDConfig is the default Marathon SD configuration.
DefaultMarathonSDConfig = MarathonSDConfig{
RefreshInterval: Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second),
}
// DefaultKubernetesSDConfig is the default Kubernetes SD configuration
DefaultKubernetesSDConfig = KubernetesSDConfig{
KubeletPort: 10255,
RequestTimeout: Duration(10 * time.Second),
RetryInterval: Duration(1 * time.Second),
RequestTimeout: model.Duration(10 * time.Second),
RetryInterval: model.Duration(1 * time.Second),
}
// DefaultEC2SDConfig is the default EC2 SD configuration.
DefaultEC2SDConfig = EC2SDConfig{
Port: 80,
RefreshInterval: Duration(60 * time.Second),
RefreshInterval: model.Duration(60 * time.Second),
}
)
@ -281,11 +279,11 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
// objects.
type GlobalConfig struct {
// How frequently to scrape targets by default.
ScrapeInterval Duration `yaml:"scrape_interval,omitempty"`
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// The default timeout when scraping targets.
ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"`
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// How frequently to evaluate rules by default.
EvaluationInterval Duration `yaml:"evaluation_interval,omitempty"`
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// The labels to add to any timeseries that this Prometheus instance scrapes.
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
@ -344,9 +342,9 @@ type ScrapeConfig struct {
// A set of query parameters with which the target is scraped.
Params url.Values `yaml:"params,omitempty"`
// How frequently to scrape the targets of this scrape config.
ScrapeInterval Duration `yaml:"scrape_interval,omitempty"`
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// The timeout for scraping targets of this config.
ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"`
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// The HTTP resource path on which to fetch metrics from targets.
MetricsPath string `yaml:"metrics_path,omitempty"`
// The URL scheme with which to fetch metrics from targets.
@ -532,10 +530,10 @@ func (tg *TargetGroup) UnmarshalJSON(b []byte) error {
// DNSSDConfig is the configuration for DNS based service discovery.
type DNSSDConfig struct {
Names []string `yaml:"names"`
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
Type string `yaml:"type"`
Port int `yaml:"port"` // Ignored for SRV records
Names []string `yaml:"names"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
Type string `yaml:"type"`
Port int `yaml:"port"` // Ignored for SRV records
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
@ -565,8 +563,8 @@ func (c *DNSSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
// FileSDConfig is the configuration for file based discovery.
type FileSDConfig struct {
Names []string `yaml:"names"`
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
Names []string `yaml:"names"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -624,9 +622,9 @@ func (c *ConsulSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery.
type ServersetSDConfig struct {
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout Duration `yaml:"timeout,omitempty"`
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout model.Duration `yaml:"timeout,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -656,9 +654,9 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
// NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery.
type NerveSDConfig struct {
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout Duration `yaml:"timeout,omitempty"`
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout model.Duration `yaml:"timeout,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -688,8 +686,8 @@ func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
// MarathonSDConfig is the configuration for services running on Marathon.
type MarathonSDConfig struct {
Servers []string `yaml:"servers,omitempty"`
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
Servers []string `yaml:"servers,omitempty"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -712,15 +710,15 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
// KubernetesSDConfig is the configuration for Kubernetes service discovery.
type KubernetesSDConfig struct {
APIServers []URL `yaml:"api_servers"`
KubeletPort int `yaml:"kubelet_port,omitempty"`
InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
RetryInterval Duration `yaml:"retry_interval,omitempty"`
RequestTimeout Duration `yaml:"request_timeout,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
APIServers []URL `yaml:"api_servers"`
KubeletPort int `yaml:"kubelet_port,omitempty"`
InCluster bool `yaml:"in_cluster,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
BearerToken string `yaml:"bearer_token,omitempty"`
BearerTokenFile string `yaml:"bearer_token_file,omitempty"`
RetryInterval model.Duration `yaml:"retry_interval,omitempty"`
RequestTimeout model.Duration `yaml:"request_timeout,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -749,11 +747,11 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
// EC2SDConfig is the configuration for EC2 based service discovery.
type EC2SDConfig struct {
Region string `yaml:"region"`
AccessKey string `yaml:"access_key,omitempty"`
SecretKey string `yaml:"secret_key,omitempty"`
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
Port int `yaml:"port"`
Region string `yaml:"region"`
AccessKey string `yaml:"access_key,omitempty"`
SecretKey string `yaml:"secret_key,omitempty"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
Port int `yaml:"port"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
@ -883,28 +881,3 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
}
return nil, nil
}
// Duration encapsulates a time.Duration and makes it YAML marshallable.
//
// TODO(fabxc): Since we have custom types for most things, including timestamps,
// we might want to move this into our model as well, eventually.
type Duration time.Duration
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return err
}
dur, err := strutil.StringToDuration(s)
if err != nil {
return err
}
*d = Duration(dur)
return nil
}
// MarshalYAML implements the yaml.Marshaler interface.
func (d Duration) MarshalYAML() (interface{}, error) {
return strutil.DurationToString(time.Duration(d)), nil
}
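
With config.Duration removed, duration parsing and formatting now live in the vendored common/model package (see its diff near the end of this commit), which also gains the "y" and "w" units. A small round-trip sketch, assuming the vendored revision of github.com/prometheus/common/model from this commit is on the import path:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	// Parsing accepts the units the vendored model package knows about,
	// now including "y" and "w".
	d, err := model.ParseDuration("2w")
	if err != nil {
		panic(err)
	}
	fmt.Println(time.Duration(d)) // 336h0m0s

	// Formatting picks the largest unit that divides the duration evenly.
	fmt.Println(model.Duration(90 * time.Second))      // 90s
	fmt.Println(model.Duration(15 * 24 * time.Hour))   // 15d
}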

@ -28,9 +28,9 @@ import (
var expectedConf = &Config{
GlobalConfig: GlobalConfig{
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
EvaluationInterval: Duration(30 * time.Second),
EvaluationInterval: model.Duration(30 * time.Second),
ExternalLabels: model.LabelSet{
"monitor": "codelab",
@ -49,7 +49,7 @@ var expectedConf = &Config{
JobName: "prometheus",
HonorLabels: true,
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -73,11 +73,11 @@ var expectedConf = &Config{
FileSDConfigs: []*FileSDConfig{
{
Names: []string{"foo/*.slow.json", "foo/*.slow.yml", "single/file.yml"},
RefreshInterval: Duration(10 * time.Minute),
RefreshInterval: model.Duration(10 * time.Minute),
},
{
Names: []string{"bar/*.yaml"},
RefreshInterval: Duration(5 * time.Minute),
RefreshInterval: model.Duration(5 * time.Minute),
},
},
@ -108,8 +108,8 @@ var expectedConf = &Config{
{
JobName: "service-x",
ScrapeInterval: Duration(50 * time.Second),
ScrapeTimeout: Duration(5 * time.Second),
ScrapeInterval: model.Duration(50 * time.Second),
ScrapeTimeout: model.Duration(5 * time.Second),
BasicAuth: &BasicAuth{
Username: "admin_name",
@ -124,14 +124,14 @@ var expectedConf = &Config{
"first.dns.address.domain.com",
"second.dns.address.domain.com",
},
RefreshInterval: Duration(15 * time.Second),
RefreshInterval: model.Duration(15 * time.Second),
Type: "SRV",
},
{
Names: []string{
"first.dns.address.domain.com",
},
RefreshInterval: Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second),
Type: "SRV",
},
},
@ -180,7 +180,7 @@ var expectedConf = &Config{
{
JobName: "service-y",
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -198,8 +198,8 @@ var expectedConf = &Config{
{
JobName: "service-z",
ScrapeInterval: Duration(15 * time.Second),
ScrapeTimeout: Duration(10 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
MetricsPath: "/metrics",
Scheme: "http",
@ -214,7 +214,7 @@ var expectedConf = &Config{
{
JobName: "service-kubernetes",
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -228,15 +228,15 @@ var expectedConf = &Config{
Password: "mypassword",
},
KubeletPort: 10255,
RequestTimeout: Duration(10 * time.Second),
RetryInterval: Duration(1 * time.Second),
RequestTimeout: model.Duration(10 * time.Second),
RetryInterval: model.Duration(1 * time.Second),
},
},
},
{
JobName: "service-marathon",
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -247,14 +247,14 @@ var expectedConf = &Config{
Servers: []string{
"http://marathon.example.com:8080",
},
RefreshInterval: Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second),
},
},
},
{
JobName: "service-ec2",
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -265,7 +265,7 @@ var expectedConf = &Config{
Region: "us-east-1",
AccessKey: "access",
SecretKey: "secret",
RefreshInterval: Duration(60 * time.Second),
RefreshInterval: model.Duration(60 * time.Second),
Port: 80,
},
},
@ -273,7 +273,7 @@ var expectedConf = &Config{
{
JobName: "service-nerve",
ScrapeInterval: Duration(15 * time.Second),
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath,
@ -283,7 +283,7 @@ var expectedConf = &Config{
{
Servers: []string{"localhost"},
Paths: []string{"/monitoring"},
Timeout: Duration(10 * time.Second),
Timeout: model.Duration(10 * time.Second),
},
},
},

@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@ -239,7 +240,7 @@ func (n *Handler) setMore() {
}
func (n *Handler) postURL() string {
return n.opts.AlertmanagerURL + alertPushEndpoint
return strings.TrimRight(n.opts.AlertmanagerURL, "/") + alertPushEndpoint
}
func (n *Handler) send(alerts ...*model.Alert) error {

@ -25,6 +25,43 @@ import (
"github.com/prometheus/common/model"
)
func TestHandlerPostURL(t *testing.T) {
var cases = []struct {
in, out string
}{
{
in: "http://localhost:9093",
out: "http://localhost:9093/api/v1/alerts",
},
{
in: "http://localhost:9093/",
out: "http://localhost:9093/api/v1/alerts",
},
{
in: "http://localhost:9093/prefix",
out: "http://localhost:9093/prefix/api/v1/alerts",
},
{
in: "http://localhost:9093/prefix//",
out: "http://localhost:9093/prefix/api/v1/alerts",
},
{
in: "http://localhost:9093/prefix//",
out: "http://localhost:9093/prefix/api/v1/alerts",
},
}
h := &Handler{
opts: &HandlerOptions{},
}
for _, c := range cases {
h.opts.AlertmanagerURL = c.in
if res := h.postURL(); res != c.out {
t.Errorf("Expected post URL %q for %q but got %q", c.out, c.in, res)
}
}
}
func TestHandlerNextBatch(t *testing.T) {
h := New(&HandlerOptions{})

@ -1140,12 +1140,12 @@ func (p *parser) unquoteString(s string) string {
}
func parseDuration(ds string) (time.Duration, error) {
dur, err := strutil.StringToDuration(ds)
dur, err := model.ParseDuration(ds)
if err != nil {
return 0, err
}
if dur == 0 {
return 0, fmt.Errorf("duration must be greater than 0")
}
return dur, nil
return time.Duration(dur), nil
}

@ -22,7 +22,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/strutil"
)
// Tree returns a string of the tree structure of the given node.
@ -104,7 +103,7 @@ func (node *AlertStmt) String() string {
s := fmt.Sprintf("ALERT %s", node.Name)
s += fmt.Sprintf("\n\tIF %s", node.Expr)
if node.Duration > 0 {
s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(node.Duration))
s += fmt.Sprintf("\n\tFOR %s", model.Duration(node.Duration))
}
if len(node.Labels) > 0 {
s += fmt.Sprintf("\n\tLABELS %s", node.Labels)
@ -178,9 +177,9 @@ func (node *MatrixSelector) String() string {
}
offset := ""
if node.Offset != time.Duration(0) {
offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset))
offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset))
}
return fmt.Sprintf("%s[%s]%s", vecSelector.String(), strutil.DurationToString(node.Range), offset)
return fmt.Sprintf("%s[%s]%s", vecSelector.String(), model.Duration(node.Range), offset)
}
func (node *NumberLiteral) String() string {
@ -210,7 +209,7 @@ func (node *VectorSelector) String() string {
}
offset := ""
if node.Offset != time.Duration(0) {
offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset))
offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset))
}
if len(labelStrings) == 0 {

@ -26,7 +26,6 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -98,11 +97,11 @@ func (t *Test) parseLoad(lines []string, i int) (int, *loadCmd, error) {
}
parts := patLoad.FindStringSubmatch(lines[i])
gap, err := strutil.StringToDuration(parts[1])
gap, err := model.ParseDuration(parts[1])
if err != nil {
return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err)
}
cmd := newLoadCmd(gap)
cmd := newLoadCmd(time.Duration(gap))
for i+1 < len(lines) {
i++
defLine := lines[i]
@ -141,11 +140,11 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) {
return i, nil, err
}
offset, err := strutil.StringToDuration(at)
offset, err := model.ParseDuration(at)
if err != nil {
return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err)
}
ts := testStartTime.Add(offset)
ts := testStartTime.Add(time.Duration(offset))
cmd := newEvalCmd(expr, ts, ts, 0)
switch mod {

@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
@ -22,7 +24,7 @@ func testFileSD(t *testing.T, ext string) {
// whether file watches work as expected.
var conf config.FileSDConfig
conf.Names = []string{"fixtures/_*" + ext}
conf.RefreshInterval = config.Duration(1 * time.Hour)
conf.RefreshInterval = model.Duration(1 * time.Hour)
var (
fsd = NewFileDiscovery(&conf)

@ -14,8 +14,6 @@
package retrieval
import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
@ -23,26 +21,31 @@ import (
type nopAppender struct{}
func (a nopAppender) Append(*model.Sample) {
func (a nopAppender) Append(*model.Sample) error {
return nil
}
type slowAppender struct{}
func (a slowAppender) Append(*model.Sample) {
time.Sleep(time.Millisecond)
func (a nopAppender) NeedsThrottling() bool {
return false
}
type collectResultAppender struct {
result model.Samples
result model.Samples
throttled bool
}
func (a *collectResultAppender) Append(s *model.Sample) {
func (a *collectResultAppender) Append(s *model.Sample) error {
for ln, lv := range s.Metric {
if len(lv) == 0 {
delete(s.Metric, ln)
}
}
a.result = append(a.result, s)
return nil
}
func (a *collectResultAppender) NeedsThrottling() bool {
return a.throttled
}
// fakeTargetProvider implements a TargetProvider and allows manual injection

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/httputil"
)
@ -48,7 +49,7 @@ const (
)
var (
errIngestChannelFull = errors.New("ingestion channel full")
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
@ -59,10 +60,19 @@ var (
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)
func init() {
prometheus.MustRegister(targetIntervalLength)
prometheus.MustRegister(targetSkippedScrapes)
}
// TargetHealth describes the health state of a target.
@ -151,8 +161,6 @@ type Target struct {
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Channel to buffer ingested samples.
ingestedSamples chan model.Vector
// Mutex protects the members below.
sync.RWMutex
@ -166,8 +174,6 @@ type Target struct {
baseLabels model.LabelSet
// Internal labels, such as scheme.
internalLabels model.LabelSet
// What is the deadline for the HTTP or HTTPS against this endpoint.
deadline time.Duration
// The time between two scrapes.
scrapeInterval time.Duration
// Whether the target's labels have precedence over the base labels
@ -237,7 +243,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L
t.url.RawQuery = params.Encode()
t.scrapeInterval = time.Duration(cfg.ScrapeInterval)
t.deadline = time.Duration(cfg.ScrapeTimeout)
t.honorLabels = cfg.HonorLabels
t.metaLabels = metaLabels
@ -361,6 +366,11 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
if sampleAppender.NeedsThrottling() {
targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
t.status.setLastError(errSkippedScrape)
continue
}
t.scrape(sampleAppender)
}
}
@ -377,26 +387,6 @@ func (t *Target) StopScraper() {
log.Debugf("Scraper for target %v stopped.", t)
}
func (t *Target) ingest(s model.Vector) error {
t.RLock()
deadline := t.deadline
t.RUnlock()
// Since the regular case is that ingestedSamples is ready to receive,
// first try without setting a timeout so that we don't need to allocate
// a timer most of the time.
select {
case t.ingestedSamples <- s:
return nil
default:
select {
case t.ingestedSamples <- s:
return nil
case <-time.After(deadline / 10):
return errIngestChannelFull
}
}
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *Target) scrape(appender storage.SampleAppender) (err error) {
@ -414,20 +404,20 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
// so the relabeling rules are applied to the correct label set.
if len(t.metricRelabelConfigs) > 0 {
appender = relabelAppender{
app: appender,
relabelings: t.metricRelabelConfigs,
SampleAppender: appender,
relabelings: t.metricRelabelConfigs,
}
}
if t.honorLabels {
appender = honorLabelsAppender{
app: appender,
labels: baseLabels,
SampleAppender: appender,
labels: baseLabels,
}
} else {
appender = ruleLabelsAppender{
app: appender,
labels: baseLabels,
SampleAppender: appender,
labels: baseLabels,
}
}
@ -460,31 +450,30 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
},
}
t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap)
go func() {
for {
// TODO(fabxc): Change the SampleAppender interface to return an error
// so we can proceed based on the status and don't leak goroutines trying
// to append a single sample after dropping all the other ones.
//
// This will also allow use to reuse this vector and save allocations.
var samples model.Vector
if err = sdec.Decode(&samples); err != nil {
break
}
if err = t.ingest(samples); err != nil {
break
}
var (
samples model.Vector
numOutOfOrder int
logger = log.With("target", t.InstanceIdentifier())
)
for {
if err = sdec.Decode(&samples); err != nil {
break
}
close(t.ingestedSamples)
}()
for samples := range t.ingestedSamples {
for _, s := range samples {
appender.Append(s)
err := appender.Append(s)
if err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
logger.With("sample", s).Warnf("Error inserting sample: %s", err)
}
}
}
}
if numOutOfOrder > 0 {
logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
}
if err == io.EOF {
return nil
@ -495,11 +484,11 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
app storage.SampleAppender
storage.SampleAppender
labels model.LabelSet
}
func (app ruleLabelsAppender) Append(s *model.Sample) {
func (app ruleLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[model.ExportedLabelPrefix+ln] = v
@ -507,47 +496,46 @@ func (app ruleLabelsAppender) Append(s *model.Sample) {
s.Metric[ln] = lv
}
app.app.Append(s)
return app.SampleAppender.Append(s)
}
type honorLabelsAppender struct {
app storage.SampleAppender
storage.SampleAppender
labels model.LabelSet
}
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Append(s *model.Sample) {
func (app honorLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv
}
}
app.app.Append(s)
return app.SampleAppender.Append(s)
}
// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
app storage.SampleAppender
storage.SampleAppender
relabelings []*config.RelabelConfig
}
func (app relabelAppender) Append(s *model.Sample) {
func (app relabelAppender) Append(s *model.Sample) error {
labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
if err != nil {
log.Errorf("Error while relabeling metric %s: %s", s.Metric, err)
return
return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err)
}
// Check if the timeseries was dropped.
if labels == nil {
return
return nil
}
s.Metric = model.Metric(labels)
app.app.Append(s)
return app.SampleAppender.Append(s)
}
// URL returns a copy of the target's URL.

@ -139,12 +139,12 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
}
}
func TestTargetScrapeWithFullChannel(t *testing.T) {
func TestTargetScrapeWithThrottledStorage(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
for i := 0; i < 2*ingestedSamplesCap; i++ {
for i := 0; i < 10; i++ {
w.Write([]byte(
fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
))
@ -155,15 +155,21 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
defer server.Close()
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"})
// Affects full channel but not HTTP fetch
testTarget.deadline = 0
testTarget.scrape(slowAppender{})
go testTarget.RunScraper(&collectResultAppender{throttled: true})
// Enough time for a scrape to happen.
time.Sleep(20 * time.Millisecond)
testTarget.StopScraper()
// Wait for it to take effect.
time.Sleep(20 * time.Millisecond)
if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
if testTarget.status.LastError() != errIngestChannelFull {
t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError())
if testTarget.status.LastError() != errSkippedScrape {
t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError())
}
}
@ -420,8 +426,8 @@ func TestURLParams(t *testing.T) {
target := NewTarget(
&config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: config.Duration(1 * time.Minute),
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(1 * time.Second),
Scheme: serverURL.Scheme,
Params: url.Values{
"foo": []string{"bar", "baz"},
@ -441,7 +447,7 @@ func TestURLParams(t *testing.T) {
func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.LabelSet) *Target {
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(deadline),
ScrapeTimeout: model.Duration(deadline),
}
c, _ := newHTTPClient(cfg)
t := &Target{
@ -450,7 +456,6 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.La
Host: strings.TrimLeft(targetURL, "http://"),
Path: "/metrics",
},
deadline: deadline,
status: &TargetStatus{},
scrapeInterval: 1 * time.Millisecond,
httpClient: c,
@ -481,7 +486,7 @@ func TestNewHTTPBearerToken(t *testing.T) {
defer server.Close()
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeTimeout: model.Duration(1 * time.Second),
BearerToken: "1234",
}
c, err := newHTTPClient(cfg)
@ -509,7 +514,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) {
defer server.Close()
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeTimeout: model.Duration(1 * time.Second),
BearerTokenFile: "testdata/bearertoken.txt",
}
c, err := newHTTPClient(cfg)
@ -536,7 +541,7 @@ func TestNewHTTPBasicAuth(t *testing.T) {
defer server.Close()
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeTimeout: model.Duration(1 * time.Second),
BasicAuth: &config.BasicAuth{
Username: "user",
Password: "password123",
@ -566,7 +571,7 @@ func TestNewHTTPCACert(t *testing.T) {
defer server.Close()
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeTimeout: model.Duration(1 * time.Second),
TLSConfig: config.TLSConfig{
CAFile: "testdata/ca.cer",
},
@ -599,7 +604,7 @@ func TestNewHTTPClientCert(t *testing.T) {
defer server.Close()
cfg := &config.ScrapeConfig{
ScrapeTimeout: config.Duration(1 * time.Second),
ScrapeTimeout: model.Duration(1 * time.Second),
TLSConfig: config.TLSConfig{
CAFile: "testdata/ca.cer",
CertFile: "testdata/client.cer",

@ -165,6 +165,7 @@ func (tm *TargetManager) Run() {
})
tm.running = true
log.Info("Target manager started.")
}
// handleUpdates receives target group updates and handles them in the

@ -75,7 +75,7 @@ func TestPrefixedTargetProvider(t *testing.T) {
func TestTargetManagerChan(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: config.Duration(1 * time.Minute),
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{{
Targets: []model.LabelSet{
{model.AddressLabel: "example.org:80"},
@ -204,7 +204,7 @@ func TestTargetManagerChan(t *testing.T) {
func TestTargetManagerConfigUpdate(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: config.Duration(1 * time.Minute),
ScrapeInterval: model.Duration(1 * time.Minute),
Params: url.Values{
"testParam": []string{"paramValue", "secondValue"},
},
@ -234,7 +234,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}
testJob2 := &config.ScrapeConfig{
JobName: "test_job2",
ScrapeInterval: config.Duration(1 * time.Minute),
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{
{
Targets: []model.LabelSet{
@ -288,7 +288,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
// Test that targets without host:port addresses are dropped.
testJob3 := &config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: config.Duration(1 * time.Minute),
ScrapeInterval: model.Duration(1 * time.Minute),
TargetGroups: []*config.TargetGroup{{
Targets: []model.LabelSet{
{model.AddressLabel: "example.net:80"},

@ -39,7 +39,7 @@ const (
type AlertState int
const (
// StateInactive is the state of an alert that is either firing nor pending.
// StateInactive is the state of an alert that is neither firing nor pending.
StateInactive AlertState = iota
// StatePending is the state of an alert that has been active for less than
// the configured threshold duration.
@ -58,7 +58,7 @@ func (s AlertState) String() string {
case StateFiring:
return "firing"
}
panic(fmt.Errorf("unknown alert state: %v", s))
panic(fmt.Errorf("unknown alert state: %v", s.String()))
}
// Alert is the user-level representation of a single instance of an alerting rule.
@ -159,7 +159,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine) (model.Vector,
fp := smpl.Metric.Fingerprint()
resultFPs[fp] = struct{}{}
if alert, ok := r.active[fp]; ok {
if alert, ok := r.active[fp]; ok && alert.State != StateInactive {
alert.Value = smpl.Value
continue
}
@ -255,7 +255,7 @@ func (rule *AlertingRule) String() string {
s := fmt.Sprintf("ALERT %s", rule.name)
s += fmt.Sprintf("\n\tIF %s", rule.vector)
if rule.holdDuration > 0 {
s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(rule.holdDuration))
s += fmt.Sprintf("\n\tFOR %s", model.Duration(rule.holdDuration))
}
if len(rule.labels) > 0 {
s += fmt.Sprintf("\n\tLABELS %s", rule.labels)
@ -277,7 +277,7 @@ func (rule *AlertingRule) HTMLSnippet(pathPrefix string) template.HTML {
s := fmt.Sprintf("ALERT <a href=%q>%s</a>", pathPrefix+strutil.GraphLinkForExpression(alertMetric.String()), rule.name)
s += fmt.Sprintf("\n IF <a href=%q>%s</a>", pathPrefix+strutil.GraphLinkForExpression(rule.vector.String()), rule.vector)
if rule.holdDuration > 0 {
s += fmt.Sprintf("\n FOR %s", strutil.DurationToString(rule.holdDuration))
s += fmt.Sprintf("\n FOR %s", model.Duration(rule.holdDuration))
}
if len(rule.labels) > 0 {
s += fmt.Sprintf("\n LABELS %s", rule.labels)

@ -66,9 +66,19 @@ var (
iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Name: "evaluator_duration_seconds",
Help: "The duration for all evaluations to execute.",
Help: "The duration of rule group evaluations.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
})
iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_skipped_total",
Help: "The total number of rule group evaluations skipped due to throttled metric storage.",
})
iterationsScheduled = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "evaluator_iterations_total",
Help: "The total number of scheduled rule group evaluations, whether skipped or executed.",
})
)
func init() {
@ -78,6 +88,7 @@ func init() {
evalFailures.WithLabelValues(string(ruleTypeRecording))
prometheus.MustRegister(iterationDuration)
prometheus.MustRegister(iterationsSkipped)
prometheus.MustRegister(evalFailures)
prometheus.MustRegister(evalDuration)
}
@ -133,6 +144,11 @@ func (g *Group) run() {
}
iter := func() {
iterationsScheduled.Inc()
if g.opts.SampleAppender.NeedsThrottling() {
iterationsSkipped.Inc()
return
}
start := time.Now()
g.eval()

@ -27,14 +27,8 @@ import (
func TestAlertingRule(t *testing.T) {
suite, err := promql.NewTest(t, `
load 5m
http_requests{job="api-server", instance="0", group="production"} 0+10x10
http_requests{job="api-server", instance="1", group="production"} 0+20x10
http_requests{job="api-server", instance="0", group="canary"} 0+30x10
http_requests{job="api-server", instance="1", group="canary"} 0+40x10
http_requests{job="app-server", instance="0", group="production"} 0+50x10
http_requests{job="app-server", instance="1", group="production"} 0+60x10
http_requests{job="app-server", instance="0", group="canary"} 0+70x10
http_requests{job="app-server", instance="1", group="canary"} 0+80x10
http_requests{job="app-server", instance="0", group="canary"} 75 85 95 105 105 95 85
http_requests{job="app-server", instance="1", group="canary"} 80 90 100 110 120 130 140
`)
if err != nil {
t.Fatal(err)
@ -79,17 +73,32 @@ func TestAlertingRule(t *testing.T) {
}, {
time: 10 * time.Minute,
result: []string{
`ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
`ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 0 @[%v]`,
},
},
{
time: 15 * time.Minute,
result: []string{
`ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`,
},
},
{
time: 15 * time.Minute,
result: nil,
time: 20 * time.Minute,
result: []string{},
},
{
time: 20 * time.Minute,
result: nil,
time: 25 * time.Minute,
result: []string{
`ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
},
},
{
time: 30 * time.Minute,
result: []string{
`ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`,
`ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`,
},
},
}

@ -33,7 +33,10 @@ type Storage interface {
// processing.) The implementation might remove labels with empty value
// from the provided Sample as those labels are considered equivalent to
// a label not present at all.
Append(*model.Sample)
Append(*model.Sample) error
// NeedsThrottling returns true if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
NeedsThrottling() bool
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader

@ -47,9 +47,9 @@ const (
persintenceUrgencyScoreForLeavingRushedMode = 0.7
// This factor times -storage.local.memory-chunks is the number of
// memory chunks we tolerate before suspending ingestion (TODO!). It is
// also a basis for calculating the persistenceUrgencyScore.
toleranceFactorForMemChunks = 1.1
// memory chunks we tolerate before throttling the storage. It is also a
// basis for calculating the persistenceUrgencyScore.
toleranceFactorMemChunks = 1.1
// This factor times -storage.local.max-chunks-to-persist is the minimum
// required number of chunks waiting for persistence before the number
// of chunks in memory may influence the persistenceUrgencyScore. (In
@ -121,9 +121,10 @@ type syncStrategy func() bool
type memorySeriesStorage struct {
// numChunksToPersist has to be aligned for atomic operations.
numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
rushed bool // Whether the storage is in rushed mode.
numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
rushed bool // Whether the storage is in rushed mode.
throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).
fpLocker *fingerprintLocker
fpToSeries *seriesMap
@ -180,6 +181,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}),
throttled: make(chan struct{}, 1),
maxMemoryChunks: o.MemoryChunks,
dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
@ -306,6 +308,7 @@ func (s *memorySeriesStorage) Start() (err error) {
}
go s.handleEvictList()
go s.logThrottling()
go s.loop()
return nil
@ -564,23 +567,15 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin
}
}
var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order")
// Append implements Storage.
func (s *memorySeriesStorage) Append(sample *model.Sample) {
func (s *memorySeriesStorage) Append(sample *model.Sample) error {
for ln, lv := range sample.Metric {
if len(lv) == 0 {
delete(sample.Metric, ln)
}
}
if s.getNumChunksToPersist() >= s.maxChunksToPersist {
log.Warnf(
"%d chunks waiting for persistence, sample ingestion suspended.",
s.getNumChunksToPersist(),
)
for s.getNumChunksToPersist() >= s.maxChunksToPersist {
time.Sleep(time.Second)
}
log.Warn("Sample ingestion resumed.")
}
rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
@ -596,16 +591,16 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
series := s.getOrCreateSeries(fp, sample.Metric)
if sample.Timestamp <= series.lastTime {
s.fpLocker.Unlock(fp)
// Don't log and track equal timestamps, as they are a common occurrence
// when using client-side timestamps (e.g. Pushgateway or federation).
// It would be even better to also compare the sample values here, but
// we don't have efficient access to a series's last value.
if sample.Timestamp != series.lastTime {
log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime)
s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample
}
s.fpLocker.Unlock(fp)
return
return nil
}
completedChunksCount := series.add(&model.SamplePair{
Value: sample.Value,
@ -614,6 +609,59 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) {
s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc()
s.incNumChunksToPersist(completedChunksCount)
return nil
}
// NeedsThrottling implements Storage.
func (s *memorySeriesStorage) NeedsThrottling() bool {
if s.getNumChunksToPersist() > s.maxChunksToPersist ||
float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
select {
case s.throttled <- struct{}{}:
default: // Do nothing, signal already pending.
}
return true
}
return false
}
// logThrottling handles logging of throttled events and has to be started as a
// goroutine. It stops once s.loopStopping is closed.
//
// Logging strategy: Whenever NeedsThrottling() is called and returns true, a
// signal is sent to s.throttled. If that happens for the first time, an Error is
// logged that the storage is now throttled. As long as signals continue to be
// sent via s.throttled at least once per minute, nothing else is logged. Once
// no signal has arrived for a minute, an Info is logged that the storage is not
// throttled anymore. This resets things to the initial state, i.e. once a
// signal arrives again, the Error will be logged again.
func (s *memorySeriesStorage) logThrottling() {
timer := time.NewTimer(time.Minute)
timer.Stop()
for {
select {
case <-s.throttled:
if !timer.Reset(time.Minute) {
log.
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
}
case <-timer.C:
log.
With("chunksToPersist", s.getNumChunksToPersist()).
With("maxChunksToPersist", s.maxChunksToPersist).
With("memoryChunks", atomic.LoadInt64(&numMemChunks)).
With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
Info("Storage does not need throttling anymore.")
case <-s.loopStopping:
return
}
}
}
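
logThrottling leans on the return value of timer.Reset to decide whether the Error has already been logged recently: Reset reports whether the timer was still running. A tiny standalone illustration of that idiom (not Prometheus code), using a 50ms window instead of a minute:

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(time.Minute)
	timer.Stop() // start in the "not throttled" state

	fmt.Println(timer.Reset(50 * time.Millisecond)) // false: first signal, would log the Error
	fmt.Println(timer.Reset(50 * time.Millisecond)) // true: timer still active, stay quiet

	<-timer.C // no signal arrived in time: here the storage would log the Info
	fmt.Println(timer.Reset(50 * time.Millisecond)) // false again: the next signal logs the Error anew
}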
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries {
@ -1210,7 +1258,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist {
score = math.Max(
score,
(memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1),
(memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1),
)
}
if score > 1 {
@ -1230,11 +1278,11 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
s.rushedMode.Set(0)
log.
With("urgencyScore", score).
With("chunksToPersist", chunksToPersist).
With("maxChunksToPersist", maxChunksToPersist).
With("memoryChunks", memChunks).
With("maxMemoryChunks", maxMemChunks).
Warn("Storage has left rushed mode.")
With("chunksToPersist", int(chunksToPersist)).
With("maxChunksToPersist", int(maxChunksToPersist)).
With("memoryChunks", int(memChunks)).
With("maxMemoryChunks", int(maxMemChunks)).
Info("Storage has left rushed mode.")
return score
}
if score > persintenceUrgencyScoreForEnteringRushedMode {
@ -1243,10 +1291,10 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
s.rushedMode.Set(1)
log.
With("urgencyScore", score).
With("chunksToPersist", chunksToPersist).
With("maxChunksToPersist", maxChunksToPersist).
With("memoryChunks", memChunks).
With("maxMemoryChunks", maxMemChunks).
With("chunksToPersist", int(chunksToPersist)).
With("maxChunksToPersist", int(maxChunksToPersist)).
With("memoryChunks", int(memChunks)).
With("maxMemoryChunks", int(maxMemChunks)).
Warn("Storage has entered rushed mode.")
return 1
}

View file

@ -132,15 +132,16 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
}
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full. It implements
// storage.SampleAppender.
func (t *StorageQueueManager) Append(s *model.Sample) {
// sample on the floor if the queue is full.
// Always returns nil.
func (t *StorageQueueManager) Append(s *model.Sample) error {
select {
case t.queue <- s:
default:
t.samplesCount.WithLabelValues(dropped).Inc()
log.Warn("Remote storage queue full, discarding sample.")
}
return nil
}
// Stop stops sending samples to the remote storage and waits for pending

@ -104,8 +104,8 @@ func (s *Storage) Stop() {
}
}
// Append implements storage.SampleAppender.
func (s *Storage) Append(smpl *model.Sample) {
// Append implements storage.SampleAppender. Always returns nil.
func (s *Storage) Append(smpl *model.Sample) error {
s.mtx.RLock()
var snew model.Sample
@ -122,6 +122,14 @@ func (s *Storage) Append(smpl *model.Sample) {
for _, q := range s.queues {
q.Append(&snew)
}
return nil
}
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (s *Storage) NeedsThrottling() bool {
return false
}
// Describe implements prometheus.Collector.

@ -18,9 +18,31 @@ import (
)
// SampleAppender is the interface to append samples to both, local and remote
// storage.
// storage. All methods are goroutine-safe.
type SampleAppender interface {
Append(*model.Sample)
// Append appends a sample to the underlying storage. Depending on the
// storage implementation, there are different guarantees for the fate
// of the sample after Append has returned. Remote storage
// implementations will simply drop samples if they cannot keep up with
// sending samples. Local storage implementations will only drop metrics
// upon unrecoverable errors.
Append(*model.Sample) error
// NeedsThrottling returns true if the underlying storage wishes to not
// receive any more samples. Append will still work but might lead to
// undue resource usage. It is recommended to call NeedsThrottling once
// before an upcoming batch of Append calls (e.g. a full scrape of a
// target or the evaluation of a rule group) and only proceed with the
// batch if NeedsThrottling returns false. In that way, the result of a
// scrape or of an evaluation of a rule group will always be appended
// completely or not at all, and the work of scraping or evaluation will
// not be performed in vain. Also, a call of NeedsThrottling is
// potentially expensive, so limiting the number of calls is reasonable.
//
// Only SampleAppenders for which it is considered critical to receive
// each and every sample should ever return true. SampleAppenders that
// tolerate not receiving all samples should always return false and
// instead drop samples as they see fit to avoid overload.
NeedsThrottling() bool
}
// Fanout is a SampleAppender that appends every sample to each SampleAppender
@ -30,8 +52,25 @@ type Fanout []SampleAppender
// Append implements SampleAppender. It appends the provided sample to all
// SampleAppenders in the Fanout slice and waits for each append to complete
// before proceeding with the next.
func (f Fanout) Append(s *model.Sample) {
// If any of the SampleAppenders returns an error, the first one is returned
// at the end.
func (f Fanout) Append(s *model.Sample) error {
var err error
for _, a := range f {
a.Append(s)
if e := a.Append(s); e != nil && err == nil {
err = e
}
}
return err
}
// NeedsThrottling returns true if at least one of the SampleAppenders in the
// Fanout slice is throttled.
func (f Fanout) NeedsThrottling() bool {
for _, a := range f {
if a.NeedsThrottling() {
return true
}
}
return false
}
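
The interface comment above recommends asking NeedsThrottling once per batch (a scrape, a rule-group evaluation) rather than per sample. A short sketch of that pattern with Fanout, using hypothetical okAppender/throttledAppender implementations and assuming this commit's storage package is importable as github.com/prometheus/prometheus/storage:

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage"
)

// throttledAppender is a hypothetical SampleAppender that is currently asking
// callers to back off, as described in the interface comment above.
type throttledAppender struct{}

func (throttledAppender) Append(*model.Sample) error { return errors.New("dropped") }
func (throttledAppender) NeedsThrottling() bool      { return true }

type okAppender struct{}

func (okAppender) Append(*model.Sample) error { return nil }
func (okAppender) NeedsThrottling() bool      { return false }

func main() {
	app := storage.Fanout{okAppender{}, throttledAppender{}}

	batch := model.Samples{{Metric: model.Metric{"__name__": "up"}, Value: 1}}

	// Check once per batch and skip it entirely if any fanned-out appender
	// needs throttling, so a scrape or rule evaluation is appended
	// completely or not at all.
	if app.NeedsThrottling() {
		fmt.Println("skipping batch: storage needs throttling")
		return
	}
	for _, s := range batch {
		if err := app.Append(s); err != nil {
			fmt.Println("append error:", err)
		}
	}
}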

@ -17,75 +17,13 @@ import (
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"time"
)
var (
durationRE = regexp.MustCompile("^([0-9]+)([ywdhms]+)$")
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
// DurationToString formats a time.Duration as a string with the assumption that
// a year always has 365 days and a day always has 24h. (The former doesn't work
// in leap years, the latter is broken by DST switches, not to speak about leap
// seconds, but those are not even treated properly by the duration strings in
// the standard library.)
func DurationToString(duration time.Duration) string {
seconds := int64(duration / time.Second)
factors := map[string]int64{
"y": 60 * 60 * 24 * 365,
"d": 60 * 60 * 24,
"h": 60 * 60,
"m": 60,
"s": 1,
}
unit := "s"
switch int64(0) {
case seconds % factors["y"]:
unit = "y"
case seconds % factors["d"]:
unit = "d"
case seconds % factors["h"]:
unit = "h"
case seconds % factors["m"]:
unit = "m"
}
return fmt.Sprintf("%v%v", seconds/factors[unit], unit)
}
// StringToDuration parses a string into a time.Duration, assuming that a year
// always has 365d, a week 7d, a day 24h. See DurationToString for problems with
// that.
func StringToDuration(durationStr string) (duration time.Duration, err error) {
matches := durationRE.FindStringSubmatch(durationStr)
if len(matches) != 3 {
err = fmt.Errorf("not a valid duration string: %q", durationStr)
return
}
durationSeconds, _ := strconv.Atoi(matches[1])
duration = time.Duration(durationSeconds) * time.Second
unit := matches[2]
switch unit {
case "y":
duration *= 60 * 60 * 24 * 365
case "w":
duration *= 60 * 60 * 24 * 7
case "d":
duration *= 60 * 60 * 24
case "h":
duration *= 60 * 60
case "m":
duration *= 60
case "s":
duration *= 1
default:
return 0, fmt.Errorf("invalid time unit in duration string: %q", unit)
}
return
}
// TableLinkForExpression creates an escaped relative link to the table view of
// the provided expression.
func TableLinkForExpression(expr string) string {

@ -163,10 +163,10 @@ func (t *Time) UnmarshalJSON(b []byte) error {
// This type should not propagate beyond the scope of input/output processing.
type Duration time.Duration
var durationRE = regexp.MustCompile("^([0-9]+)(d|h|m|s|ms)$")
var durationRE = regexp.MustCompile("^([0-9]+)(y|w|d|h|m|s|ms)$")
// ParseDuration parses a string into a time.Duration, assuming that a year
// a day always has 24h.
// always has 365d, a week always has 7d, and a day always has 24h.
func ParseDuration(durationStr string) (Duration, error) {
matches := durationRE.FindStringSubmatch(durationStr)
if len(matches) != 3 {
@ -177,6 +177,10 @@ func ParseDuration(durationStr string) (Duration, error) {
dur = time.Duration(n) * time.Millisecond
)
switch unit := matches[2]; unit {
case "y":
dur *= 1000 * 60 * 60 * 24 * 365
case "w":
dur *= 1000 * 60 * 60 * 24 * 7
case "d":
dur *= 1000 * 60 * 60 * 24
case "h":
@ -199,6 +203,8 @@ func (d Duration) String() string {
unit = "ms"
)
factors := map[string]int64{
"y": 1000 * 60 * 60 * 24 * 365,
"w": 1000 * 60 * 60 * 24 * 7,
"d": 1000 * 60 * 60 * 24,
"h": 1000 * 60 * 60,
"m": 1000 * 60,
@ -207,6 +213,10 @@ func (d Duration) String() string {
}
switch int64(0) {
case ms % factors["y"]:
unit = "y"
case ms % factors["w"]:
unit = "w"
case ms % factors["d"]:
unit = "d"
case ms % factors["h"]:

vendor/vendor.json

@ -174,8 +174,8 @@
},
{
"path": "github.com/prometheus/common/model",
"revision": "b0d797186bfbaf6d785031c6c2d32f75c720007d",
"revisionTime": "2016-01-22T12:15:42+01:00"
"revision": "0e53cc19aa67dd2e8587a26e28643cb152f5403d",
"revisionTime": "2016-01-29T15:16:16+01:00"
},
{
"path": "github.com/prometheus/common/route",

@ -18,7 +18,6 @@ import (
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/httputil"
"github.com/prometheus/prometheus/util/strutil"
)
type status string
@ -324,8 +323,8 @@ func parseDuration(s string) (time.Duration, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
return time.Duration(d * float64(time.Second)), nil
}
if d, err := strutil.StringToDuration(s); err == nil {
return d, nil
if d, err := model.ParseDuration(s); err == nil {
return time.Duration(d), nil
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}

@ -41,7 +41,7 @@
<li><a href="{{ pathPrefix }}/graph">Graph</a></li>
<li><a href="{{ pathPrefix }}/status">Status</a></li>
<li>
<a href="http://prometheus.io" target="_blank">Help</a>
<a href="https://prometheus.io" target="_blank">Help</a>
</li>
</ul>
</div>