Merge pull request #596 from prometheus/beorn7/ingestion-tweaks

Remove the ingestion channel.
Björn Rabenstein 2015-03-19 17:56:55 +01:00
commit 942686427d
31 changed files with 645 additions and 581 deletions

main.go (136 changed lines)

@@ -24,13 +24,13 @@ import (
 	"github.com/golang/glog"

-	clientmodel "github.com/prometheus/client_golang/model"
 	registry "github.com/prometheus/client_golang/prometheus"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notification"
 	"github.com/prometheus/prometheus/retrieval"
 	"github.com/prometheus/prometheus/rules/manager"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/local"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/storage/remote/opentsdb"
@@ -52,38 +52,22 @@ var (
 	remoteTSDBUrl     = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")

-	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
-
 	numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

 	persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
-	persistenceQueueCapacity   = flag.Int("storage.local.persistence-queue-capacity", 1024*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")
+	maxChunksToPersist         = flag.Int("storage.local.max-chunks-to-persist", 1024*1024, "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")

 	checkpointInterval         = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.")
 	checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")

+	seriesSyncStrategy = flag.String("storage.local.series-sync-strategy", "adaptive", "When to sync series files after modification. Possible values: 'never', 'always', 'adaptive'. Sync'ing slows down storage performance but reduces the risk of data loss in case of an OS crash. With the 'adaptive' strategy, series files are sync'd for as long as the storage is not too much behind on chunk persistence.")
+
 	storageDirty          = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
+	storagePedanticChecks = flag.Bool("storage.local.pedantic-checks", false, "If set, a crash recovery will perform checks on each series file. This might take a very long time.")

 	printVersion = flag.Bool("version", false, "Print version information.")
 )

-// Instrumentation.
-var (
-	samplesQueueCapDesc = registry.NewDesc(
-		"prometheus_samples_queue_capacity",
-		"Capacity of the queue for unwritten samples.",
-		nil, nil,
-	)
-	samplesQueueLenDesc = registry.NewDesc(
-		"prometheus_samples_queue_length",
-		"Current number of items in the queue for unwritten samples. Each item comprises all samples exposed by one target as one metric family (i.e. metrics of the same name).",
-		nil, nil,
-	)
-)
-
 type prometheus struct {
-	incomingSamples chan clientmodel.Samples
-
 	ruleManager         manager.RuleManager
 	targetManager       retrieval.TargetManager
 	notificationHandler *notification.NotificationHandler
@@ -103,34 +87,55 @@ func NewPrometheus() *prometheus {
 		glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
 	}

-	incomingSamples := make(chan clientmodel.Samples, *samplesQueueCapacity)
-
-	ingester := &retrieval.MergeLabelsIngester{
-		Labels:          conf.GlobalLabels(),
-		CollisionPrefix: clientmodel.ExporterLabelPrefix,
-		Ingester:        retrieval.ChannelIngester(incomingSamples),
-	}
-	targetManager := retrieval.NewTargetManager(ingester)
-	targetManager.AddTargetsFromConfig(conf)
-
 	notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)

+	var syncStrategy local.SyncStrategy
+	switch *seriesSyncStrategy {
+	case "never":
+		syncStrategy = local.Never
+	case "always":
+		syncStrategy = local.Always
+	case "adaptive":
+		syncStrategy = local.Adaptive
+	default:
+		glog.Fatalf("Invalid flag value for 'storage.local.series-sync-strategy': %s", *seriesSyncStrategy)
+	}
+
 	o := &local.MemorySeriesStorageOptions{
 		MemoryChunks:               *numMemoryChunks,
+		MaxChunksToPersist:         *maxChunksToPersist,
 		PersistenceStoragePath:     *persistenceStoragePath,
 		PersistenceRetentionPeriod: *persistenceRetentionPeriod,
-		PersistenceQueueCapacity:   *persistenceQueueCapacity,
 		CheckpointInterval:         *checkpointInterval,
 		CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
 		Dirty:          *storageDirty,
+		PedanticChecks: *storagePedanticChecks,
+		SyncStrategy:   syncStrategy,
 	}
 	memStorage, err := local.NewMemorySeriesStorage(o)
 	if err != nil {
 		glog.Fatal("Error opening memory series storage: ", err)
 	}

+	var sampleAppender storage.SampleAppender
+	var remoteTSDBQueue *remote.TSDBQueueManager
+	if *remoteTSDBUrl == "" {
+		glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
+		sampleAppender = memStorage
+	} else {
+		openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
+		remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 100*1024)
+		sampleAppender = storage.Tee{
+			Appender1: remoteTSDBQueue,
+			Appender2: memStorage,
+		}
+	}
+
+	targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels())
+	targetManager.AddTargetsFromConfig(conf)
+
 	ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
-		Results:             incomingSamples,
+		SampleAppender:      sampleAppender,
 		NotificationHandler: notificationHandler,
 		EvaluationInterval:  conf.EvaluationInterval(),
 		Storage:             memStorage,
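
The fan-out above works because storage.SampleAppender is a single-method interface, so a tee that forwards every sample to two downstream appenders is only a few lines. A self-contained sketch of the pattern (types renamed and simplified, not the actual Prometheus sources):

```go
package main

import "fmt"

// Sample stands in for clientmodel.Sample.
type Sample struct {
	Name  string
	Value float64
}

// SampleAppender mirrors the single-method interface this PR introduces.
type SampleAppender interface {
	Append(*Sample)
}

// Tee forwards every sample to two appenders, like storage.Tee above.
type Tee struct {
	Appender1, Appender2 SampleAppender
}

// Append hands the sample to both downstream appenders, in order.
func (t Tee) Append(s *Sample) {
	t.Appender1.Append(s)
	t.Appender2.Append(s)
}

// printAppender is a toy sink that just prints what it receives.
type printAppender string

func (p printAppender) Append(s *Sample) {
	fmt.Printf("%s received %s=%g\n", string(p), s.Name, s.Value)
}

func main() {
	var app SampleAppender = Tee{printAppender("remote"), printAppender("local")}
	app.Append(&Sample{Name: "up", Value: 1}) // both sinks see the sample
}
```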
@@ -140,14 +145,6 @@ func NewPrometheus() *prometheus {
 		glog.Fatal("Error loading rule files: ", err)
 	}

-	var remoteTSDBQueue *remote.TSDBQueueManager
-	if *remoteTSDBUrl == "" {
-		glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
-	} else {
-		openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
-		remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
-	}
-
 	flags := map[string]string{}
 	flag.VisitAll(func(f *flag.Flag) {
 		flags[f.Name] = f.Value.String()
@@ -183,8 +180,6 @@ func NewPrometheus() *prometheus {
 	}

 	p := &prometheus{
-		incomingSamples: incomingSamples,
-
 		ruleManager:         ruleManager,
 		targetManager:       targetManager,
 		notificationHandler: notificationHandler,
@@ -193,7 +188,7 @@ func NewPrometheus() *prometheus {
 		webService:          webService,
 	}

-	webService.QuitDelegate = p.Close
+	webService.QuitChan = make(chan struct{})
 	return p
 }
@@ -206,7 +201,6 @@ func (p *prometheus) Serve() {
 	}
 	go p.ruleManager.Run()
 	go p.notificationHandler.Run()
-	go p.interruptHandler()

 	p.storage.Start()
@@ -217,15 +211,18 @@ func (p *prometheus) Serve() {
 		}
 	}()

-	for samples := range p.incomingSamples {
-		p.storage.AppendSamples(samples)
-		if p.remoteTSDBQueue != nil {
-			p.remoteTSDBQueue.Queue(samples)
-		}
+	notifier := make(chan os.Signal)
+	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
+	select {
+	case <-notifier:
+		glog.Warning("Received SIGTERM, exiting gracefully...")
+	case <-p.webService.QuitChan:
+		glog.Warning("Received termination request via web service, exiting gracefully...")
 	}

-	// The following shut-down operations have to happen after
-	// incomingSamples is drained. So do not move them into close().
+	p.targetManager.Stop()
+	p.ruleManager.Stop()
+
 	if err := p.storage.Stop(); err != nil {
 		glog.Error("Error stopping local storage: ", err)
 	}
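
The rewritten Serve blocks on a single select until either an OS signal or the web service's quit channel fires, then stops the subsystems in dependency order (scraping before rules before storage). A minimal sketch of that shutdown shape (illustrative, not the Prometheus code):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	quit := make(chan struct{}) // stands in for webService.QuitChan

	notifier := make(chan os.Signal, 1)
	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)

	// Simulate a termination request arriving via the web service.
	go func() { close(quit) }()

	// Block until either source requests shutdown.
	select {
	case <-notifier:
		fmt.Println("signal received, shutting down")
	case <-quit:
		fmt.Println("quit requested via web service, shutting down")
	}

	// Stop producers before consumers: scraping, then rules, then storage.
	fmt.Println("stopping target manager, rule manager, storage...")
}
```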
@@ -238,35 +235,8 @@ func (p *prometheus) Serve() {
 	glog.Info("See you next time!")
 }

-// Close cleanly shuts down the Prometheus server.
-func (p *prometheus) Close() {
-	p.closeOnce.Do(p.close)
-}
-
-func (p *prometheus) interruptHandler() {
-	notifier := make(chan os.Signal)
-	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
-
-	<-notifier
-	glog.Warning("Received SIGTERM, exiting gracefully...")
-	p.Close()
-}
-
-func (p *prometheus) close() {
-	glog.Info("Shutdown has been requested; subsytems are closing:")
-	p.targetManager.Stop()
-	p.ruleManager.Stop()
-	close(p.incomingSamples)
-	// Note: Before closing the remaining subsystems (storage, ...), we have
-	// to wait until p.incomingSamples is actually drained. Therefore,
-	// remaining shut-downs happen in Serve().
-}
-
 // Describe implements registry.Collector.
 func (p *prometheus) Describe(ch chan<- *registry.Desc) {
-	ch <- samplesQueueCapDesc
-	ch <- samplesQueueLenDesc
 	p.notificationHandler.Describe(ch)
 	p.storage.Describe(ch)
 	if p.remoteTSDBQueue != nil {
@@ -276,16 +246,6 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) {

 // Collect implements registry.Collector.
 func (p *prometheus) Collect(ch chan<- registry.Metric) {
-	ch <- registry.MustNewConstMetric(
-		samplesQueueCapDesc,
-		registry.GaugeValue,
-		float64(cap(p.incomingSamples)),
-	)
-	ch <- registry.MustNewConstMetric(
-		samplesQueueLenDesc,
-		registry.GaugeValue,
-		float64(len(p.incomingSamples)),
-	)
 	p.notificationHandler.Collect(ch)
 	p.storage.Collect(ch)
 	if p.remoteTSDBQueue != nil {


@@ -14,11 +14,27 @@
 package retrieval

 import (
+	"time"
+
 	clientmodel "github.com/prometheus/client_golang/model"
 )

-type nopIngester struct{}
+type nopAppender struct{}

-func (i nopIngester) Ingest(clientmodel.Samples) error {
-	return nil
+func (a nopAppender) Append(*clientmodel.Sample) {
+}
+
+type slowAppender struct{}
+
+func (a slowAppender) Append(*clientmodel.Sample) {
+	time.Sleep(time.Millisecond)
+	return
+}
+
+type collectResultAppender struct {
+	result clientmodel.Samples
+}
+
+func (a *collectResultAppender) Append(s *clientmodel.Sample) {
+	a.result = append(a.result, s)
 }


@@ -1,71 +0,0 @@
-// Copyright 2013 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package retrieval
-
-import (
-	"errors"
-	"time"
-
-	"github.com/prometheus/client_golang/extraction"
-
-	clientmodel "github.com/prometheus/client_golang/model"
-)
-
-const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout.
-
-var errIngestChannelFull = errors.New("ingestion channel full")
-
-// MergeLabelsIngester merges a labelset ontop of a given extraction result and
-// passes the result on to another ingester. Label collisions are avoided by
-// appending a label prefix to any newly merged colliding labels.
-type MergeLabelsIngester struct {
-	Labels          clientmodel.LabelSet
-	CollisionPrefix clientmodel.LabelName
-
-	Ingester extraction.Ingester
-}
-
-// Ingest ingests the provided extraction result by merging in i.Labels and then
-// handing it over to i.Ingester.
-func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {
-	for _, s := range samples {
-		s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix)
-	}
-
-	return i.Ingester.Ingest(samples)
-}
-
-// ChannelIngester feeds results into a channel without modifying them.
-type ChannelIngester chan<- clientmodel.Samples
-
-// Ingest ingests the provided extraction result by sending it to its channel.
-// If the channel was not able to receive the samples within the ingestTimeout,
-// an error is returned. This is important to fail fast and to not pile up
-// ingestion requests in case of overload.
-func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
-	// Since the regular case is that i is ready to receive, first try
-	// without setting a timeout so that we don't need to allocate a timer
-	// most of the time.
-	select {
-	case i <- s:
-		return nil
-	default:
-		select {
-		case i <- s:
-			return nil
-		case <-time.After(ingestTimeout):
-			return errIngestChannelFull
-		}
-	}
-}


@@ -14,6 +14,7 @@
 package retrieval

 import (
+	"errors"
 	"fmt"
 	"math/rand"
 	"net/http"
@@ -29,6 +30,7 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"

+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/utility"
 )
@@ -41,6 +43,8 @@ const (
 	// ScrapeTimeMetricName is the metric name for the synthetic scrape duration
 	// variable.
 	scrapeDurationMetricName clientmodel.LabelValue = "scrape_duration_seconds"
+	// Capacity of the channel to buffer samples during ingestion.
+	ingestedSamplesCap = 256

 	// Constants for instrumentation.
 	namespace = "prometheus"
@@ -48,6 +52,8 @@ const (
 )

 var (
+	errIngestChannelFull = errors.New("ingestion channel full")
+
 	localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"}

 	targetIntervalLength = prometheus.NewSummaryVec(
@@ -100,6 +106,8 @@ const (
 // For the future, the Target protocol will abstract away the exact means that
 // metrics are retrieved and deserialized from the given instance to which it
 // refers.
+//
+// Target implements extraction.Ingester.
 type Target interface {
 	// Return the last encountered scrape error, if any.
 	LastError() error
@@ -118,6 +126,9 @@ type Target interface {
 	GlobalURL() string
 	// Return the target's base labels.
 	BaseLabels() clientmodel.LabelSet
+	// Return the target's base labels without job and instance label. That's
+	// useful for display purposes.
+	BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet
 	// SetBaseLabelsFrom queues a replacement of the current base labels by
 	// the labels of the given target. The method returns immediately after
 	// queuing. The actual replacement of the base labels happens
@@ -125,9 +136,11 @@ type Target interface {
 	// begins).
 	SetBaseLabelsFrom(Target)
 	// Scrape target at the specified interval.
-	RunScraper(extraction.Ingester, time.Duration)
+	RunScraper(storage.SampleAppender, time.Duration)
 	// Stop scraping, synchronous.
 	StopScraper()
+	// Ingest implements extraction.Ingester.
+	Ingest(clientmodel.Samples) error
 }

 // target is a Target that refers to a singular HTTP or HTTPS endpoint.
@@ -144,10 +157,12 @@ type target struct {
 	scraperStopped chan struct{}
 	// Channel to queue base labels to be replaced.
 	newBaseLabels chan clientmodel.LabelSet
+	// Channel to buffer ingested samples.
+	ingestedSamples chan clientmodel.Samples

 	url string
 	// What is the deadline for the HTTP or HTTPS against this endpoint.
-	Deadline time.Duration
+	deadline time.Duration
 	// Any base labels that are added to this target and its metrics.
 	baseLabels clientmodel.LabelSet
 	// The HTTP client used to scrape the target's endpoint.
@@ -163,52 +178,41 @@ type target struct {

 // NewTarget creates a reasonably configured target for querying.
 func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
-	target := &target{
+	t := &target{
 		url:             url,
-		Deadline:        deadline,
-		baseLabels:      baseLabels,
+		deadline:        deadline,
 		httpClient:      utility.NewDeadlineClient(deadline),
 		scraperStopping: make(chan struct{}),
 		scraperStopped:  make(chan struct{}),
 		newBaseLabels:   make(chan clientmodel.LabelSet, 1),
 	}
+	t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}

-	return target
+	for baseLabel, baseValue := range baseLabels {
+		t.baseLabels[baseLabel] = baseValue
+	}
+	return t
 }

-func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
-	healthMetric := clientmodel.Metric{}
-	durationMetric := clientmodel.Metric{}
-	for label, value := range t.baseLabels {
-		healthMetric[label] = value
-		durationMetric[label] = value
-	}
-
-	healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
-	durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
-	healthMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier())
-	durationMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier())
-
-	healthValue := clientmodel.SampleValue(0)
-	if healthy {
-		healthValue = clientmodel.SampleValue(1)
-	}
-
-	healthSample := &clientmodel.Sample{
-		Metric:    healthMetric,
-		Timestamp: timestamp,
-		Value:     healthValue,
-	}
-	durationSample := &clientmodel.Sample{
-		Metric:    durationMetric,
-		Timestamp: timestamp,
-		Value:     clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)),
-	}
-
-	ingester.Ingest(clientmodel.Samples{healthSample, durationSample})
+// Ingest implements Target and extraction.Ingester.
+func (t *target) Ingest(s clientmodel.Samples) error {
+	// Since the regular case is that ingestedSamples is ready to receive,
+	// first try without setting a timeout so that we don't need to allocate
+	// a timer most of the time.
+	select {
+	case t.ingestedSamples <- s:
+		return nil
+	default:
+		select {
+		case t.ingestedSamples <- s:
+			return nil
+		case <-time.After(t.deadline / 10):
+			return errIngestChannelFull
+		}
+	}
 }

 // RunScraper implements Target.
-func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
+func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
 	defer func() {
 		// Need to drain t.newBaseLabels to not make senders block during shutdown.
 		for {
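
Ingest above uses a well-known Go idiom: attempt a non-blocking send first, and only arm a timer when the buffer is already full, so the fast path allocates nothing. A standalone sketch of the idiom (names are illustrative, not from the Prometheus code):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errQueueFull = errors.New("queue full")

// trySend first attempts a non-blocking send; only if the buffer is full
// does it arm a timer and wait up to timeout before giving up.
func trySend(ch chan<- int, v int, timeout time.Duration) error {
	select {
	case ch <- v: // fast path, no timer allocated
		return nil
	default:
		select {
		case ch <- v:
			return nil
		case <-time.After(timeout):
			return errQueueFull
		}
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1, 10*time.Millisecond)) // <nil>
	fmt.Println(trySend(ch, 2, 10*time.Millisecond)) // queue full
}
```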
@@ -237,7 +241,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
 			t.Lock() // Writing t.lastScrape requires the lock.
 			t.lastScrape = time.Now()
 			t.Unlock()
-			t.scrape(ingester)
+			t.scrape(sampleAppender)

 			// Explanation of the contraption below:
 			//
@@ -271,7 +275,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
 				targetIntervalLength.WithLabelValues(interval.String()).Observe(
 					float64(took) / float64(time.Second), // Sub-second precision.
 				)
-				t.scrape(ingester)
+				t.scrape(sampleAppender)
 			}
 		}
 	}
@@ -285,7 +289,7 @@ func (t *target) StopScraper() {
 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

-func (t *target) scrape(ingester extraction.Ingester) (err error) {
+func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
 	timestamp := clientmodel.Now()
 	defer func(start time.Time) {
 		t.Lock() // Writing t.state and t.lastError requires the lock.
@@ -296,7 +300,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
 		}
 		t.lastError = err
 		t.Unlock()
-		t.recordScrapeHealth(ingester, timestamp, err == nil, time.Since(start))
+		t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start))
 	}(time.Now())

 	req, err := http.NewRequest("GET", t.URL(), nil)
@@ -319,21 +323,23 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
 		return err
 	}

-	baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}
-	for baseLabel, baseValue := range t.baseLabels {
-		baseLabels[baseLabel] = baseValue
-	}
-
-	i := &MergeLabelsIngester{
-		Labels:          baseLabels,
-		CollisionPrefix: clientmodel.ExporterLabelPrefix,
-
-		Ingester: ingester,
-	}
+	t.ingestedSamples = make(chan clientmodel.Samples, ingestedSamplesCap)

 	processOptions := &extraction.ProcessOptions{
 		Timestamp: timestamp,
 	}
-	return processor.ProcessSingle(resp.Body, i, processOptions)
+	go func() {
+		err = processor.ProcessSingle(resp.Body, t, processOptions)
+		close(t.ingestedSamples)
+	}()
+
+	for samples := range t.ingestedSamples {
+		for _, s := range samples {
+			s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix)
+			sampleAppender.Append(s)
+		}
+	}
+	return err
 }

 // LastError implements Target.
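
The new scrape body is a bounded producer/consumer pair: a decoder goroutine produces sample batches into the buffered ingestedSamples channel and closes it when the response body is fully processed, while the scraping goroutine drains the channel with range, which terminates exactly when the channel is closed and empty. A minimal sketch of that shape (illustrative names):

```go
package main

import "fmt"

func main() {
	batches := make(chan []int, 4) // bounded buffer, like ingestedSamples

	// Producer: emits batches, then closes the channel to signal completion.
	go func() {
		for i := 0; i < 3; i++ {
			batches <- []int{i, i * 10}
		}
		close(batches)
	}()

	// Consumer: range ends once the channel is closed and drained.
	for batch := range batches {
		for _, v := range batch {
			fmt.Println(v)
		}
	}
}
```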
@@ -405,6 +411,17 @@ func (t *target) BaseLabels() clientmodel.LabelSet {
 	return t.baseLabels
 }

+// BaseLabelsWithoutJobAndInstance implements Target.
+func (t *target) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet {
+	ls := clientmodel.LabelSet{}
+	for ln, lv := range t.BaseLabels() {
+		if ln != clientmodel.JobLabel && ln != InstanceLabel {
+			ls[ln] = lv
+		}
+	}
+	return ls
+}
+
 // SetBaseLabelsFrom implements Target.
 func (t *target) SetBaseLabelsFrom(newTarget Target) {
 	if t.URL() != newTarget.URL() {
@@ -412,3 +429,33 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) {
 	}
 	t.newBaseLabels <- newTarget.BaseLabels()
 }
+
+func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
+	healthMetric := clientmodel.Metric{}
+	durationMetric := clientmodel.Metric{}
+	for label, value := range t.baseLabels {
+		healthMetric[label] = value
+		durationMetric[label] = value
+	}
+	healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
+	durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
+
+	healthValue := clientmodel.SampleValue(0)
+	if healthy {
+		healthValue = clientmodel.SampleValue(1)
+	}
+
+	healthSample := &clientmodel.Sample{
+		Metric:    healthMetric,
+		Timestamp: timestamp,
+		Value:     healthValue,
+	}
+	durationSample := &clientmodel.Sample{
+		Metric:    durationMetric,
+		Timestamp: timestamp,
+		Value:     clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)),
+	}
+
+	sampleAppender.Append(healthSample)
+	sampleAppender.Append(durationSample)
+}


@@ -60,7 +60,7 @@ type TargetProvider interface {
 type sdTargetProvider struct {
 	job config.JobConfig
+	globalLabels clientmodel.LabelSet

 	targets []Target

 	lastRefresh time.Time
@@ -68,13 +68,14 @@ type sdTargetProvider struct {
 }

 // NewSdTargetProvider constructs a new sdTargetProvider for a job.
-func NewSdTargetProvider(job config.JobConfig) *sdTargetProvider {
+func NewSdTargetProvider(job config.JobConfig, globalLabels clientmodel.LabelSet) *sdTargetProvider {
 	i, err := utility.StringToDuration(job.GetSdRefreshInterval())
 	if err != nil {
 		panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err))
 	}
 	return &sdTargetProvider{
 		job:             job,
+		globalLabels:    globalLabels,
 		refreshInterval: i,
 	}
 }
@@ -101,6 +102,9 @@ func (p *sdTargetProvider) Targets() ([]Target, error) {
 	baseLabels := clientmodel.LabelSet{
 		clientmodel.JobLabel: clientmodel.LabelValue(p.job.GetName()),
 	}
+	for n, v := range p.globalLabels {
+		baseLabels[n] = v
+	}

 	targets := make([]Target, 0, len(response.Answer))
 	endpoint := &url.URL{


@@ -15,8 +15,10 @@ package retrieval

 import (
 	"errors"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"reflect"
 	"testing"
 	"time"
@@ -25,13 +27,19 @@ import (
 	"github.com/prometheus/prometheus/utility"
 )

-type collectResultIngester struct {
-	result clientmodel.Samples
-}
-
-func (i *collectResultIngester) Ingest(s clientmodel.Samples) error {
-	i.result = s
-	return nil
+func TestBaseLabels(t *testing.T) {
+	target := NewTarget("http://example.com/metrics", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"})
+	want := clientmodel.LabelSet{"job": "some_job", "foo": "bar", "instance": "example.com:80"}
+	got := target.BaseLabels()
+	if !reflect.DeepEqual(want, got) {
+		t.Errorf("want base labels %v, got %v", want, got)
+	}
+	delete(want, "job")
+	delete(want, "instance")
+	got = target.BaseLabelsWithoutJobAndInstance()
+	if !reflect.DeepEqual(want, got) {
+		t.Errorf("want base labels %v, got %v", want, got)
+	}
 }

 func TestTargetHidesURLAuth(t *testing.T) {
@@ -60,7 +68,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
 		url:        "bad schema",
 		httpClient: utility.NewDeadlineClient(0),
 	}
-	testTarget.scrape(nopIngester{})
+	testTarget.scrape(nopAppender{})
 	if testTarget.state != Unhealthy {
 		t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state)
 	}
@@ -71,7 +79,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
 		http.HandlerFunc(
 			func(w http.ResponseWriter, r *http.Request) {
 				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
-				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+				for i := 0; i < 2*ingestedSamplesCap; i++ {
+					w.Write([]byte(
+						fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
+					))
+				}
 			},
 		),
 	)
@@ -79,11 +91,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {

 	testTarget := NewTarget(
 		server.URL,
-		100*time.Millisecond,
+		10*time.Millisecond,
 		clientmodel.LabelSet{"dings": "bums"},
 	).(*target)

-	testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0.
+	testTarget.scrape(slowAppender{})
 	if testTarget.state != Unhealthy {
 		t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state)
 	}
@@ -93,17 +105,15 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
 }

 func TestTargetRecordScrapeHealth(t *testing.T) {
-	testTarget := target{
-		url:        "http://example.url",
-		baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
-		httpClient: utility.NewDeadlineClient(0),
-	}
+	testTarget := NewTarget(
+		"http://example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
+	).(*target)

 	now := clientmodel.Now()
-	ingester := &collectResultIngester{}
-	testTarget.recordScrapeHealth(ingester, now, true, 2*time.Second)
+	appender := &collectResultAppender{}
+	testTarget.recordScrapeHealth(appender, now, true, 2*time.Second)

-	result := ingester.result
+	result := appender.result

 	if len(result) != 2 {
 		t.Fatalf("Expected two samples, got %d", len(result))
@@ -154,11 +164,11 @@ func TestTargetScrapeTimeout(t *testing.T) {
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	// scrape once without timeout
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}
@@ -167,12 +177,12 @@ func TestTargetScrapeTimeout(t *testing.T) {

 	// now scrape again
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}

 	// now timeout
-	if err := testTarget.(*target).scrape(ingester); err == nil {
+	if err := testTarget.(*target).scrape(appender); err == nil {
 		t.Fatal("expected scrape to timeout")
 	} else {
 		signal <- true // let handler continue
@@ -180,7 +190,7 @@ func TestTargetScrapeTimeout(t *testing.T) {

 	// now scrape again without timeout
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -196,10 +206,10 @@ func TestTargetScrape404(t *testing.T) {
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	want := errors.New("server returned HTTP status 404 Not Found")
-	got := testTarget.(*target).scrape(ingester)
+	got := testTarget.(*target).scrape(appender)
 	if got == nil || want.Error() != got.Error() {
 		t.Fatalf("want err %q, got %q", want, got)
 	}
@@ -213,7 +223,7 @@ func TestTargetRunScraperScrapes(t *testing.T) {
 		scraperStopping: make(chan struct{}),
 		scraperStopped:  make(chan struct{}),
 	}
-	go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))
+	go testTarget.RunScraper(nopAppender{}, time.Duration(time.Millisecond))

 	// Enough time for a scrape to happen.
 	time.Sleep(2 * time.Millisecond)
@@ -248,11 +258,11 @@ func BenchmarkScrape(b *testing.B) {
 		100*time.Millisecond,
 		clientmodel.LabelSet{"dings": "bums"},
 	)
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		if err := testTarget.(*target).scrape(ingester); err != nil {
+		if err := testTarget.(*target).scrape(appender); err != nil {
 			b.Fatal(err)
 		}
 	}


@@ -17,11 +17,11 @@ import (
 	"sync"

 	"github.com/golang/glog"
-	"github.com/prometheus/client_golang/extraction"

 	clientmodel "github.com/prometheus/client_golang/model"

 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/storage"
 )

 // TargetManager manages all scrape targets. All methods are goroutine-safe.
@@ -36,14 +36,16 @@ type TargetManager interface {
 type targetManager struct {
 	sync.Mutex // Protects poolByJob.
+	globalLabels   clientmodel.LabelSet
+	sampleAppender storage.SampleAppender
 	poolsByJob map[string]*TargetPool
-	ingester   extraction.Ingester
 }

 // NewTargetManager returns a newly initialized TargetManager ready to use.
-func NewTargetManager(ingester extraction.Ingester) TargetManager {
+func NewTargetManager(sampleAppender storage.SampleAppender, globalLabels clientmodel.LabelSet) TargetManager {
 	return &targetManager{
-		ingester:   ingester,
+		sampleAppender: sampleAppender,
+		globalLabels:   globalLabels,
 		poolsByJob: make(map[string]*TargetPool),
 	}
 }
@@ -54,11 +56,11 @@ func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool {
 	if !ok {
 		var provider TargetProvider
 		if job.SdName != nil {
-			provider = NewSdTargetProvider(job)
+			provider = NewSdTargetProvider(job, m.globalLabels)
 		}

 		interval := job.ScrapeInterval()
-		targetPool = NewTargetPool(m, provider, m.ingester, interval)
+		targetPool = NewTargetPool(provider, m.sampleAppender, interval)
 		glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())

 		m.poolsByJob[job.GetName()] = targetPool
@@ -102,6 +104,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) {
 			baseLabels := clientmodel.LabelSet{
 				clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()),
 			}
+			for n, v := range m.globalLabels {
+				baseLabels[n] = v
+			}
 			if targetGroup.Labels != nil {
 				for _, label := range targetGroup.Labels.Label {
 					baseLabels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue())
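
Note the merge order here: global labels are copied into the per-job base labels first, and target-group labels are applied afterwards, so group-level labels win on collision. A tiny sketch of that precedence (hypothetical values):

```go
package main

import "fmt"

// merge copies src into dst; keys written later override earlier ones.
func merge(dst, src map[string]string) {
	for n, v := range src {
		dst[n] = v
	}
}

func main() {
	baseLabels := map[string]string{"job": "api"}
	merge(baseLabels, map[string]string{"env": "prod"})   // global labels first
	merge(baseLabels, map[string]string{"env": "canary"}) // target-group labels override
	fmt.Println(baseLabels)                               // map[env:canary job:api]
}
```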


@@ -21,9 +21,8 @@ import (
 	clientmodel "github.com/prometheus/client_golang/model"

-	"github.com/prometheus/client_golang/extraction"
-
 	pb "github.com/prometheus/prometheus/config/generated"
+	"github.com/prometheus/prometheus/storage"

 	"github.com/prometheus/prometheus/config"
 )
@@ -54,6 +53,10 @@ func (t fakeTarget) BaseLabels() clientmodel.LabelSet {
 	return clientmodel.LabelSet{}
 }

+func (t fakeTarget) BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet {
+	return clientmodel.LabelSet{}
+}
+
 func (t fakeTarget) Interval() time.Duration {
 	return t.interval
 }
@@ -62,13 +65,13 @@ func (t fakeTarget) LastScrape() time.Time {
 	return t.lastScrape
 }

-func (t fakeTarget) scrape(i extraction.Ingester) error {
+func (t fakeTarget) scrape(storage.SampleAppender) error {
 	t.scrapeCount++

 	return nil
 }

-func (t fakeTarget) RunScraper(ingester extraction.Ingester, interval time.Duration) {
+func (t fakeTarget) RunScraper(storage.SampleAppender, time.Duration) {
 	return
 }
@@ -82,8 +85,10 @@ func (t fakeTarget) State() TargetState {
 func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {}

+func (t *fakeTarget) Ingest(clientmodel.Samples) error { return nil }
+
 func testTargetManager(t testing.TB) {
-	targetManager := NewTargetManager(nopIngester{})
+	targetManager := NewTargetManager(nopAppender{}, nil)
 	testJob1 := config.JobConfig{
 		JobConfig: pb.JobConfig{
 			Name: proto.String("test_job1"),


@@ -19,7 +19,8 @@ import (
 	"time"

 	"github.com/golang/glog"
-	"github.com/prometheus/client_golang/extraction"
+
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/utility"
 )
@@ -35,7 +36,7 @@ type TargetPool struct {
 	manager      TargetManager
 	targetsByURL map[string]Target
 	interval     time.Duration
-	ingester     extraction.Ingester
+	sampleAppender storage.SampleAppender

 	addTargetQueue chan Target
 	targetProvider TargetProvider
@@ -44,11 +45,10 @@ type TargetPool struct {
 }

 // NewTargetPool creates a TargetPool, ready to be started by calling Run.
-func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i time.Duration) *TargetPool {
+func NewTargetPool(p TargetProvider, app storage.SampleAppender, i time.Duration) *TargetPool {
 	return &TargetPool{
-		manager:        m,
 		interval:       i,
-		ingester:       ing,
+		sampleAppender: app,
 		targetsByURL:   make(map[string]Target),
 		addTargetQueue: make(chan Target, targetAddQueueSize),
 		targetProvider: p,
@@ -100,7 +100,7 @@ func (p *TargetPool) addTarget(target Target) {
 	defer p.Unlock()

 	p.targetsByURL[target.URL()] = target
-	go target.RunScraper(p.ingester, p.interval)
+	go target.RunScraper(p.sampleAppender, p.interval)
 }

 // ReplaceTargets replaces the old targets by the provided new ones but reuses
@@ -118,7 +118,7 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
 			oldTarget.SetBaseLabelsFrom(newTarget)
 		} else {
 			p.targetsByURL[newTarget.URL()] = newTarget
-			go newTarget.RunScraper(p.ingester, p.interval)
+			go newTarget.RunScraper(p.sampleAppender, p.interval)
 		}
 	}


@@ -80,7 +80,7 @@ func testTargetPool(t testing.TB) {
 	}

 	for i, scenario := range scenarios {
-		pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
+		pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))

 		for _, input := range scenario.inputs {
 			target := target{
@@ -112,7 +112,7 @@ func TestTargetPool(t *testing.T) {
 }

 func TestTargetPoolReplaceTargets(t *testing.T) {
-	pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
+	pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))
 	oldTarget1 := &target{
 		url:   "example1",
 		state: Unhealthy,


@@ -249,8 +249,10 @@ type (
 	}
 )

+// VectorMatchCardinality is an enum describing vector matches (1:1, n:1, 1:n, n:m).
 type VectorMatchCardinality int

+// Constants for VectorMatchCardinality enum.
 const (
 	MatchOneToOne VectorMatchCardinality = iota
 	MatchManyToOne
@@ -874,7 +876,9 @@ func (node *VectorArithExpr) evalVectors(timestamp clientmodel.Timestamp, lhs, r
 				Timestamp: timestamp,
 			}
 			result = append(result, ns)
-			added[hash] = added[hash] // Set existance to true.
+			if _, ok := added[hash]; !ok {
+				added[hash] = nil // Set existence to true.
+			}
 		}
 	}
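
The old line relied on added[hash] = added[hash] to create the key (self-assignment of the zero value), which reads like a mistake; the fix makes the existence check explicit and also avoids clobbering a value that is already present. A standalone sketch of the map-as-set idiom (illustrative types):

```go
package main

import "fmt"

func main() {
	added := map[uint64][]string{}

	mark := func(hash uint64) {
		// Only create the key if it is not present yet, so an existing
		// (possibly non-nil) value is not overwritten with nil.
		if _, ok := added[hash]; !ok {
			added[hash] = nil
		}
	}

	added[7] = []string{"keep-me"}
	mark(7) // no-op: key exists
	mark(9) // records existence with a nil value

	_, ok := added[9]
	fmt.Println(ok, added[7]) // true [keep-me]
}
```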


@@ -63,7 +63,9 @@ func storeMatrix(storage local.Storage, matrix ast.Matrix) {
 			})
 		}
 	}
-	storage.AppendSamples(pendingSamples)
+	for _, s := range pendingSamples {
+		storage.Append(s)
+	}
 	storage.WaitForIndexing()
 }


@@ -26,6 +26,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notification"
 	"github.com/prometheus/prometheus/rules"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/local"
 	"github.com/prometheus/prometheus/templates"
 )
@@ -94,7 +95,7 @@ type ruleManager struct {
 	interval time.Duration
 	storage  local.Storage

-	results             chan<- clientmodel.Samples
+	sampleAppender      storage.SampleAppender
 	notificationHandler *notification.NotificationHandler

 	prometheusURL string
@@ -106,7 +107,7 @@ type RuleManagerOptions struct {
 	Storage local.Storage

 	NotificationHandler *notification.NotificationHandler
-	Results             chan<- clientmodel.Samples
+	SampleAppender      storage.SampleAppender

 	PrometheusURL string
 }
@@ -120,7 +121,7 @@ func NewRuleManager(o *RuleManagerOptions) RuleManager {
 		interval: o.EvaluationInterval,
 		storage:  o.Storage,

-		results:             o.Results,
+		sampleAppender:      o.SampleAppender,
 		notificationHandler: o.NotificationHandler,
 		prometheusURL:       o.PrometheusURL,
 	}
@@ -145,7 +146,7 @@ func (m *ruleManager) Run() {
 		select {
 		case <-ticker.C:
 			start := time.Now()
-			m.runIteration(m.results)
+			m.runIteration()
 			iterationDuration.Observe(float64(time.Since(start) / time.Millisecond))
 		case <-m.done:
 			return
@@ -213,7 +214,7 @@ func (m *ruleManager) queueAlertNotifications(rule *rules.AlertingRule, timestam
 	m.notificationHandler.SubmitReqs(notifications)
 }

-func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) {
+func (m *ruleManager) runIteration() {
 	now := clientmodel.Now()
 	wg := sync.WaitGroup{}
@@ -232,20 +233,10 @@ func (m *ruleManager) runIteration() {
 			vector, err := rule.Eval(now, m.storage)
 			duration := time.Since(start)

-			samples := make(clientmodel.Samples, len(vector))
-			for i, s := range vector {
-				samples[i] = &clientmodel.Sample{
-					Metric:    s.Metric.Metric,
-					Value:     s.Value,
-					Timestamp: s.Timestamp,
-				}
-			}
-
 			if err != nil {
 				evalFailures.Inc()
 				glog.Warningf("Error while evaluating rule %q: %s", rule, err)
-			} else {
-				m.results <- samples
+				return
 			}

 			switch r := rule.(type) {
@@ -261,9 +252,16 @@ func (m *ruleManager) runIteration() {
 			default:
 				panic(fmt.Sprintf("Unknown rule type: %T", rule))
 			}
+
+			for _, s := range vector {
+				m.sampleAppender.Append(&clientmodel.Sample{
+					Metric:    s.Metric.Metric,
+					Value:     s.Value,
+					Timestamp: s.Timestamp,
+				})
+			}
 		}(rule)
 	}

 	wg.Wait()
 }
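
This hunk is the heart of the PR in miniature: instead of copying the evaluated vector into a batch and pushing it through a results channel, each sample is handed straight to the SampleAppender. A compact before/after sketch of the two shapes (toy types, not the Prometheus API):

```go
package main

import "fmt"

type Sample struct{ V float64 }

type Appender interface{ Append(*Sample) }

type printer struct{}

func (printer) Append(s *Sample) { fmt.Println(s.V) }

// Before: producers push batches into a channel; a separate loop drains it.
func channelStyle(results []*Sample) {
	ch := make(chan []*Sample, 1)
	go func() { ch <- results; close(ch) }()
	for batch := range ch {
		for _, s := range batch {
			printer{}.Append(s)
		}
	}
}

// After: producers call the appender directly; no queue to size or drain.
func directStyle(results []*Sample, app Appender) {
	for _, s := range results {
		app.Append(s)
	}
}

func main() {
	samples := []*Sample{{1}, {2}}
	channelStyle(samples)
	directStyle(samples, printer{})
}
```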


@@ -193,6 +193,7 @@ func (p *persistence) sanitizeSeries(
 	bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen)
 	chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen)
+	modTime := fi.ModTime()

 	if bytesToTrim != 0 {
 		glog.Warningf(
 			"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
@@ -221,7 +222,11 @@ func (p *persistence) sanitizeSeries(
 		if s == nil {
 			panic("fingerprint mapped to nil pointer")
 		}
-		if bytesToTrim == 0 && s.chunkDescsOffset != -1 && chunksInFile == s.chunkDescsOffset+s.persistWatermark {
+		if !p.pedanticChecks &&
+			bytesToTrim == 0 &&
+			s.chunkDescsOffset != -1 &&
+			chunksInFile == s.chunkDescsOffset+s.persistWatermark &&
+			modTime.Equal(s.modTime) {
 			// Everything is consistent. We are good.
 			return fp, true
 		}
@@ -238,8 +243,9 @@ func (p *persistence) sanitizeSeries(
 				s.metric, fp, chunksInFile,
 			)
 			s.chunkDescs = nil
-			s.chunkDescsOffset = -1
+			s.chunkDescsOffset = chunksInFile
 			s.persistWatermark = 0
+			s.modTime = modTime
 			return fp, true
 		}
 		// This is the tricky one: We have chunks from heads.db, but
@@ -265,6 +271,7 @@ func (p *persistence) sanitizeSeries(
 		}
 		s.persistWatermark = len(cds)
 		s.chunkDescsOffset = 0
+		s.modTime = modTime

 		lastTime := cds[len(cds)-1].lastTime()
 		keepIdx := -1


@@ -72,6 +72,12 @@ const (
 	// Op-types for chunkOps and chunkDescOps.
 	evict = "evict"
 	load  = "load"
+
+	seriesLocationLabel = "location"
+
+	// Maintenance types for maintainSeriesDuration.
+	maintainInMemory = "memory"
+	maintainArchived = "archived"
 )

 func init() {


@@ -24,16 +24,15 @@ import (
 )

 // Storage ingests and manages samples, along with various indexes. All methods
-// except AppendSamples are goroutine-safe.
+// are goroutine-safe. Storage implements storage.SampleAppender.
 type Storage interface {
 	prometheus.Collector
-	// AppendSamples stores a group of new samples. Multiple samples for the
-	// same fingerprint need to be submitted in chronological order, from
-	// oldest to newest (both in the same call to AppendSamples and across
-	// multiple calls). When AppendSamples has returned, the appended
-	// samples might not be queryable immediately. (Use WaitForIndexing to
-	// wait for complete processing.) This method is not goroutine-safe.
-	AppendSamples(clientmodel.Samples)
+	// Append stores a sample in the Storage. Multiple samples for the same
+	// fingerprint need to be submitted in chronological order, from oldest
+	// to newest. When Append has returned, the appended sample might not be
+	// queryable immediately. (Use WaitForIndexing to wait for complete
+	// processing.)
+	Append(*clientmodel.Sample)
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
 	NewPreloader() Preloader


@@ -111,18 +111,21 @@ type persistence struct {
 	indexingQueueLength   prometheus.Gauge
 	indexingQueueCapacity prometheus.Metric
 	indexingBatchSizes    prometheus.Summary
-	indexingBatchLatency  prometheus.Summary
+	indexingBatchDuration prometheus.Summary
 	checkpointDuration    prometheus.Gauge

 	dirtyMtx       sync.Mutex     // Protects dirty and becameDirty.
 	dirty          bool           // true if persistence was started in dirty state.
 	becameDirty    bool           // true if an inconsistency came up during runtime.
+	pedanticChecks bool           // true if crash recovery should check each series.
 	dirtyFileName  string         // The file used for locking and to mark dirty state.
 	fLock          flock.Releaser // The file lock to protect against concurrent usage.
+
+	shouldSync syncStrategy
 }

 // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
-func newPersistence(basePath string, dirty bool) (*persistence, error) {
+func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync syncStrategy) (*persistence, error) {
 	dirtyPath := filepath.Join(basePath, dirtyFileName)
 	versionPath := filepath.Join(basePath, versionFileName)
@@ -211,12 +214,12 @@ func newPersistence(basePath string, dirty bool) (*persistence, error) {
 				Help: "Quantiles for indexing batch sizes (number of metrics per batch).",
 			},
 		),
-		indexingBatchLatency: prometheus.NewSummary(
+		indexingBatchDuration: prometheus.NewSummary(
 			prometheus.SummaryOpts{
 				Namespace: namespace,
 				Subsystem: subsystem,
-				Name:      "indexing_batch_latency_milliseconds",
-				Help:      "Quantiles for batch indexing latencies in milliseconds.",
+				Name:      "indexing_batch_duration_milliseconds",
+				Help:      "Quantiles for batch indexing duration in milliseconds.",
 			},
 		),
 		checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
@@ -226,8 +229,10 @@ func newPersistence(basePath string, dirty bool) (*persistence, error) {
 			Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
 		}),
 		dirty:          dirty,
+		pedanticChecks: pedanticChecks,
 		dirtyFileName:  dirtyPath,
 		fLock:          fLock,
+		shouldSync:     shouldSync,
 	}

 	if p.dirty {
@@ -259,7 +264,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
 	ch <- p.indexingQueueLength.Desc()
 	ch <- p.indexingQueueCapacity.Desc()
 	p.indexingBatchSizes.Describe(ch)
-	p.indexingBatchLatency.Describe(ch)
+	p.indexingBatchDuration.Describe(ch)
 	ch <- p.checkpointDuration.Desc()
 }
@ -270,7 +275,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
ch <- p.indexingQueueLength ch <- p.indexingQueueLength
ch <- p.indexingQueueCapacity ch <- p.indexingQueueCapacity
p.indexingBatchSizes.Collect(ch) p.indexingBatchSizes.Collect(ch)
p.indexingBatchLatency.Collect(ch) p.indexingBatchDuration.Collect(ch)
ch <- p.checkpointDuration ch <- p.checkpointDuration
} }
@ -340,7 +345,7 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk)
if err != nil { if err != nil {
return -1, err return -1, err
} }
defer f.Close() defer p.closeChunkFile(f)
if err := writeChunks(f, chunks); err != nil { if err := writeChunks(f, chunks); err != nil {
return -1, err return -1, err
@ -477,7 +482,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
// //
// (4.4) The varint-encoded persistWatermark. (Missing in v1.) // (4.4) The varint-encoded persistWatermark. (Missing in v1.)
// //
// (4.5) The varint-encoded chunkDescsOffset. // (4.5) The modification time of the series file as nanoseconds elapsed since
// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
// exists yet. (Missing in v1.)
//
// (4.6) The varint-encoded chunkDescsOffset.
// //
// (4.6) The varint-encoded savedFirstTime. // (4.7) The varint-encoded savedFirstTime.
// //
@ -569,6 +578,15 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
return return
} }
if m.series.modTime.IsZero() {
if _, err = codable.EncodeVarint(w, -1); err != nil {
return
}
} else {
if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
return
}
}
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil { if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
return return
} }
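For reference, the sentinel convention used above (write -1 for an unknown modification time, the UnixNano value otherwise) round-trips as in the following minimal standalone sketch; encodeModTime and decodeModTime are hypothetical names, not part of the commit:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"
)

// encodeModTime writes the modification time as varint nanoseconds,
// using -1 as the sentinel for "unknown" (the zero time.Time).
func encodeModTime(buf *bytes.Buffer, t time.Time) {
	var tmp [binary.MaxVarintLen64]byte
	v := int64(-1)
	if !t.IsZero() {
		v = t.UnixNano()
	}
	buf.Write(tmp[:binary.PutVarint(tmp[:], v)])
}

// decodeModTime reverses encodeModTime, mapping -1 back to the zero time.
func decodeModTime(r *bytes.Buffer) (time.Time, error) {
	v, err := binary.ReadVarint(r)
	if err != nil || v == -1 {
		return time.Time{}, err
	}
	return time.Unix(0, v), nil
}

func main() {
	var buf bytes.Buffer
	encodeModTime(&buf, time.Time{})          // unknown modification time
	encodeModTime(&buf, time.Unix(0, 123456)) // known modification time
	for i := 0; i < 2; i++ {
		t, err := decodeModTime(&buf)
		fmt.Println(t.IsZero(), err) // "true <nil>", then "false <nil>"
	}
}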
@ -627,7 +645,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
// unrecoverable error is encountered, it is returned. Call this method during // unrecoverable error is encountered, it is returned. Call this method during
// start-up while nothing else is running in storage land. This method is // start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe. // utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen int64, err error) { func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
var chunkDescsTotal int64 var chunkDescsTotal int64
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries) fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries} sm = &seriesMap{m: fingerprintToSeries}
@ -690,48 +708,58 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
if err != nil { if err != nil {
glog.Warning("Could not read series flags:", err) glog.Warning("Could not read series flags:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codable.DecodeUint64(r) fp, err := codable.DecodeUint64(r)
if err != nil { if err != nil {
glog.Warning("Could not decode fingerprint:", err) glog.Warning("Could not decode fingerprint:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
var metric codable.Metric var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil { if err := metric.UnmarshalFromReader(r); err != nil {
glog.Warning("Could not decode metric:", err) glog.Warning("Could not decode metric:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
var persistWatermark int64 var persistWatermark int64
var modTime time.Time
if version != headsFormatLegacyVersion { if version != headsFormatLegacyVersion {
// persistWatermark only present in v2. // persistWatermark only present in v2.
persistWatermark, err = binary.ReadVarint(r) persistWatermark, err = binary.ReadVarint(r)
if err != nil { if err != nil {
glog.Warning("Could not decode persist watermark:", err) glog.Warning("Could not decode persist watermark:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
}
modTimeNano, err := binary.ReadVarint(r)
if err != nil {
glog.Warning("Could not decode modification time:", err)
p.dirty = true
return sm, chunksToPersist, nil
}
if modTimeNano != -1 {
modTime = time.Unix(0, modTimeNano)
} }
} }
chunkDescsOffset, err := binary.ReadVarint(r) chunkDescsOffset, err := binary.ReadVarint(r)
if err != nil { if err != nil {
glog.Warning("Could not decode chunk descriptor offset:", err) glog.Warning("Could not decode chunk descriptor offset:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
savedFirstTime, err := binary.ReadVarint(r) savedFirstTime, err := binary.ReadVarint(r)
if err != nil { if err != nil {
glog.Warning("Could not decode saved first time:", err) glog.Warning("Could not decode saved first time:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
numChunkDescs, err := binary.ReadVarint(r) numChunkDescs, err := binary.ReadVarint(r)
if err != nil { if err != nil {
glog.Warning("Could not decode number of chunk descriptors:", err) glog.Warning("Could not decode number of chunk descriptors:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
chunkDescs := make([]*chunkDesc, numChunkDescs) chunkDescs := make([]*chunkDesc, numChunkDescs)
if version == headsFormatLegacyVersion { if version == headsFormatLegacyVersion {
@ -748,13 +776,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
if err != nil { if err != nil {
glog.Warning("Could not decode first time:", err) glog.Warning("Could not decode first time:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
lastTime, err := binary.ReadVarint(r) lastTime, err := binary.ReadVarint(r)
if err != nil { if err != nil {
glog.Warning("Could not decode last time:", err) glog.Warning("Could not decode last time:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
chunkDescs[i] = &chunkDesc{ chunkDescs[i] = &chunkDesc{
chunkFirstTime: clientmodel.Timestamp(firstTime), chunkFirstTime: clientmodel.Timestamp(firstTime),
@ -767,16 +795,16 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
if err != nil { if err != nil {
glog.Warning("Could not decode chunk type:", err) glog.Warning("Could not decode chunk type:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
chunk := newChunkForEncoding(chunkEncoding(encoding)) chunk := newChunkForEncoding(chunkEncoding(encoding))
if err := chunk.unmarshal(r); err != nil { if err := chunk.unmarshal(r); err != nil {
glog.Warning("Could not decode chunk type:", err) glog.Warning("Could not decode chunk type:", err)
p.dirty = true p.dirty = true
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
chunkDescs[i] = newChunkDesc(chunk) chunkDescs[i] = newChunkDesc(chunk)
persistQueueLen++ chunksToPersist++
} }
} }
@ -784,12 +812,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
metric: clientmodel.Metric(metric), metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs, chunkDescs: chunkDescs,
persistWatermark: int(persistWatermark), persistWatermark: int(persistWatermark),
modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset), chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: clientmodel.Timestamp(savedFirstTime), savedFirstTime: clientmodel.Timestamp(savedFirstTime),
headChunkClosed: persistWatermark >= numChunkDescs, headChunkClosed: persistWatermark >= numChunkDescs,
} }
} }
return sm, persistQueueLen, nil return sm, chunksToPersist, nil
} }
// dropAndPersistChunks deletes all chunks from a series file whose last sample // dropAndPersistChunks deletes all chunks from a series file whose last sample
@ -921,7 +950,7 @@ func (p *persistence) dropAndPersistChunks(
return return
} }
defer func() { defer func() {
temp.Close() p.closeChunkFile(temp)
if err == nil { if err == nil {
err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)) err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
} }
@ -962,6 +991,17 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error)
return numChunks, nil return numChunks, nil
} }
// getSeriesFileModTime returns the modification time of the series file
// belonging to the provided fingerprint. In case of an error, the zero value of
// time.Time is returned.
func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time {
var modTime time.Time
if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
return fi.ModTime()
}
return modTime
}
// indexMetric queues the given metric for addition to the indexes needed by // indexMetric queues the given metric for addition to the indexes needed by
// getFingerprintsForLabelPair, getLabelValuesForLabelName, and // getFingerprintsForLabelPair, getLabelValuesForLabelName, and
// getFingerprintsModifiedBefore. If the queue is full, this method blocks // getFingerprintsModifiedBefore. If the queue is full, this method blocks
@ -1195,6 +1235,19 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
// would still be detected. // would still be detected.
} }
// closeChunkFile first syncs the provided file if the sync strategy mandates
// it. Then it closes the file. Errors are logged.
func (p *persistence) closeChunkFile(f *os.File) {
if p.shouldSync() {
if err := f.Sync(); err != nil {
glog.Error("Error syncing file:", err)
}
}
if err := f.Close(); err != nil {
glog.Error("Error closing chunk file:", err)
}
}
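In isolation, the sync-then-close pattern looks like the sketch below; closeFile and the temp-file setup are illustrative stand-ins, not part of the commit:

package main

import (
	"log"
	"os"
)

// closeFile fsyncs the file only when the strategy asks for it, then closes
// it, logging errors rather than returning them.
func closeFile(f *os.File, shouldSync func() bool) {
	if shouldSync() {
		if err := f.Sync(); err != nil {
			log.Println("error syncing file:", err)
		}
	}
	if err := f.Close(); err != nil {
		log.Println("error closing file:", err)
	}
}

func main() {
	f, err := os.CreateTemp("", "chunkfile")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("chunk data")
	closeFile(f, func() bool { return true }) // an "Always"-style strategy
}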
func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
return os.Open(p.fileNameForFingerprint(fp)) return os.Open(p.fileNameForFingerprint(fp))
} }
@ -1217,7 +1270,9 @@ func (p *persistence) processIndexingQueue() {
commitBatch := func() { commitBatch := func() {
p.indexingBatchSizes.Observe(float64(batchSize)) p.indexingBatchSizes.Observe(float64(batchSize))
defer func(begin time.Time) { defer func(begin time.Time) {
p.indexingBatchLatency.Observe(float64(time.Since(begin) / time.Millisecond)) p.indexingBatchDuration.Observe(
float64(time.Since(begin)) / float64(time.Millisecond),
)
}(time.Now()) }(time.Now())
if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
@ -36,7 +36,7 @@ var (
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) { func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
*defaultChunkEncoding = int(encoding) *defaultChunkEncoding = int(encoding)
dir := test.NewTemporaryDirectory("test_persistence", t) dir := test.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), false) p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
if err != nil { if err != nil {
dir.Close() dir.Close()
t.Fatal(err) t.Fatal(err)
@ -143,6 +143,9 @@ type memorySeries struct {
// points to a non-persisted chunk. If all chunks are persisted, then // points to a non-persisted chunk. If all chunks are persisted, then
// persistWatermark == len(chunkDescs). // persistWatermark == len(chunkDescs).
persistWatermark int persistWatermark int
// The modification time of the series file. The zero value of time.Time
// is used to mark an unknown modification time.
modTime time.Time
// The chunkDescs in memory might not have all the chunkDescs for the // The chunkDescs in memory might not have all the chunkDescs for the
// chunks that are persisted to disk. The missing chunkDescs are all // chunks that are persisted to disk. The missing chunkDescs are all
// contiguous and at the tail end. chunkDescsOffset is the index of the // contiguous and at the tail end. chunkDescsOffset is the index of the
@ -16,7 +16,6 @@ package local
import ( import (
"container/list" "container/list"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -38,14 +37,22 @@ const (
maxEvictInterval = time.Minute maxEvictInterval = time.Minute
appendWorkers = 16 // Should be enough to not make appending samples a bottleneck.
appendQueueCap = 2 * appendWorkers
// If numChunksToPersist is this percentage of maxChunksToPersist, we
// consider the storage in "graceful degradation mode", i.e. we do not
// checkpoint anymore based on the dirty series count, and we do not
// sync series files anymore if using the adaptive sync strategy.
percentChunksToPersistForDegradation = 80
) )
var ( var (
persistQueueLengthDesc = prometheus.NewDesc( numChunksToPersistDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_length"), prometheus.BuildFQName(namespace, subsystem, "chunks_to_persist"),
"The current number of chunks waiting in the persist queue.", "The current number of chunks waiting for persistence.",
nil, nil, nil, nil,
) )
maxChunksToPersistDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "max_chunks_to_persist"),
"The maximum number of chunks that can be waiting for persistence before sample ingestion will stop.",
nil, nil,
)
) )
@ -55,6 +62,21 @@ type evictRequest struct {
evict bool evict bool
} }
// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int
// Possible values for SyncStrategy.
const (
_ SyncStrategy = iota
Never
Always
Adaptive
)
// A syncStrategy is a function that returns whether series files should be
// synced or not. It does not need to be goroutine safe.
type syncStrategy func() bool
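The command-line wiring for choosing a strategy lives in main.go and is not part of this excerpt. Purely as an illustration, an enum like SyncStrategy could back a flag.Value adapter along the following lines; all names below are hypothetical:

package main

import (
	"flag"
	"fmt"
)

type SyncStrategy int

const (
	_ SyncStrategy = iota
	Never
	Always
	Adaptive
)

// syncStrategyFlag adapts SyncStrategy to the flag.Value interface so that
// "never"/"always"/"adaptive" can be parsed from the command line.
type syncStrategyFlag struct{ s SyncStrategy }

func (f *syncStrategyFlag) String() string {
	return map[SyncStrategy]string{Never: "never", Always: "always", Adaptive: "adaptive"}[f.s]
}

func (f *syncStrategyFlag) Set(v string) error {
	switch v {
	case "never":
		f.s = Never
	case "always":
		f.s = Always
	case "adaptive":
		f.s = Adaptive
	default:
		return fmt.Errorf("unknown sync strategy %q", v)
	}
	return nil
}

func main() {
	strategy := syncStrategyFlag{s: Adaptive} // default
	flag.Var(&strategy, "series-sync-strategy", "When to sync series files: never, always, adaptive.")
	flag.Parse()
	fmt.Println("selected strategy:", strategy.String())
}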
type memorySeriesStorage struct { type memorySeriesStorage struct {
fpLocker *fingerprintLocker fpLocker *fingerprintLocker
fpToSeries *seriesMap fpToSeries *seriesMap
@ -65,29 +87,22 @@ type memorySeriesStorage struct {
checkpointInterval time.Duration checkpointInterval time.Duration
checkpointDirtySeriesLimit int checkpointDirtySeriesLimit int
appendQueue chan *clientmodel.Sample numChunksToPersist int64 // The number of chunks waiting for persistence.
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed. degraded bool
persistQueueLen int64 // The number of chunks that need persistence.
persistQueueCap int // If persistQueueLen reaches this threshold, ingestion will stall.
// Note that internally, the chunks to persist are not organized in a queue-like data structure,
// but handled in a more sophisticated way (see maintainMemorySeries).
persistence *persistence persistence *persistence
countPersistedHeadChunks chan struct{}
evictList *list.List evictList *list.List
evictRequests chan evictRequest evictRequests chan evictRequest
evictStopping, evictStopped chan struct{} evictStopping, evictStopped chan struct{}
persistErrors prometheus.Counter persistErrors prometheus.Counter
persistQueueCapacity prometheus.Metric
numSeries prometheus.Gauge numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter ingestedSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter invalidPreloadRequestsCount prometheus.Counter
maintainSeriesDuration *prometheus.SummaryVec
} }
// MemorySeriesStorageOptions contains options needed by // MemorySeriesStorageOptions contains options needed by
@ -95,38 +110,21 @@ type memorySeriesStorage struct {
// values. // values.
type MemorySeriesStorageOptions struct { type MemorySeriesStorageOptions struct {
MemoryChunks int // How many chunks to keep in memory. MemoryChunks int // How many chunks to keep in memory.
MaxChunksToPersist int // Max number of chunks waiting to be persisted.
PersistenceStoragePath string // Location of persistence files. PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped. PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks. CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint. CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup. Dirty bool // Force the storage to consider itself dirty on startup.
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
SyncStrategy SyncStrategy // Which sync strategy to apply to series files.
} }
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage. // has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty)
if err != nil {
return nil, err
}
glog.Info("Loading series map and head chunks...")
fpToSeries, persistQueueLen, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
glog.Infof("%d series loaded.", fpToSeries.length())
numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
})
numSeries.Set(float64(fpToSeries.length()))
s := &memorySeriesStorage{ s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(1024), fpLocker: newFingerprintLocker(1024),
fpToSeries: fpToSeries,
loopStopping: make(chan struct{}), loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}), loopStopped: make(chan struct{}),
@ -135,14 +133,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
checkpointInterval: o.CheckpointInterval, checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
appendLastTimestamp: clientmodel.Earliest, maxChunksToPersist: o.MaxChunksToPersist,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
persistQueueLen: persistQueueLen,
persistQueueCap: o.PersistenceQueueCapacity,
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 100),
evictList: list.New(), evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap), evictRequests: make(chan evictRequest, evictRequestsCap),
@ -155,15 +146,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
Name: "persist_errors_total", Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.", Help: "The total number of errors while persisting chunks.",
}), }),
persistQueueCapacity: prometheus.MustNewConstMetric( numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
prometheus.NewDesc( Namespace: namespace,
prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"), Subsystem: subsystem,
"The total capacity of the persist queue.", Name: "memory_series",
nil, nil, Help: "The current number of series in memory.",
), }),
prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
),
numSeries: numSeries,
seriesOps: prometheus.NewCounterVec( seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
@ -185,16 +173,42 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
Name: "invalid_preload_requests_total", Name: "invalid_preload_requests_total",
Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
}), }),
maintainSeriesDuration: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "maintain_series_duration_milliseconds",
Help: "The duration (in milliseconds) it took to perform maintenance on a series.",
},
[]string{seriesLocationLabel},
),
} }
for i := 0; i < appendWorkers; i++ { var syncStrategy syncStrategy
go func() { switch o.SyncStrategy {
for sample := range s.appendQueue { case Never:
s.appendSample(sample) syncStrategy = func() bool { return false }
s.appendWaitGroup.Done() case Always:
syncStrategy = func() bool { return true }
case Adaptive:
syncStrategy = func() bool { return !s.isDegraded() }
default:
panic("unknown sync strategy")
} }
}()
p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy)
if err != nil {
return nil, err
} }
s.persistence = p
glog.Info("Loading series map and head chunks...")
s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
glog.Infof("%d series loaded.", s.fpToSeries.length())
s.numSeries.Set(float64(s.fpToSeries.length()))
return s, nil return s, nil
} }
@ -209,11 +223,6 @@ func (s *memorySeriesStorage) Start() {
func (s *memorySeriesStorage) Stop() error { func (s *memorySeriesStorage) Stop() error {
glog.Info("Stopping local storage...") glog.Info("Stopping local storage...")
glog.Info("Draining append queue...")
close(s.appendQueue)
s.appendWaitGroup.Wait()
glog.Info("Append queue drained.")
glog.Info("Stopping maintenance loop...") glog.Info("Stopping maintenance loop...")
close(s.loopStopping) close(s.loopStopping)
<-s.loopStopped <-s.loopStopped
@ -236,9 +245,6 @@ func (s *memorySeriesStorage) Stop() error {
// WaitForIndexing implements Storage. // WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() { func (s *memorySeriesStorage) WaitForIndexing() {
// First let all goroutines appending samples stop.
s.appendWaitGroup.Wait()
// Only then wait for the persistence to index them.
s.persistence.waitForIndexing() s.persistence.waitForIndexing()
} }
@ -363,32 +369,18 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
} }
} }
// AppendSamples implements Storage. // Append implements Storage.
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
for _, sample := range samples { if s.getNumChunksToPersist() >= s.maxChunksToPersist {
if s.getPersistQueueLen() >= s.persistQueueCap { glog.Warningf(
glog.Warningf("%d chunks waiting for persistence, sample ingestion suspended.", s.getPersistQueueLen()) "%d chunks waiting for persistence, sample ingestion suspended.",
for s.getPersistQueueLen() >= s.persistQueueCap { s.getNumChunksToPersist(),
)
for s.getNumChunksToPersist() >= s.maxChunksToPersist {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
glog.Warning("Sample ingestion resumed.") glog.Warning("Sample ingestion resumed.")
} }
if sample.Timestamp != s.appendLastTimestamp {
// Timestamp has changed. We have to wait for processing
// of all appended samples before proceeding. Otherwise,
// we might violate the storage contract that each
// sample appended to a given series has to have a
// timestamp greater or equal to the previous sample
// appended to that series.
s.appendWaitGroup.Wait()
s.appendLastTimestamp = sample.Timestamp
}
s.appendWaitGroup.Add(1)
s.appendQueue <- sample
}
}
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
fp := sample.Metric.Fingerprint() fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
series := s.getOrCreateSeries(fp, sample.Metric) series := s.getOrCreateSeries(fp, sample.Metric)
@ -398,7 +390,7 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
}) })
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc() s.ingestedSamplesCount.Inc()
s.incPersistQueueLen(completedChunksCount) s.incNumChunksToPersist(completedChunksCount)
} }
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries { func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
@ -677,20 +669,14 @@ loop:
case fp := <-memoryFingerprints: case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) { if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) {
dirtySeriesCount++ dirtySeriesCount++
// Check if we have enough "dirty" series so // Check if we have enough "dirty" series so that we need an early checkpoint.
// that we need an early checkpoint. However, // However, if we are already behind persisting chunks, creating a checkpoint
// if we are already at 90% capacity of the // would be counterproductive, as it would slow down chunk persisting even more,
// persist queue, creating a checkpoint would be // while in a situation like that, where we are clearly lacking speed of disk
// counterproductive, as it would slow down // maintenance, the best we can do for crash recovery is to persist chunks as
// chunk persisting even more, while in a // quickly as possible. So only checkpoint if the storage is not in "graceful
// situation like that, where we are clearly // degradation mode".
// lacking speed of disk maintenance, the best if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() {
// we can do for crash recovery is to work
// through the persist queue as quickly as
// possible. So only checkpoint if the persist
// queue is at most 90% full.
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
s.getPersistQueueLen() < s.persistQueueCap*9/10 {
checkpointTimer.Reset(0) checkpointTimer.Reset(0)
} }
} }
@ -739,6 +725,12 @@ loop:
func (s *memorySeriesStorage) maintainMemorySeries( func (s *memorySeriesStorage) maintainMemorySeries(
fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp, fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp,
) (becameDirty bool) { ) (becameDirty bool) {
defer func(begin time.Time) {
s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
float64(time.Since(begin)) / float64(time.Millisecond),
)
}(time.Now())
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
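The defer above captures time.Now() at the moment the defer statement executes and observes the elapsed duration when the function returns. The idiom in isolation:

package main

import (
	"fmt"
	"time"
)

// work demonstrates the defer-with-argument idiom: time.Now() is evaluated
// immediately, the deferred body runs at return time, so the observed value
// is the total time spent in the function.
func work() {
	defer func(begin time.Time) {
		fmt.Printf("took %.1f ms\n", float64(time.Since(begin))/float64(time.Millisecond))
	}(time.Now())
	time.Sleep(25 * time.Millisecond)
}

func main() { work() }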
@ -751,7 +743,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc() defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
if series.maybeCloseHeadChunk() { if series.maybeCloseHeadChunk() {
s.incPersistQueueLen(1) s.incNumChunksToPersist(1)
} }
seriesWasDirty := series.dirty seriesWasDirty := series.dirty
@ -819,8 +811,9 @@ func (s *memorySeriesStorage) writeMemorySeries(
for _, cd := range cds { for _, cd := range cds {
cd.unpin(s.evictRequests) cd.unpin(s.evictRequests)
} }
s.incPersistQueueLen(-len(cds)) s.incNumChunksToPersist(-len(cds))
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
series.modTime = s.persistence.getSeriesFileModTime(fp)
}() }()
// Get the actual chunks from underneath the chunkDescs. // Get the actual chunks from underneath the chunkDescs.
@ -857,7 +850,8 @@ func (s *memorySeriesStorage) writeMemorySeries(
series.dropChunks(beforeTime) series.dropChunks(beforeTime)
if len(series.chunkDescs) == 0 { // All chunks dropped from memory series. if len(series.chunkDescs) == 0 { // All chunks dropped from memory series.
if !allDroppedFromPersistence { if !allDroppedFromPersistence {
panic("all chunks dropped from memory but chunks left in persistence") glog.Errorf("All chunks dropped from memory but chunks left in persistence for fingerprint %v, series %v.", fp, series)
s.persistence.setDirty(true)
} }
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
s.numSeries.Dec() s.numSeries.Dec()
@ -871,7 +865,9 @@ func (s *memorySeriesStorage) writeMemorySeries(
} else { } else {
series.chunkDescsOffset -= numDroppedFromPersistence series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 { if series.chunkDescsOffset < 0 {
panic("dropped more chunks from persistence than from memory") glog.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series)
s.persistence.setDirty(true)
series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery.
} }
} }
return false return false
@ -880,6 +876,12 @@ func (s *memorySeriesStorage) writeMemorySeries(
// maintainArchivedSeries drops chunks older than beforeTime from an archived // maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely. // series. If the series contains no chunks after that, it is purged entirely.
func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) { func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
defer func(begin time.Time) {
s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
float64(time.Since(begin)) / float64(time.Millisecond),
)
}(time.Now())
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
@ -920,15 +922,37 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
return s.persistence.loadChunkDescs(fp, beforeTime) return s.persistence.loadChunkDescs(fp, beforeTime)
} }
// getPersistQueueLen returns persistQueueLen in a goroutine-safe way. // getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
func (s *memorySeriesStorage) getPersistQueueLen() int { func (s *memorySeriesStorage) getNumChunksToPersist() int {
return int(atomic.LoadInt64(&s.persistQueueLen)) return int(atomic.LoadInt64(&s.numChunksToPersist))
} }
// incPersistQueueLen increments persistQueueLen in a goroutine-safe way. Use a // incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
// negative 'by' to decrement. // negative 'by' to decrement.
func (s *memorySeriesStorage) incPersistQueueLen(by int) { func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.persistQueueLen, int64(by)) atomic.AddInt64(&s.numChunksToPersist, int64(by))
}
// isDegraded returns whether the storage is in "graceful degradation mode",
// which is the case if the number of chunks waiting for persistence has reached
// a percentage of maxChunksToPersist that exceeds
// percentChunksToPersistForDegradation. The method is not goroutine safe (but
// only ever called from the goroutine dealing with series maintenance).
// Changes of degradation mode are logged.
func (s *memorySeriesStorage) isDegraded() bool {
nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100
if s.degraded && !nowDegraded {
glog.Warning("Storage has left graceful degradation mode. Things are back to normal.")
} else if !s.degraded && nowDegraded {
glog.Warningf(
"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v.",
s.getNumChunksToPersist(),
s.getNumChunksToPersist()*100/s.maxChunksToPersist,
s.maxChunksToPersist,
s.checkpointInterval)
}
s.degraded = nowDegraded
return s.degraded
} }
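A stripped-down model of this accounting, showing the atomic counter and the percentage threshold working together; the types and numbers below are illustrative only:

package main

import (
	"fmt"
	"sync/atomic"
)

// storage models just the accounting: an atomically updated count of chunks
// waiting for persistence, checked against a percentage of the allowed maximum.
type storage struct {
	numChunksToPersist int64
	maxChunksToPersist int
}

func (s *storage) incNumChunksToPersist(by int) {
	atomic.AddInt64(&s.numChunksToPersist, int64(by))
}

func (s *storage) getNumChunksToPersist() int {
	return int(atomic.LoadInt64(&s.numChunksToPersist))
}

func (s *storage) isDegraded(percent int) bool {
	return s.getNumChunksToPersist() > s.maxChunksToPersist*percent/100
}

func main() {
	s := &storage{maxChunksToPersist: 1000}
	s.incNumChunksToPersist(801)
	fmt.Println(s.isDegraded(80)) // true: 801 > 1000*80/100
	s.incNumChunksToPersist(-2)
	fmt.Println(s.isDegraded(80)) // false: 799 <= 800
}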
// Describe implements prometheus.Collector. // Describe implements prometheus.Collector.
@ -936,14 +960,14 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch) s.persistence.Describe(ch)
ch <- s.persistErrors.Desc() ch <- s.persistErrors.Desc()
ch <- s.persistQueueCapacity.Desc() ch <- maxChunksToPersistDesc
ch <- persistQueueLengthDesc ch <- numChunksToPersistDesc
ch <- s.numSeries.Desc() ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch) s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc() ch <- s.ingestedSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc() ch <- s.invalidPreloadRequestsCount.Desc()
ch <- numMemChunksDesc ch <- numMemChunksDesc
s.maintainSeriesDuration.Describe(ch)
} }
// Collect implements prometheus.Collector. // Collect implements prometheus.Collector.
@ -951,20 +975,24 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.persistence.Collect(ch) s.persistence.Collect(ch)
ch <- s.persistErrors ch <- s.persistErrors
ch <- s.persistQueueCapacity
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
persistQueueLengthDesc, maxChunksToPersistDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(s.getPersistQueueLen()), float64(s.maxChunksToPersist),
)
ch <- prometheus.MustNewConstMetric(
numChunksToPersistDesc,
prometheus.GaugeValue,
float64(s.getNumChunksToPersist()),
) )
ch <- s.numSeries ch <- s.numSeries
s.seriesOps.Collect(ch) s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount ch <- s.invalidPreloadRequestsCount
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
numMemChunksDesc, numMemChunksDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)), float64(atomic.LoadInt64(&numMemChunks)),
) )
s.maintainSeriesDuration.Collect(ch)
} }
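The Describe/Collect pair above follows the custom-collector pattern of the Prometheus client library: values derived from internal state are materialized at scrape time with MustNewConstMetric. A minimal standalone example of the same pattern; the metric name is made up:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// queueCollector exports a value kept in an atomic int64 as a gauge,
// constructing the metric on the fly during each collection.
type queueCollector struct {
	length int64
	desc   *prometheus.Desc
}

func newQueueCollector() *queueCollector {
	return &queueCollector{
		desc: prometheus.NewDesc("demo_chunks_to_persist", "Chunks waiting for persistence.", nil, nil),
	}
}

func (c *queueCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *queueCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(atomic.LoadInt64(&c.length)))
}

func main() {
	c := newQueueCollector()
	atomic.AddInt64(&c.length, 7)
	prometheus.MustRegister(c)
	fmt.Println("collector registered")
}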
@ -48,8 +48,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
} }
fingerprints[i] = metric.Fingerprint() fingerprints[i] = metric.Fingerprint()
} }
for _, s := range samples {
storage.AppendSamples(samples) storage.Append(s)
}
storage.WaitForIndexing() storage.WaitForIndexing()
newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher { newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher {
@ -156,17 +157,20 @@ func TestLoop(t *testing.T) {
defer directory.Close() defer directory.Close()
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
MemoryChunks: 50, MemoryChunks: 50,
MaxChunksToPersist: 1000000,
PersistenceRetentionPeriod: 24 * 7 * time.Hour, PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceStoragePath: directory.Path(), PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: 250 * time.Millisecond, CheckpointInterval: 250 * time.Millisecond,
SyncStrategy: Adaptive,
} }
storage, err := NewMemorySeriesStorage(o) storage, err := NewMemorySeriesStorage(o)
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %s", err) t.Fatalf("Error creating storage: %s", err)
} }
storage.Start() storage.Start()
storage.AppendSamples(samples) for _, s := range samples {
storage.Append(s)
}
storage.WaitForIndexing() storage.WaitForIndexing()
series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint()) series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
cdsBefore := len(series.chunkDescs) cdsBefore := len(series.chunkDescs)
@ -192,7 +196,9 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
s, closer := NewTestStorage(t, encoding) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing() s.WaitForIndexing()
for m := range s.(*memorySeriesStorage).fpToSeries.iter() { for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
@ -240,7 +246,9 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
s, closer := NewTestStorage(t, encoding) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing() s.WaitForIndexing()
fp := clientmodel.Metric{}.Fingerprint() fp := clientmodel.Metric{}.Fingerprint()
@ -331,7 +339,9 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
s, closer := NewTestStorage(t, encoding) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing() s.WaitForIndexing()
fp := clientmodel.Metric{}.Fingerprint() fp := clientmodel.Metric{}.Fingerprint()
@ -483,7 +493,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing() s.WaitForIndexing()
fp := clientmodel.Metric{}.Fingerprint() fp := clientmodel.Metric{}.Fingerprint()
@ -518,7 +530,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
// Recreate series. // Recreate series.
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
s.WaitForIndexing() s.WaitForIndexing()
series, ok := ms.fpToSeries.get(fp) series, ok := ms.fpToSeries.get(fp)
@ -592,7 +606,9 @@ func benchmarkAppend(b *testing.B, encoding chunkEncoding) {
s, closer := NewTestStorage(b, encoding) s, closer := NewTestStorage(b, encoding)
defer closer.Close() defer closer.Close()
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
} }
func BenchmarkAppendType0(b *testing.B) { func BenchmarkAppendType0(b *testing.B) {
@ -616,7 +632,9 @@ func testFuzz(t *testing.T, encoding chunkEncoding) {
defer c.Close() defer c.Close()
samples := createRandomSamples("test_fuzz", 1000) samples := createRandomSamples("test_fuzz", 1000)
s.AppendSamples(samples) for _, sample := range samples {
s.Append(sample)
}
return verifyStorage(t, s, samples, 24*7*time.Hour) return verifyStorage(t, s, samples, 24*7*time.Hour)
} }
@ -652,10 +670,11 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
defer directory.Close() defer directory.Close()
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
MemoryChunks: 100, MemoryChunks: 100,
MaxChunksToPersist: 1000000,
PersistenceRetentionPeriod: time.Hour, PersistenceRetentionPeriod: time.Hour,
PersistenceStoragePath: directory.Path(), PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: time.Second, CheckpointInterval: time.Second,
SyncStrategy: Adaptive,
} }
s, err := NewMemorySeriesStorage(o) s, err := NewMemorySeriesStorage(o)
if err != nil { if err != nil {
@ -672,9 +691,13 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
start := samplesPerRun * i start := samplesPerRun * i
end := samplesPerRun * (i + 1) end := samplesPerRun * (i + 1)
middle := (start + end) / 2 middle := (start + end) / 2
s.AppendSamples(samples[start:middle]) for _, sample := range samples[start:middle] {
s.Append(sample)
}
verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod) verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod)
s.AppendSamples(samples[middle:end]) for _, sample := range samples[middle:end] {
s.Append(sample)
}
verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod) verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod)
} }
} }
@ -42,10 +42,11 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
directory := test.NewTemporaryDirectory("test_storage", t) directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000, MemoryChunks: 1000000,
MaxChunksToPersist: 1000000,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging. PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(), PersistenceStoragePath: directory.Path(),
PersistenceQueueCapacity: 1000000,
CheckpointInterval: time.Hour, CheckpointInterval: time.Hour,
SyncStrategy: Adaptive,
} }
storage, err := NewMemorySeriesStorage(o) storage, err := NewMemorySeriesStorage(o)
if err != nil { if err != nil {
@ -53,7 +53,7 @@ type TSDBClient interface {
// by the provided TSDBClient. // by the provided TSDBClient.
type TSDBQueueManager struct { type TSDBQueueManager struct {
tsdb TSDBClient tsdb TSDBClient
queue chan clientmodel.Samples queue chan *clientmodel.Sample
pendingSamples clientmodel.Samples pendingSamples clientmodel.Samples
sendSemaphore chan bool sendSemaphore chan bool
drained chan bool drained chan bool
@ -69,7 +69,7 @@ type TSDBQueueManager struct {
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
return &TSDBQueueManager{ return &TSDBQueueManager{
tsdb: tsdb, tsdb: tsdb,
queue: make(chan clientmodel.Samples, queueCapacity), queue: make(chan *clientmodel.Sample, queueCapacity),
sendSemaphore: make(chan bool, maxConcurrentSends), sendSemaphore: make(chan bool, maxConcurrentSends),
drained: make(chan bool), drained: make(chan bool),
@ -112,17 +112,14 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
} }
} }
// Queue queues a sample batch to be sent to the TSDB. It drops the most // Append queues a sample to be sent to the TSDB. It drops the sample on the
// recently queued samples on the floor if the queue is full. // floor if the queue is full. It implements storage.SampleAppender.
func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { func (t *TSDBQueueManager) Append(s *clientmodel.Sample) {
if len(s) == 0 {
return
}
select { select {
case t.queue <- s: case t.queue <- s:
default: default:
t.samplesCount.WithLabelValues(dropped).Add(float64(len(s))) t.samplesCount.WithLabelValues(dropped).Inc()
glog.Warningf("TSDB queue full, discarding %d samples", len(s)) glog.Warning("TSDB queue full, discarding sample.")
} }
} }
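The drop-on-full behavior in Append comes from the non-blocking send: a select with a default branch. Reduced to its essentials:

package main

import "fmt"

// enqueue attempts a non-blocking send. If the buffered channel is full,
// the default branch fires and the value is dropped on the floor.
func enqueue(queue chan int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false // queue full: drop the value
	}
}

func main() {
	q := make(chan int, 2)
	for i := 0; i < 4; i++ {
		fmt.Println(i, enqueue(q, i)) // the last two sends are dropped
	}
}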
@ -195,7 +192,7 @@ func (t *TSDBQueueManager) Run() {
return return
} }
t.pendingSamples = append(t.pendingSamples, s...) t.pendingSamples = append(t.pendingSamples, s)
for len(t.pendingSamples) >= maxSamplesPerSend { for len(t.pendingSamples) >= maxSamplesPerSend {
go t.sendSamples(t.pendingSamples[:maxSamplesPerSend]) go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
@ -63,13 +63,16 @@ func TestSampleDelivery(t *testing.T) {
c := &TestTSDBClient{} c := &TestTSDBClient{}
c.expectSamples(samples[:len(samples)/2]) c.expectSamples(samples[:len(samples)/2])
m := NewTSDBQueueManager(c, 1) m := NewTSDBQueueManager(c, len(samples)/2)
// These should be received by the client. // These should be received by the client.
m.Queue(samples[:len(samples)/2]) for _, s := range samples[:len(samples)/2] {
m.Append(s)
}
// These will be dropped because the queue is full. // These will be dropped because the queue is full.
m.Queue(samples[len(samples)/2:]) for _, s := range samples[len(samples)/2:] {
m.Append(s)
}
go m.Run() go m.Run()
defer m.Stop() defer m.Stop()
storage/storage.go Normal file
@ -0,0 +1,38 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
clientmodel "github.com/prometheus/client_golang/model"
)
// SampleAppender is the interface to append samples to both local and
// remote storage.
type SampleAppender interface {
Append(*clientmodel.Sample)
}
// Tee is a SampleAppender that appends every sample to two other
// SampleAppenders.
type Tee struct {
Appender1, Appender2 SampleAppender
}
// Append implements SampleAppender. It appends the provided sample first
// to Appender1, then to Appender2, waiting for each to return before
// proceeding.
func (t Tee) Append(s *clientmodel.Sample) {
t.Appender1.Append(s)
t.Appender2.Append(s)
}
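As a usage sketch, any two SampleAppenders can be fanned out through Tee. The pared-down stand-in types below keep the example self-contained rather than importing the real packages:

package main

import "fmt"

// Sample and SampleAppender are stand-ins for the types above, just to make
// the sketch runnable on its own.
type Sample struct{ Value float64 }

type SampleAppender interface{ Append(*Sample) }

type Tee struct{ Appender1, Appender2 SampleAppender }

func (t Tee) Append(s *Sample) {
	t.Appender1.Append(s)
	t.Appender2.Append(s)
}

type printAppender struct{ name string }

func (a printAppender) Append(s *Sample) { fmt.Printf("%s: %v\n", a.name, s.Value) }

func main() {
	var app SampleAppender = Tee{
		Appender1: printAppender{"local"},
		Appender2: printAppender{"remote"},
	}
	app.Append(&Sample{Value: 42}) // fans out to both appenders, in order
}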
@ -154,19 +154,17 @@ func TestTemplateExpansion(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1) storage, closer := local.NewTestStorage(t, 1)
defer closer.Close() defer closer.Close()
storage.AppendSamples(clientmodel.Samples{ storage.Append(&clientmodel.Sample{
{
Metric: clientmodel.Metric{ Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "metric", clientmodel.MetricNameLabel: "metric",
"instance": "a"}, "instance": "a"},
Value: 11, Value: 11,
}, })
{ storage.Append(&clientmodel.Sample{
Metric: clientmodel.Metric{ Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "metric", clientmodel.MetricNameLabel: "metric",
"instance": "b"}, "instance": "b"},
Value: 21, Value: 21,
},
}) })
storage.WaitForIndexing() storage.WaitForIndexing()
@ -49,7 +49,4 @@ func (msrv *MetricsService) RegisterHandler() {
http.Handle("/api/metrics", prometheus.InstrumentHandler( http.Handle("/api/metrics", prometheus.InstrumentHandler(
"/api/metrics", handler(msrv.Metrics), "/api/metrics", handler(msrv.Metrics),
)) ))
http.Handle("/api/targets", prometheus.InstrumentHandler(
"/api/targets", handler(msrv.SetTargets),
))
} }
@ -1,72 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"encoding/json"
"net/http"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/web/httputils"
)
// TargetGroup bundles endpoints and base labels with appropriate JSON
// annotations.
type TargetGroup struct {
Endpoints []string `json:"endpoints"`
BaseLabels map[string]string `json:"baseLabels"`
}
// SetTargets handles the /api/targets endpoint.
func (serv MetricsService) SetTargets(w http.ResponseWriter, r *http.Request) {
params := httputils.GetQueryParams(r)
jobName := params.Get("job")
decoder := json.NewDecoder(r.Body)
var targetGroups []TargetGroup
err := decoder.Decode(&targetGroups)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
job := serv.Config.GetJobByName(jobName)
if job == nil {
http.Error(w, "job not found", http.StatusNotFound)
return
}
newTargets := []retrieval.Target{}
for _, targetGroup := range targetGroups {
// Do mandatory map type conversion due to Go shortcomings.
baseLabels := clientmodel.LabelSet{
clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()),
}
for label, value := range targetGroup.BaseLabels {
baseLabels[clientmodel.LabelName(label)] = clientmodel.LabelValue(value)
}
for _, endpoint := range targetGroup.Endpoints {
newTarget := retrieval.NewTarget(endpoint, job.ScrapeTimeout(), baseLabels)
newTargets = append(newTargets, newTarget)
}
}
// BUG(julius): Validate that this ScrapeInterval is in fact the proper one
// for the job.
serv.TargetManager.ReplaceTargets(*job, newTargets)
}
@ -35,6 +35,7 @@ type PrometheusStatusHandler struct {
Birth time.Time Birth time.Time
} }
// TargetStateToClass returns a map of TargetState to the name of a Bootstrap CSS class.
func (h *PrometheusStatusHandler) TargetStateToClass() map[retrieval.TargetState]string { func (h *PrometheusStatusHandler) TargetStateToClass() map[retrieval.TargetState]string {
return map[retrieval.TargetState]string{ return map[retrieval.TargetState]string{
retrieval.Unknown: "warning", retrieval.Unknown: "warning",
@ -56,7 +56,7 @@
</span> </span>
</td> </td>
<td> <td>
{{.BaseLabels}} {{.BaseLabelsWithoutJobAndInstance}}
</td> </td>
<td> <td>
{{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}} {{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}}
@ -48,7 +48,7 @@ type WebService struct {
AlertsHandler *AlertsHandler AlertsHandler *AlertsHandler
ConsolesHandler *ConsolesHandler ConsolesHandler *ConsolesHandler
QuitDelegate func() QuitChan chan struct{}
} }
// ServeForever serves the HTTP endpoints and only returns upon errors. // ServeForever serves the HTTP endpoints and only returns upon errors.
@ -109,7 +109,7 @@ func (ws WebService) quitHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Requesting termination... Goodbye!") fmt.Fprintf(w, "Requesting termination... Goodbye!")
ws.QuitDelegate() close(ws.QuitChan)
} }
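Replacing the QuitDelegate callback with a channel relies on the fact that closing a channel releases every pending receive at once. A minimal demonstration:

package main

import (
	"fmt"
	"sync"
)

// Closing quit broadcasts termination: every goroutine blocked on <-quit
// is released by the single close.
func main() {
	quit := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-quit // blocks until the channel is closed
			fmt.Println("worker", id, "terminating")
		}(i)
	}
	close(quit) // one close wakes all receivers
	wg.Wait()
}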
func getTemplateFile(name string) (string, error) { func getTemplateFile(name string) (string, error) {