Mirror of https://github.com/prometheus/prometheus.git, synced 2025-03-05 20:59:13 -08:00
Remove the sample ingestion channel.
The one central sample ingestion channel has caused a variety of trouble. This commit removes it. Targets and rule evaluation call an Append method directly now. To incorporate multiple storage backends (like OpenTSDB), storage.Tee forks the Append into two different appenders. Note that the TSDB queue manager had its own queue anyway. It was a queue after a queue... Much queue, so overhead...

Targets have their own little buffer (implemented as a channel) to avoid stalling during an HTTP scrape. But a new scrape will only be started once the old one is fully ingested. The contraption of three pipelined ingesters was removed. A Target is an ingester itself now. Despite more logic in Target, things should be less confusing now.

Also, remove lint and vet warnings in ast.go.
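The fan-out described in this message is small enough to sketch in isolation. The block below is a minimal, self-contained illustration of the Append/Tee idea and not the repository's code: Sample and printAppender are simplified stand-ins for clientmodel.Sample and the real storage backends, while SampleAppender and Tee mirror the storage.SampleAppender interface and storage.Tee type introduced in the diff further down.

    package main

    import "fmt"

    // Sample is a simplified stand-in for clientmodel.Sample.
    type Sample struct {
        Name  string
        Value float64
    }

    // SampleAppender is the single ingestion interface that targets and the
    // rule evaluator now call into (storage.SampleAppender in the diff below).
    type SampleAppender interface {
        Append(*Sample)
    }

    // Tee forks every appended sample to two appenders, e.g. local storage and
    // the OpenTSDB queue manager (storage.Tee in the diff below).
    type Tee struct {
        Appender1, Appender2 SampleAppender
    }

    func (t Tee) Append(s *Sample) {
        t.Appender1.Append(s)
        t.Appender2.Append(s)
    }

    // printAppender is a toy backend used only for this illustration.
    type printAppender struct{ name string }

    func (a printAppender) Append(s *Sample) {
        fmt.Printf("%s got %s=%g\n", a.name, s.Name, s.Value)
    }

    func main() {
        var app SampleAppender = Tee{
            Appender1: printAppender{name: "remote"},
            Appender2: printAppender{name: "local"},
        }
        app.Append(&Sample{Name: "up", Value: 1})
    }

Because every producer only sees the one-method Append interface, swapping in a different backend (or a fan-out like Tee) requires no changes on the producer side.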
parent 0056eaeb4f
commit be11cb2b07
main.go | 116
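The per-target scrape buffer mentioned in the commit message ("their own little buffer, implemented as a channel") can likewise be sketched in a few lines. This is a simplified illustration, not the code from the diff below: it assumes the decoding step simply calls an ingest callback, whereas the real code uses clientmodel.Samples, processor.ProcessSingle, a channel of capacity ingestedSamplesCap (256), label merging, and a fallback timeout of one tenth of the scrape deadline.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // Simplified stand-ins for clientmodel.Sample/Samples and
    // storage.SampleAppender; these are not the real Prometheus types.
    type Sample struct{ Name string }
    type Samples []*Sample

    type Appender interface{ Append(*Sample) }

    var errIngestChannelFull = errors.New("ingestion channel full")

    // target mirrors the shape of the new retrieval.target: each scrape owns a
    // small buffered channel so a slow appender cannot stall the HTTP read.
    type target struct {
        ingestedSamples chan Samples
        deadline        time.Duration
    }

    // ingest first tries a non-blocking send (the common case) and only then
    // falls back to a short timeout, as the diff does with t.deadline / 10.
    func (t *target) ingest(s Samples) error {
        select {
        case t.ingestedSamples <- s:
            return nil
        default:
            select {
            case t.ingestedSamples <- s:
                return nil
            case <-time.After(t.deadline / 10):
                return errIngestChannelFull
            }
        }
    }

    // scrape starts a producer goroutine (standing in for processor.ProcessSingle)
    // and drains the buffer into the appender; the real code also merges base
    // labels into each sample at this point.
    func (t *target) scrape(app Appender, produce func(ingest func(Samples) error)) {
        t.ingestedSamples = make(chan Samples, 256) // ingestedSamplesCap in the diff
        go func() {
            produce(t.ingest)
            close(t.ingestedSamples)
        }()
        for samples := range t.ingestedSamples {
            for _, s := range samples {
                app.Append(s)
            }
        }
    }

    type printAppender struct{}

    func (printAppender) Append(s *Sample) { fmt.Println("appended", s.Name) }

    func main() {
        t := &target{deadline: time.Second}
        t.scrape(printAppender{}, func(ingest func(Samples) error) {
            _ = ingest(Samples{{Name: "up"}})
        })
    }

Because a fresh scrape only starts after the previous one is fully drained, the buffer bounds memory per target without needing a global queue.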
@@ -24,13 +24,13 @@ import (

 	"github.com/golang/glog"

-	clientmodel "github.com/prometheus/client_golang/model"
 	registry "github.com/prometheus/client_golang/prometheus"

 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notification"
 	"github.com/prometheus/prometheus/retrieval"
 	"github.com/prometheus/prometheus/rules/manager"
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/local"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/storage/remote/opentsdb"
@@ -52,7 +52,7 @@ var (
 	remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")

-	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
+	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 0, "Deprecated. Has no effect anymore.")

 	numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

@@ -67,23 +67,7 @@ var (
 	printVersion = flag.Bool("version", false, "Print version information.")
 )

-// Instrumentation.
-var (
-	samplesQueueCapDesc = registry.NewDesc(
-		"prometheus_samples_queue_capacity",
-		"Capacity of the queue for unwritten samples.",
-		nil, nil,
-	)
-	samplesQueueLenDesc = registry.NewDesc(
-		"prometheus_samples_queue_length",
-		"Current number of items in the queue for unwritten samples. Each item comprises all samples exposed by one target as one metric family (i.e. metrics of the same name).",
-		nil, nil,
-	)
-)
-
 type prometheus struct {
-	incomingSamples chan clientmodel.Samples
-
 	ruleManager manager.RuleManager
 	targetManager retrieval.TargetManager
 	notificationHandler *notification.NotificationHandler
@@ -103,16 +87,6 @@ func NewPrometheus() *prometheus {
 		glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
 	}

-	incomingSamples := make(chan clientmodel.Samples, *samplesQueueCapacity)
-
-	ingester := &retrieval.MergeLabelsIngester{
-		Labels: conf.GlobalLabels(),
-		CollisionPrefix: clientmodel.ExporterLabelPrefix,
-		Ingester: retrieval.ChannelIngester(incomingSamples),
-	}
-	targetManager := retrieval.NewTargetManager(ingester)
-	targetManager.AddTargetsFromConfig(conf)
-
 	notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)

 	o := &local.MemorySeriesStorageOptions{
@@ -129,8 +103,25 @@ func NewPrometheus() *prometheus {
 		glog.Fatal("Error opening memory series storage: ", err)
 	}

+	var sampleAppender storage.SampleAppender
+	var remoteTSDBQueue *remote.TSDBQueueManager
+	if *remoteTSDBUrl == "" {
+		glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
+		sampleAppender = memStorage
+	} else {
+		openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
+		remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 100*1024)
+		sampleAppender = storage.Tee{
+			Appender1: remoteTSDBQueue,
+			Appender2: memStorage,
+		}
+	}
+
+	targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels())
+	targetManager.AddTargetsFromConfig(conf)
+
 	ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
-		Results: incomingSamples,
+		SampleAppender: sampleAppender,
 		NotificationHandler: notificationHandler,
 		EvaluationInterval: conf.EvaluationInterval(),
 		Storage: memStorage,
@@ -140,14 +131,6 @@ func NewPrometheus() *prometheus {
 		glog.Fatal("Error loading rule files: ", err)
 	}

-	var remoteTSDBQueue *remote.TSDBQueueManager
-	if *remoteTSDBUrl == "" {
-		glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
-	} else {
-		openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
-		remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
-	}
-
 	flags := map[string]string{}
 	flag.VisitAll(func(f *flag.Flag) {
 		flags[f.Name] = f.Value.String()
@@ -183,8 +166,6 @@ func NewPrometheus() *prometheus {
 	}

 	p := &prometheus{
-		incomingSamples: incomingSamples,
-
 		ruleManager: ruleManager,
 		targetManager: targetManager,
 		notificationHandler: notificationHandler,
@@ -193,7 +174,7 @@ func NewPrometheus() *prometheus {

 		webService: webService,
 	}
-	webService.QuitDelegate = p.Close
+	webService.QuitChan = make(chan struct{})
 	return p
 }

@@ -206,7 +187,6 @@ func (p *prometheus) Serve() {
 	}
 	go p.ruleManager.Run()
 	go p.notificationHandler.Run()
-	go p.interruptHandler()

 	p.storage.Start()

@@ -217,15 +197,18 @@ func (p *prometheus) Serve() {
 		}
 	}()

-	for samples := range p.incomingSamples {
-		p.storage.AppendSamples(samples)
-		if p.remoteTSDBQueue != nil {
-			p.remoteTSDBQueue.Queue(samples)
-		}
+	notifier := make(chan os.Signal)
+	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
+	select {
+	case <-notifier:
+		glog.Warning("Received SIGTERM, exiting gracefully...")
+	case <-p.webService.QuitChan:
+		glog.Warning("Received termination request via web service, exiting gracefully...")
 	}

-	// The following shut-down operations have to happen after
-	// incomingSamples is drained. So do not move them into close().
+	p.targetManager.Stop()
+	p.ruleManager.Stop()
+
 	if err := p.storage.Stop(); err != nil {
 		glog.Error("Error stopping local storage: ", err)
 	}
@@ -238,35 +221,8 @@ func (p *prometheus) Serve() {
 	glog.Info("See you next time!")
 }

-// Close cleanly shuts down the Prometheus server.
-func (p *prometheus) Close() {
-	p.closeOnce.Do(p.close)
-}
-
-func (p *prometheus) interruptHandler() {
-	notifier := make(chan os.Signal)
-	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
-	<-notifier
-
-	glog.Warning("Received SIGTERM, exiting gracefully...")
-	p.Close()
-}
-
-func (p *prometheus) close() {
-	glog.Info("Shutdown has been requested; subsytems are closing:")
-	p.targetManager.Stop()
-	p.ruleManager.Stop()
-
-	close(p.incomingSamples)
-	// Note: Before closing the remaining subsystems (storage, ...), we have
-	// to wait until p.incomingSamples is actually drained. Therefore,
-	// remaining shut-downs happen in Serve().
-}
-
 // Describe implements registry.Collector.
 func (p *prometheus) Describe(ch chan<- *registry.Desc) {
-	ch <- samplesQueueCapDesc
-	ch <- samplesQueueLenDesc
 	p.notificationHandler.Describe(ch)
 	p.storage.Describe(ch)
 	if p.remoteTSDBQueue != nil {
@@ -276,16 +232,6 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) {

 // Collect implements registry.Collector.
 func (p *prometheus) Collect(ch chan<- registry.Metric) {
-	ch <- registry.MustNewConstMetric(
-		samplesQueueCapDesc,
-		registry.GaugeValue,
-		float64(cap(p.incomingSamples)),
-	)
-	ch <- registry.MustNewConstMetric(
-		samplesQueueLenDesc,
-		registry.GaugeValue,
-		float64(len(p.incomingSamples)),
-	)
 	p.notificationHandler.Collect(ch)
 	p.storage.Collect(ch)
 	if p.remoteTSDBQueue != nil {

@@ -14,11 +14,27 @@
 package retrieval

 import (
+	"time"
+
 	clientmodel "github.com/prometheus/client_golang/model"
 )

-type nopIngester struct{}
+type nopAppender struct{}

-func (i nopIngester) Ingest(clientmodel.Samples) error {
-	return nil
+func (a nopAppender) Append(*clientmodel.Sample) {
+}
+
+type slowAppender struct{}
+
+func (a slowAppender) Append(*clientmodel.Sample) {
+	time.Sleep(time.Millisecond)
+	return
+}
+
+type collectResultAppender struct {
+	result clientmodel.Samples
+}
+
+func (a *collectResultAppender) Append(s *clientmodel.Sample) {
+	a.result = append(a.result, s)
 }

@@ -1,71 +0,0 @@
-// Copyright 2013 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package retrieval
-
-import (
-	"errors"
-	"time"
-
-	"github.com/prometheus/client_golang/extraction"
-
-	clientmodel "github.com/prometheus/client_golang/model"
-)
-
-const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout.
-
-var errIngestChannelFull = errors.New("ingestion channel full")
-
-// MergeLabelsIngester merges a labelset ontop of a given extraction result and
-// passes the result on to another ingester. Label collisions are avoided by
-// appending a label prefix to any newly merged colliding labels.
-type MergeLabelsIngester struct {
-	Labels clientmodel.LabelSet
-	CollisionPrefix clientmodel.LabelName
-
-	Ingester extraction.Ingester
-}
-
-// Ingest ingests the provided extraction result by merging in i.Labels and then
-// handing it over to i.Ingester.
-func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {
-	for _, s := range samples {
-		s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix)
-	}
-
-	return i.Ingester.Ingest(samples)
-}
-
-// ChannelIngester feeds results into a channel without modifying them.
-type ChannelIngester chan<- clientmodel.Samples
-
-// Ingest ingests the provided extraction result by sending it to its channel.
-// If the channel was not able to receive the samples within the ingestTimeout,
-// an error is returned. This is important to fail fast and to not pile up
-// ingestion requests in case of overload.
-func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
-	// Since the regular case is that i is ready to receive, first try
-	// without setting a timeout so that we don't need to allocate a timer
-	// most of the time.
-	select {
-	case i <- s:
-		return nil
-	default:
-		select {
-		case i <- s:
-			return nil
-		case <-time.After(ingestTimeout):
-			return errIngestChannelFull
-		}
-	}
-}

@@ -14,6 +14,7 @@
 package retrieval

 import (
+	"errors"
 	"fmt"
 	"math/rand"
 	"net/http"
@@ -29,6 +30,7 @@ import (

 	clientmodel "github.com/prometheus/client_golang/model"

+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/utility"
 )

@@ -41,6 +43,8 @@ const (
 	// ScrapeTimeMetricName is the metric name for the synthetic scrape duration
 	// variable.
 	scrapeDurationMetricName clientmodel.LabelValue = "scrape_duration_seconds"
+	// Capacity of the channel to buffer samples during ingestion.
+	ingestedSamplesCap = 256

 	// Constants for instrumentation.
 	namespace = "prometheus"
@@ -48,6 +52,8 @@ const (
 )

 var (
+	errIngestChannelFull = errors.New("ingestion channel full")
+
 	localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"}

 	targetIntervalLength = prometheus.NewSummaryVec(
@@ -100,6 +106,8 @@ const (
 // For the future, the Target protocol will abstract away the exact means that
 // metrics are retrieved and deserialized from the given instance to which it
 // refers.
+//
+// Target implements extraction.Ingester.
 type Target interface {
 	// Return the last encountered scrape error, if any.
 	LastError() error
@@ -125,9 +133,11 @@ type Target interface {
 	// begins).
 	SetBaseLabelsFrom(Target)
 	// Scrape target at the specified interval.
-	RunScraper(extraction.Ingester, time.Duration)
+	RunScraper(storage.SampleAppender, time.Duration)
 	// Stop scraping, synchronous.
 	StopScraper()
+	// Ingest implements extraction.Ingester.
+	Ingest(clientmodel.Samples) error
 }

 // target is a Target that refers to a singular HTTP or HTTPS endpoint.
@@ -144,10 +154,12 @@ type target struct {
 	scraperStopped chan struct{}
 	// Channel to queue base labels to be replaced.
 	newBaseLabels chan clientmodel.LabelSet
+	// Channel to buffer ingested samples.
+	ingestedSamples chan clientmodel.Samples

 	url string
 	// What is the deadline for the HTTP or HTTPS against this endpoint.
-	Deadline time.Duration
+	deadline time.Duration
 	// Any base labels that are added to this target and its metrics.
 	baseLabels clientmodel.LabelSet
 	// The HTTP client used to scrape the target's endpoint.
@@ -163,52 +175,42 @@ type target struct {

 // NewTarget creates a reasonably configured target for querying.
 func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
-	target := &target{
+	t := &target{
 		url: url,
-		Deadline: deadline,
-		baseLabels: baseLabels,
+		deadline: deadline,
 		httpClient: utility.NewDeadlineClient(deadline),
 		scraperStopping: make(chan struct{}),
 		scraperStopped: make(chan struct{}),
 		newBaseLabels: make(chan clientmodel.LabelSet, 1),
 	}
-	return target
+	labels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}
+	for baseLabel, baseValue := range baseLabels {
+		labels[baseLabel] = baseValue
+	}
+	t.baseLabels = labels
+	return t
 }

-func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
-	healthMetric := clientmodel.Metric{}
-	durationMetric := clientmodel.Metric{}
-	for label, value := range t.baseLabels {
-		healthMetric[label] = value
-		durationMetric[label] = value
+// Ingest implements Target and extraction.Ingester.
+func (t *target) Ingest(s clientmodel.Samples) error {
+	// Since the regular case is that ingestedSamples is ready to receive,
+	// first try without setting a timeout so that we don't need to allocate
+	// a timer most of the time.
+	select {
+	case t.ingestedSamples <- s:
+		return nil
+	default:
+		select {
+		case t.ingestedSamples <- s:
+			return nil
+		case <-time.After(t.deadline / 10):
+			return errIngestChannelFull
+		}
 	}
-	healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
-	durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
-	healthMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier())
-	durationMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier())
-
-	healthValue := clientmodel.SampleValue(0)
-	if healthy {
-		healthValue = clientmodel.SampleValue(1)
-	}
-
-	healthSample := &clientmodel.Sample{
-		Metric: healthMetric,
-		Timestamp: timestamp,
-		Value: healthValue,
-	}
-	durationSample := &clientmodel.Sample{
-		Metric: durationMetric,
-		Timestamp: timestamp,
-		Value: clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)),
-	}
-
-	ingester.Ingest(clientmodel.Samples{healthSample, durationSample})
 }

 // RunScraper implements Target.
-func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
+func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
 	defer func() {
 		// Need to drain t.newBaseLabels to not make senders block during shutdown.
 		for {
@@ -237,7 +239,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
 		t.Lock() // Writing t.lastScrape requires the lock.
 		t.lastScrape = time.Now()
 		t.Unlock()
-		t.scrape(ingester)
+		t.scrape(sampleAppender)

 		// Explanation of the contraption below:
 		//
@@ -271,7 +273,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
 			targetIntervalLength.WithLabelValues(interval.String()).Observe(
 				float64(took) / float64(time.Second), // Sub-second precision.
 			)
-			t.scrape(ingester)
+			t.scrape(sampleAppender)
 		}
 	}
 }
@@ -285,7 +287,7 @@ func (t *target) StopScraper() {

 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

-func (t *target) scrape(ingester extraction.Ingester) (err error) {
+func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
 	timestamp := clientmodel.Now()
 	defer func(start time.Time) {
 		t.Lock() // Writing t.state and t.lastError requires the lock.
@@ -296,7 +298,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
 		}
 		t.lastError = err
 		t.Unlock()
-		t.recordScrapeHealth(ingester, timestamp, err == nil, time.Since(start))
+		t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start))
 	}(time.Now())

 	req, err := http.NewRequest("GET", t.URL(), nil)
@@ -319,21 +321,23 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
 		return err
 	}

-	baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}
-	for baseLabel, baseValue := range t.baseLabels {
-		baseLabels[baseLabel] = baseValue
-	}
+	t.ingestedSamples = make(chan clientmodel.Samples, ingestedSamplesCap)

-	i := &MergeLabelsIngester{
-		Labels: baseLabels,
-		CollisionPrefix: clientmodel.ExporterLabelPrefix,
-
-		Ingester: ingester,
-	}
 	processOptions := &extraction.ProcessOptions{
 		Timestamp: timestamp,
 	}
-	return processor.ProcessSingle(resp.Body, i, processOptions)
+	go func() {
+		err = processor.ProcessSingle(resp.Body, t, processOptions)
+		close(t.ingestedSamples)
+	}()
+
+	for samples := range t.ingestedSamples {
+		for _, s := range samples {
+			s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix)
+			sampleAppender.Append(s)
+		}
+	}
+	return err
 }

 // LastError implements Target.
@@ -412,3 +416,33 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) {
 	}
 	t.newBaseLabels <- newTarget.BaseLabels()
 }
+
+func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {
+	healthMetric := clientmodel.Metric{}
+	durationMetric := clientmodel.Metric{}
+	for label, value := range t.baseLabels {
+		healthMetric[label] = value
+		durationMetric[label] = value
+	}
+	healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
+	durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
+
+	healthValue := clientmodel.SampleValue(0)
+	if healthy {
+		healthValue = clientmodel.SampleValue(1)
+	}
+
+	healthSample := &clientmodel.Sample{
+		Metric: healthMetric,
+		Timestamp: timestamp,
+		Value: healthValue,
+	}
+	durationSample := &clientmodel.Sample{
+		Metric: durationMetric,
+		Timestamp: timestamp,
+		Value: clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)),
+	}
+
+	sampleAppender.Append(healthSample)
+	sampleAppender.Append(durationSample)
+}

@@ -59,22 +59,23 @@ type TargetProvider interface {
 }

 type sdTargetProvider struct {
 	job config.JobConfig
+	globalLabels clientmodel.LabelSet
 	targets []Target

 	lastRefresh time.Time
 	refreshInterval time.Duration
 }

 // NewSdTargetProvider constructs a new sdTargetProvider for a job.
-func NewSdTargetProvider(job config.JobConfig) *sdTargetProvider {
+func NewSdTargetProvider(job config.JobConfig, globalLabels clientmodel.LabelSet) *sdTargetProvider {
 	i, err := utility.StringToDuration(job.GetSdRefreshInterval())
 	if err != nil {
 		panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err))
 	}
 	return &sdTargetProvider{
 		job: job,
+		globalLabels: globalLabels,
 		refreshInterval: i,
 	}
 }
@@ -101,6 +102,9 @@ func (p *sdTargetProvider) Targets() ([]Target, error) {
 	baseLabels := clientmodel.LabelSet{
 		clientmodel.JobLabel: clientmodel.LabelValue(p.job.GetName()),
 	}
+	for n, v := range p.globalLabels {
+		baseLabels[n] = v
+	}

 	targets := make([]Target, 0, len(response.Answer))
 	endpoint := &url.URL{

@@ -15,6 +15,7 @@ package retrieval

 import (
 	"errors"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -25,15 +26,6 @@ import (
 	"github.com/prometheus/prometheus/utility"
 )

-type collectResultIngester struct {
-	result clientmodel.Samples
-}
-
-func (i *collectResultIngester) Ingest(s clientmodel.Samples) error {
-	i.result = s
-	return nil
-}
-
 func TestTargetHidesURLAuth(t *testing.T) {
 	testVectors := []string{"http://secret:data@host.com/query?args#fragment", "https://example.net/foo", "http://foo.com:31337/bar"}
 	testResults := []string{"host.com:80", "example.net:443", "foo.com:31337"}
@@ -60,7 +52,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
 		url: "bad schema",
 		httpClient: utility.NewDeadlineClient(0),
 	}
-	testTarget.scrape(nopIngester{})
+	testTarget.scrape(nopAppender{})
 	if testTarget.state != Unhealthy {
 		t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state)
 	}
@@ -71,7 +63,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
 		http.HandlerFunc(
 			func(w http.ResponseWriter, r *http.Request) {
 				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
-				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+				for i := 0; i < 2*ingestedSamplesCap; i++ {
+					w.Write([]byte(
+						fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i),
+					))
+				}
 			},
 		),
 	)
@@ -79,11 +75,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {

 	testTarget := NewTarget(
 		server.URL,
-		100*time.Millisecond,
+		10*time.Millisecond,
 		clientmodel.LabelSet{"dings": "bums"},
 	).(*target)

-	testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0.
+	testTarget.scrape(slowAppender{})
 	if testTarget.state != Unhealthy {
 		t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state)
 	}
@@ -93,17 +89,15 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
 }

 func TestTargetRecordScrapeHealth(t *testing.T) {
-	testTarget := target{
-		url: "http://example.url",
-		baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
-		httpClient: utility.NewDeadlineClient(0),
-	}
+	testTarget := NewTarget(
+		"http://example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"},
+	).(*target)

 	now := clientmodel.Now()
-	ingester := &collectResultIngester{}
-	testTarget.recordScrapeHealth(ingester, now, true, 2*time.Second)
+	appender := &collectResultAppender{}
+	testTarget.recordScrapeHealth(appender, now, true, 2*time.Second)

-	result := ingester.result
+	result := appender.result

 	if len(result) != 2 {
 		t.Fatalf("Expected two samples, got %d", len(result))
@@ -154,11 +148,11 @@ func TestTargetScrapeTimeout(t *testing.T) {
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	// scrape once without timeout
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}

@@ -167,12 +161,12 @@ func TestTargetScrapeTimeout(t *testing.T) {

 	// now scrape again
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}

 	// now timeout
-	if err := testTarget.(*target).scrape(ingester); err == nil {
+	if err := testTarget.(*target).scrape(appender); err == nil {
 		t.Fatal("expected scrape to timeout")
 	} else {
 		signal <- true // let handler continue
@@ -180,7 +174,7 @@ func TestTargetScrapeTimeout(t *testing.T) {

 	// now scrape again without timeout
 	signal <- true
-	if err := testTarget.(*target).scrape(ingester); err != nil {
+	if err := testTarget.(*target).scrape(appender); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -196,10 +190,10 @@ func TestTargetScrape404(t *testing.T) {
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	want := errors.New("server returned HTTP status 404 Not Found")
-	got := testTarget.(*target).scrape(ingester)
+	got := testTarget.(*target).scrape(appender)
 	if got == nil || want.Error() != got.Error() {
 		t.Fatalf("want err %q, got %q", want, got)
 	}
@@ -213,7 +207,7 @@ func TestTargetRunScraperScrapes(t *testing.T) {
 		scraperStopping: make(chan struct{}),
 		scraperStopped: make(chan struct{}),
 	}
-	go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))
+	go testTarget.RunScraper(nopAppender{}, time.Duration(time.Millisecond))

 	// Enough time for a scrape to happen.
 	time.Sleep(2 * time.Millisecond)
@@ -248,11 +242,11 @@ func BenchmarkScrape(b *testing.B) {
 		100*time.Millisecond,
 		clientmodel.LabelSet{"dings": "bums"},
 	)
-	ingester := nopIngester{}
+	appender := nopAppender{}

 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		if err := testTarget.(*target).scrape(ingester); err != nil {
+		if err := testTarget.(*target).scrape(appender); err != nil {
 			b.Fatal(err)
 		}
 	}

@@ -17,11 +17,11 @@ import (
 	"sync"

 	"github.com/golang/glog"
-	"github.com/prometheus/client_golang/extraction"

 	clientmodel "github.com/prometheus/client_golang/model"

 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/storage"
 )

 // TargetManager manages all scrape targets. All methods are goroutine-safe.
@@ -35,16 +35,18 @@ type TargetManager interface {
 }

 type targetManager struct {
 	sync.Mutex // Protects poolByJob.
-	poolsByJob map[string]*TargetPool
-	ingester extraction.Ingester
+	globalLabels clientmodel.LabelSet
+	sampleAppender storage.SampleAppender
+	poolsByJob map[string]*TargetPool
 }

 // NewTargetManager returns a newly initialized TargetManager ready to use.
-func NewTargetManager(ingester extraction.Ingester) TargetManager {
+func NewTargetManager(sampleAppender storage.SampleAppender, globalLabels clientmodel.LabelSet) TargetManager {
 	return &targetManager{
-		ingester: ingester,
-		poolsByJob: make(map[string]*TargetPool),
+		sampleAppender: sampleAppender,
+		globalLabels: globalLabels,
+		poolsByJob: make(map[string]*TargetPool),
 	}
 }

@@ -54,11 +56,11 @@ func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool {
 	if !ok {
 		var provider TargetProvider
 		if job.SdName != nil {
-			provider = NewSdTargetProvider(job)
+			provider = NewSdTargetProvider(job, m.globalLabels)
 		}

 		interval := job.ScrapeInterval()
-		targetPool = NewTargetPool(m, provider, m.ingester, interval)
+		targetPool = NewTargetPool(provider, m.sampleAppender, interval)
 		glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())

 		m.poolsByJob[job.GetName()] = targetPool
@@ -102,6 +104,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) {
 			baseLabels := clientmodel.LabelSet{
 				clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()),
 			}
+			for n, v := range m.globalLabels {
+				baseLabels[n] = v
+			}
 			if targetGroup.Labels != nil {
 				for _, label := range targetGroup.Labels.Label {
 					baseLabels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue())

@@ -21,9 +21,8 @@ import (

 	clientmodel "github.com/prometheus/client_golang/model"

-	"github.com/prometheus/client_golang/extraction"
-
 	pb "github.com/prometheus/prometheus/config/generated"
+	"github.com/prometheus/prometheus/storage"

 	"github.com/prometheus/prometheus/config"
 )
@@ -62,13 +61,13 @@ func (t fakeTarget) LastScrape() time.Time {
 	return t.lastScrape
 }

-func (t fakeTarget) scrape(i extraction.Ingester) error {
+func (t fakeTarget) scrape(storage.SampleAppender) error {
 	t.scrapeCount++

 	return nil
 }

-func (t fakeTarget) RunScraper(ingester extraction.Ingester, interval time.Duration) {
+func (t fakeTarget) RunScraper(storage.SampleAppender, time.Duration) {
 	return
 }

@@ -82,8 +81,10 @@ func (t fakeTarget) State() TargetState {

 func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {}

+func (t *fakeTarget) Ingest(clientmodel.Samples) error { return nil }
+
 func testTargetManager(t testing.TB) {
-	targetManager := NewTargetManager(nopIngester{})
+	targetManager := NewTargetManager(nopAppender{}, nil)
 	testJob1 := config.JobConfig{
 		JobConfig: pb.JobConfig{
 			Name: proto.String("test_job1"),

@@ -19,7 +19,8 @@ import (
 	"time"

 	"github.com/golang/glog"
-	"github.com/prometheus/client_golang/extraction"
+
+	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/utility"
 )

@@ -35,7 +36,7 @@ type TargetPool struct {
 	manager TargetManager
 	targetsByURL map[string]Target
 	interval time.Duration
-	ingester extraction.Ingester
+	sampleAppender storage.SampleAppender
 	addTargetQueue chan Target

 	targetProvider TargetProvider
@@ -44,11 +45,10 @@ type TargetPool struct {
 }

 // NewTargetPool creates a TargetPool, ready to be started by calling Run.
-func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i time.Duration) *TargetPool {
+func NewTargetPool(p TargetProvider, app storage.SampleAppender, i time.Duration) *TargetPool {
 	return &TargetPool{
-		manager: m,
 		interval: i,
-		ingester: ing,
+		sampleAppender: app,
 		targetsByURL: make(map[string]Target),
 		addTargetQueue: make(chan Target, targetAddQueueSize),
 		targetProvider: p,
@@ -100,7 +100,7 @@ func (p *TargetPool) addTarget(target Target) {
 	defer p.Unlock()

 	p.targetsByURL[target.URL()] = target
-	go target.RunScraper(p.ingester, p.interval)
+	go target.RunScraper(p.sampleAppender, p.interval)
 }

 // ReplaceTargets replaces the old targets by the provided new ones but reuses
@@ -118,7 +118,7 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
 			oldTarget.SetBaseLabelsFrom(newTarget)
 		} else {
 			p.targetsByURL[newTarget.URL()] = newTarget
-			go newTarget.RunScraper(p.ingester, p.interval)
+			go newTarget.RunScraper(p.sampleAppender, p.interval)
 		}
 	}


@@ -80,7 +80,7 @@ func testTargetPool(t testing.TB) {
 	}

 	for i, scenario := range scenarios {
-		pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
+		pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))

 		for _, input := range scenario.inputs {
 			target := target{
@@ -112,7 +112,7 @@ func TestTargetPool(t *testing.T) {
 }

 func TestTargetPoolReplaceTargets(t *testing.T) {
-	pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
+	pool := NewTargetPool(nil, nopAppender{}, time.Duration(1))
 	oldTarget1 := &target{
 		url: "example1",
 		state: Unhealthy,

@@ -249,8 +249,10 @@ type (
 	}
 )

+// VectorMatchCardinality is an enum describing vector matches (1:1, n:1, 1:n, n:m).
 type VectorMatchCardinality int

+// Constants for VectorMatchCardinality enum.
 const (
 	MatchOneToOne VectorMatchCardinality = iota
 	MatchManyToOne
@@ -880,7 +882,9 @@ func (node *VectorArithExpr) evalVectors(timestamp clientmodel.Timestamp, lhs, r
 				Timestamp: timestamp,
 			}
 			result = append(result, ns)
-			added[hash] = added[hash] // Set existance to true.
+			if _, ok := added[hash]; !ok {
+				added[hash] = nil // Set existence to true.
+			}
 		}
 	}

|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
storage.AppendSamples(pendingSamples)
|
for _, s := range pendingSamples {
|
||||||
|
storage.Append(s)
|
||||||
|
}
|
||||||
storage.WaitForIndexing()
|
storage.WaitForIndexing()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/notification"
|
"github.com/prometheus/prometheus/notification"
|
||||||
"github.com/prometheus/prometheus/rules"
|
"github.com/prometheus/prometheus/rules"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
"github.com/prometheus/prometheus/templates"
|
"github.com/prometheus/prometheus/templates"
|
||||||
)
|
)
|
||||||
|
@ -94,7 +95,7 @@ type ruleManager struct {
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
storage local.Storage
|
storage local.Storage
|
||||||
|
|
||||||
results chan<- clientmodel.Samples
|
sampleAppender storage.SampleAppender
|
||||||
notificationHandler *notification.NotificationHandler
|
notificationHandler *notification.NotificationHandler
|
||||||
|
|
||||||
prometheusURL string
|
prometheusURL string
|
||||||
|
@ -106,7 +107,7 @@ type RuleManagerOptions struct {
|
||||||
Storage local.Storage
|
Storage local.Storage
|
||||||
|
|
||||||
NotificationHandler *notification.NotificationHandler
|
NotificationHandler *notification.NotificationHandler
|
||||||
Results chan<- clientmodel.Samples
|
SampleAppender storage.SampleAppender
|
||||||
|
|
||||||
PrometheusURL string
|
PrometheusURL string
|
||||||
}
|
}
|
||||||
|
@ -120,7 +121,7 @@ func NewRuleManager(o *RuleManagerOptions) RuleManager {
|
||||||
|
|
||||||
interval: o.EvaluationInterval,
|
interval: o.EvaluationInterval,
|
||||||
storage: o.Storage,
|
storage: o.Storage,
|
||||||
results: o.Results,
|
sampleAppender: o.SampleAppender,
|
||||||
notificationHandler: o.NotificationHandler,
|
notificationHandler: o.NotificationHandler,
|
||||||
prometheusURL: o.PrometheusURL,
|
prometheusURL: o.PrometheusURL,
|
||||||
}
|
}
|
||||||
|
@ -145,7 +146,7 @@ func (m *ruleManager) Run() {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
m.runIteration(m.results)
|
m.runIteration()
|
||||||
iterationDuration.Observe(float64(time.Since(start) / time.Millisecond))
|
iterationDuration.Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
case <-m.done:
|
case <-m.done:
|
||||||
return
|
return
|
||||||
|
@ -213,7 +214,7 @@ func (m *ruleManager) queueAlertNotifications(rule *rules.AlertingRule, timestam
|
||||||
m.notificationHandler.SubmitReqs(notifications)
|
m.notificationHandler.SubmitReqs(notifications)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) {
|
func (m *ruleManager) runIteration() {
|
||||||
now := clientmodel.Now()
|
now := clientmodel.Now()
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
@ -232,20 +233,10 @@ func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) {
|
||||||
vector, err := rule.Eval(now, m.storage)
|
vector, err := rule.Eval(now, m.storage)
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
|
|
||||||
samples := make(clientmodel.Samples, len(vector))
|
|
||||||
for i, s := range vector {
|
|
||||||
samples[i] = &clientmodel.Sample{
|
|
||||||
Metric: s.Metric.Metric,
|
|
||||||
Value: s.Value,
|
|
||||||
Timestamp: s.Timestamp,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
evalFailures.Inc()
|
evalFailures.Inc()
|
||||||
glog.Warningf("Error while evaluating rule %q: %s", rule, err)
|
glog.Warningf("Error while evaluating rule %q: %s", rule, err)
|
||||||
} else {
|
return
|
||||||
m.results <- samples
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch r := rule.(type) {
|
switch r := rule.(type) {
|
||||||
|
@ -261,9 +252,16 @@ func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) {
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Unknown rule type: %T", rule))
|
panic(fmt.Sprintf("Unknown rule type: %T", rule))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, s := range vector {
|
||||||
|
m.sampleAppender.Append(&clientmodel.Sample{
|
||||||
|
Metric: s.Metric.Metric,
|
||||||
|
Value: s.Value,
|
||||||
|
Timestamp: s.Timestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
}(rule)
|
}(rule)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|

@@ -24,16 +24,15 @@ import (
 )

 // Storage ingests and manages samples, along with various indexes. All methods
-// except AppendSamples are goroutine-safe.
+// are goroutine-safe. Storage implements storage.SampleAppender.
 type Storage interface {
 	prometheus.Collector
-	// AppendSamples stores a group of new samples. Multiple samples for the
-	// same fingerprint need to be submitted in chronological order, from
-	// oldest to newest (both in the same call to AppendSamples and across
-	// multiple calls). When AppendSamples has returned, the appended
-	// samples might not be queryable immediately. (Use WaitForIndexing to
-	// wait for complete processing.) This method is not goroutine-safe.
-	AppendSamples(clientmodel.Samples)
+	// Append stores a sample in the Storage. Multiple samples for the same
+	// fingerprint need to be submitted in chronological order, from oldest
+	// to newest. When Append has returned, the appended sample might not be
+	// queryable immediately. (Use WaitForIndexing to wait for complete
+	// processing.)
+	Append(*clientmodel.Sample)
 	// NewPreloader returns a new Preloader which allows preloading and pinning
 	// series data into memory for use within a query.
 	NewPreloader() Preloader

@@ -16,7 +16,6 @@ package local

 import (
 	"container/list"
-	"sync"
 	"sync/atomic"
 	"time"

@@ -37,9 +36,6 @@ const (
 	fpMaxSweepTime = 6 * time.Hour

 	maxEvictInterval = time.Minute
-
-	appendWorkers = 16 // Should be enough to not make appending samples a bottleneck.
-	appendQueueCap = 2 * appendWorkers
 )

 var (
@@ -65,10 +61,6 @@ type memorySeriesStorage struct {
 	checkpointInterval time.Duration
 	checkpointDirtySeriesLimit int

-	appendQueue chan *clientmodel.Sample
-	appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
-	appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
-
 	persistQueueLen int64 // The number of chunks that need persistence.
 	persistQueueCap int // If persistQueueLen reaches this threshold, ingestion will stall.
 	// Note that internally, the chunks to persist are not organized in a queue-like data structure,
@@ -135,9 +127,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		checkpointInterval: o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

-		appendLastTimestamp: clientmodel.Earliest,
-		appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
-
 		persistQueueLen: persistQueueLen,
 		persistQueueCap: o.PersistenceQueueCapacity,
 		persistence: p,
@@ -187,15 +176,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		}),
 	}

-	for i := 0; i < appendWorkers; i++ {
-		go func() {
-			for sample := range s.appendQueue {
-				s.appendSample(sample)
-				s.appendWaitGroup.Done()
-			}
-		}()
-	}
-
 	return s, nil
 }

@@ -209,11 +189,6 @@ func (s *memorySeriesStorage) Start() {
 func (s *memorySeriesStorage) Stop() error {
 	glog.Info("Stopping local storage...")

-	glog.Info("Draining append queue...")
-	close(s.appendQueue)
-	s.appendWaitGroup.Wait()
-	glog.Info("Append queue drained.")
-
 	glog.Info("Stopping maintenance loop...")
 	close(s.loopStopping)
 	<-s.loopStopped
@@ -236,9 +211,6 @@ func (s *memorySeriesStorage) Stop() error {

 // WaitForIndexing implements Storage.
|
||||||
func (s *memorySeriesStorage) WaitForIndexing() {
|
func (s *memorySeriesStorage) WaitForIndexing() {
|
||||||
// First let all goroutines appending samples stop.
|
|
||||||
s.appendWaitGroup.Wait()
|
|
||||||
// Only then wait for the persistence to index them.
|
|
||||||
s.persistence.waitForIndexing()
|
s.persistence.waitForIndexing()
|
||||||
}
|
}
|
||||||
|
|
@@ -363,32 +335,18 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 	}
 }
 
-// AppendSamples implements Storage.
-func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
-	for _, sample := range samples {
-		if s.getPersistQueueLen() >= s.persistQueueCap {
-			glog.Warningf("%d chunks waiting for persistence, sample ingestion suspended.", s.getPersistQueueLen())
-			for s.getPersistQueueLen() >= s.persistQueueCap {
-				time.Sleep(time.Second)
-			}
-			glog.Warning("Sample ingestion resumed.")
-		}
-		if sample.Timestamp != s.appendLastTimestamp {
-			// Timestamp has changed. We have to wait for processing
-			// of all appended samples before proceeding. Otherwise,
-			// we might violate the storage contract that each
-			// sample appended to a given series has to have a
-			// timestamp greater or equal to the previous sample
-			// appended to that series.
-			s.appendWaitGroup.Wait()
-			s.appendLastTimestamp = sample.Timestamp
-		}
-		s.appendWaitGroup.Add(1)
-		s.appendQueue <- sample
-	}
-}
-
-func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
+// Append implements Storage.
+func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
+	if s.getPersistQueueLen() >= s.persistQueueCap {
+		glog.Warningf(
+			"%d chunks waiting for persistence, sample ingestion suspended.",
+			s.getPersistQueueLen(),
+		)
+		for s.getPersistQueueLen() >= s.persistQueueCap {
+			time.Sleep(time.Second)
+		}
+		glog.Warning("Sample ingestion resumed.")
+	}
 	fp := sample.Metric.Fingerprint()
 	s.fpLocker.Lock(fp)
 	series := s.getOrCreateSeries(fp, sample.Metric)
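Note: with the append queue and its worker pool gone (hunks above, in the local storage), Append now does its work synchronously on the caller's goroutine and simply blocks while too many chunks await persistence. Since the method is goroutine-safe per the revised interface comment, several ingesters can call it concurrently; a hedged sketch, where the batch and variable names are assumptions:

var wg sync.WaitGroup
for _, batch := range scrapedBatches { // e.g. one batch per scraped target
	wg.Add(1)
	go func(batch clientmodel.Samples) {
		defer wg.Done()
		for _, smpl := range batch {
			s.Append(smpl) // may sleep inside while the persist queue drains
		}
	}(batch)
}
wg.Wait()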
@@ -48,8 +48,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
 		}
 		fingerprints[i] = metric.Fingerprint()
 	}
-	storage.AppendSamples(samples)
+	for _, s := range samples {
+		storage.Append(s)
+	}
 	storage.WaitForIndexing()
 
 	newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher {
@@ -166,7 +167,9 @@ func TestLoop(t *testing.T) {
 		t.Fatalf("Error creating storage: %s", err)
 	}
 	storage.Start()
-	storage.AppendSamples(samples)
+	for _, s := range samples {
+		storage.Append(s)
+	}
 	storage.WaitForIndexing()
 	series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
 	cdsBefore := len(series.chunkDescs)
@@ -192,7 +195,9 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
 	s, closer := NewTestStorage(t, encoding)
 	defer closer.Close()
 
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	s.WaitForIndexing()
 
 	for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
@@ -240,7 +245,9 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
 	s, closer := NewTestStorage(t, encoding)
 	defer closer.Close()
 
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	s.WaitForIndexing()
 
 	fp := clientmodel.Metric{}.Fingerprint()
@@ -331,7 +338,9 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
 	s, closer := NewTestStorage(t, encoding)
 	defer closer.Close()
 
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	s.WaitForIndexing()
 
 	fp := clientmodel.Metric{}.Fingerprint()
@@ -483,7 +492,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 
 	ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
 
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	s.WaitForIndexing()
 
 	fp := clientmodel.Metric{}.Fingerprint()
@@ -518,7 +529,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
 	}
 
 	// Recreate series.
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	s.WaitForIndexing()
 
 	series, ok := ms.fpToSeries.get(fp)
@@ -592,7 +605,9 @@ func benchmarkAppend(b *testing.B, encoding chunkEncoding) {
 	s, closer := NewTestStorage(b, encoding)
 	defer closer.Close()
 
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 }
 
 func BenchmarkAppendType0(b *testing.B) {
@@ -616,7 +631,9 @@ func testFuzz(t *testing.T, encoding chunkEncoding) {
 	defer c.Close()
 
 	samples := createRandomSamples("test_fuzz", 1000)
-	s.AppendSamples(samples)
+	for _, sample := range samples {
+		s.Append(sample)
+	}
 	return verifyStorage(t, s, samples, 24*7*time.Hour)
 }
 
@@ -672,9 +689,13 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
 		start := samplesPerRun * i
 		end := samplesPerRun * (i + 1)
 		middle := (start + end) / 2
-		s.AppendSamples(samples[start:middle])
+		for _, sample := range samples[start:middle] {
+			s.Append(sample)
+		}
 		verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod)
-		s.AppendSamples(samples[middle:end])
+		for _, sample := range samples[middle:end] {
+			s.Append(sample)
+		}
 		verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod)
 	}
 }
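Note: the test hunks above all apply the same mechanical rewrite from one AppendSamples call to a loop over Append. A hypothetical helper of this shape (not part of the commit, which simply inlines the loop) would express the same thing inside the local storage package:

// appendAll feeds a pre-sorted batch of samples to the storage one by one.
func appendAll(s Storage, samples clientmodel.Samples) {
	for _, smpl := range samples {
		s.Append(smpl)
	}
}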
@@ -53,7 +53,7 @@ type TSDBClient interface {
 // by the provided TSDBClient.
 type TSDBQueueManager struct {
 	tsdb           TSDBClient
-	queue          chan clientmodel.Samples
+	queue          chan *clientmodel.Sample
 	pendingSamples clientmodel.Samples
 	sendSemaphore  chan bool
 	drained        chan bool
@@ -69,7 +69,7 @@ type TSDBQueueManager struct {
 func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
 	return &TSDBQueueManager{
 		tsdb:          tsdb,
-		queue:         make(chan clientmodel.Samples, queueCapacity),
+		queue:         make(chan *clientmodel.Sample, queueCapacity),
 		sendSemaphore: make(chan bool, maxConcurrentSends),
 		drained:       make(chan bool),
 
@@ -112,17 +112,14 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
 	}
 }
 
-// Queue queues a sample batch to be sent to the TSDB. It drops the most
-// recently queued samples on the floor if the queue is full.
-func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
-	if len(s) == 0 {
-		return
-	}
+// Append queues a sample to be sent to the TSDB. It drops the sample on the
+// floor if the queue is full. It implements storage.SampleAppender.
+func (t *TSDBQueueManager) Append(s *clientmodel.Sample) {
 	select {
 	case t.queue <- s:
 	default:
-		t.samplesCount.WithLabelValues(dropped).Add(float64(len(s)))
-		glog.Warningf("TSDB queue full, discarding %d samples", len(s))
+		t.samplesCount.WithLabelValues(dropped).Inc()
+		glog.Warning("TSDB queue full, discarding sample.")
 	}
 }
 
@@ -195,7 +192,7 @@ func (t *TSDBQueueManager) Run() {
 			return
 		}
 
-		t.pendingSamples = append(t.pendingSamples, s...)
+		t.pendingSamples = append(t.pendingSamples, s)
 
 		for len(t.pendingSamples) >= maxSamplesPerSend {
 			go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
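Note: after this change the remote queue carries individual samples and Run() re-assembles batches of maxSamplesPerSend before shipping them. A minimal, self-contained sketch of that batching pattern; Sample here is a stand-in for clientmodel.Sample, and the real Run() additionally handles shutdown and limits concurrent sends:

type Sample struct{ /* stand-in for clientmodel.Sample */ }

// batchAndSend drains single samples from queue and flushes a batch to send
// whenever max samples have accumulated, plus a final partial batch on close.
func batchAndSend(queue <-chan *Sample, max int, send func([]*Sample)) {
	pending := make([]*Sample, 0, max)
	for s := range queue {
		pending = append(pending, s)
		if len(pending) >= max {
			send(pending)
			pending = make([]*Sample, 0, max)
		}
	}
	if len(pending) > 0 {
		send(pending)
	}
}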
@@ -63,13 +63,16 @@ func TestSampleDelivery(t *testing.T) {
 
 	c := &TestTSDBClient{}
 	c.expectSamples(samples[:len(samples)/2])
-	m := NewTSDBQueueManager(c, 1)
+	m := NewTSDBQueueManager(c, len(samples)/2)
 
 	// These should be received by the client.
-	m.Queue(samples[:len(samples)/2])
+	for _, s := range samples[:len(samples)/2] {
+		m.Append(s)
+	}
 	// These will be dropped because the queue is full.
-	m.Queue(samples[len(samples)/2:])
+	for _, s := range samples[len(samples)/2:] {
+		m.Append(s)
+	}
 	go m.Run()
 	defer m.Stop()
 
storage/storage.go (new file, 38 lines)
@@ -0,0 +1,38 @@
+// Copyright 2015 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	clientmodel "github.com/prometheus/client_golang/model"
+)
+
+// SampleAppender is the interface to append samples to both, local and remote
+// storage.
+type SampleAppender interface {
+	Append(*clientmodel.Sample)
+}
+
+// Tee is a SampleAppender that appends every sample to two other
+// SampleAppenders.
+type Tee struct {
+	Appender1, Appender2 SampleAppender
+}
+
+// Append implements SampleAppender. It appends the provided sample first
+// to Appender1, then to Appender2, waiting for each to return before
+// proceeding.
+func (t Tee) Append(s *clientmodel.Sample) {
+	t.Appender1.Append(s)
+	t.Appender2.Append(s)
+}
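Note: the new storage.Tee above is the piece that lets one sample stream feed two backends. A hedged usage sketch; the type and field names come from the hunks in this commit, but the variable names and construction are assumptions, not a copy of main.go:

// memStorage is a local.Storage, tsdbQueue a *remote.TSDBQueueManager; both
// satisfy storage.SampleAppender after this commit.
var app storage.SampleAppender = storage.Tee{
	Appender1: memStorage,
	Appender2: tsdbQueue,
}
app.Append(&clientmodel.Sample{ /* ... */ })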
@@ -154,19 +154,17 @@ func TestTemplateExpansion(t *testing.T) {
 
 	storage, closer := local.NewTestStorage(t, 1)
 	defer closer.Close()
-	storage.AppendSamples(clientmodel.Samples{
-		{
-			Metric: clientmodel.Metric{
-				clientmodel.MetricNameLabel: "metric",
-				"instance":                  "a"},
-			Value: 11,
-		},
-		{
-			Metric: clientmodel.Metric{
-				clientmodel.MetricNameLabel: "metric",
-				"instance":                  "b"},
-			Value: 21,
-		},
+	storage.Append(&clientmodel.Sample{
+		Metric: clientmodel.Metric{
+			clientmodel.MetricNameLabel: "metric",
+			"instance":                  "a"},
+		Value: 11,
+	})
+	storage.Append(&clientmodel.Sample{
+		Metric: clientmodel.Metric{
+			clientmodel.MetricNameLabel: "metric",
+			"instance":                  "b"},
+		Value: 21,
 	})
 	storage.WaitForIndexing()
 
@@ -48,7 +48,7 @@ type WebService struct {
 	AlertsHandler   *AlertsHandler
 	ConsolesHandler *ConsolesHandler
 
-	QuitDelegate func()
+	QuitChan chan struct{}
 }
 
 // ServeForever serves the HTTP endpoints and only returns upon errors.
@@ -109,7 +109,7 @@ func (ws WebService) quitHandler(w http.ResponseWriter, r *http.Request) {
 
 	fmt.Fprintf(w, "Requesting termination... Goodbye!")
 
-	ws.QuitDelegate()
+	close(ws.QuitChan)
 }
 
 func getTemplateFile(name string) (string, error) {
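Note: the quit handler now closes a channel instead of invoking a callback. A hedged sketch of the consuming side; only QuitChan and its close() come from the diff, the surrounding construction is an assumption:

quit := make(chan struct{})
ws := &web.WebService{
	// ... handlers elided ...
	QuitChan: quit,
}
// (the web service itself would be started in the background here)
// Block until the /-/quit handler calls close(ws.QuitChan), which
// unblocks every receiver at once.
<-quit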