Next try to deal with backed-up ingestion.

This is now not even trying to throttle in a benign way, but creates a
fully-fledged error. Advantage: It shows up very visible on the status
page. Disadvantage: The server does not really adjusts to a lower
scraping rate. However, if your ingestion backs up, you are in a very
irregulare state, I'd say it _should_ be considered an error and not
dealt with in a more graceful way.

In different news: I'll work on optimizing ingestion so that we will
not as easily run into that situation in the first place.
This commit is contained in:
beorn7 2015-02-09 17:32:47 +01:00
parent 5a4fe403ff
commit 0f191629c6
3 changed files with 81 additions and 25 deletions

View file

@ -14,11 +14,15 @@
package retrieval
import (
"errors"
"github.com/prometheus/client_golang/extraction"
clientmodel "github.com/prometheus/client_golang/model"
)
var errIngestChannelFull = errors.New("ingestion channel full")
// MergeLabelsIngester merges a labelset ontop of a given extraction result and
// passes the result on to another ingester. Label collisions are avoided by
// appending a label prefix to any newly merged colliding labels.
@ -42,8 +46,14 @@ func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {
// ChannelIngester feeds results into a channel without modifying them.
type ChannelIngester chan<- clientmodel.Samples
// Ingest ingests the provided extraction result by sending it to i.
// Ingest ingests the provided extraction result by sending it to its channel.
// It returns an error if the channel is not ready to receive. This is important
// to fail fast and to not pile up ingestion requests in case of overload.
func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
i <- s
return nil
select {
case i <- s:
return nil
default:
return errIngestChannelFull
}
}

View file

@ -272,18 +272,6 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
targetIntervalLength.WithLabelValues(interval.String()).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
// Throttle the scrape if it took longer than interval - by
// sleeping for the time it took longer. This will make the
// actual scrape interval increase as long as a scrape takes
// longer than the interval we are aiming for.
time.Sleep(took - interval)
// After the sleep, we should check again if we have been stopped.
select {
case <-t.scraperStopping:
return
default:
// Do nothing.
}
t.scrape(ingester)
}
}

View file

@ -46,6 +46,32 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
}
}
func TestTargetScrapeWithFullChannel(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
},
),
)
defer server.Close()
testTarget := NewTarget(
server.URL,
100*time.Millisecond,
clientmodel.LabelSet{"dings": "bums"},
).(*target)
testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0.
if testTarget.state != Unreachable {
t.Errorf("Expected target state %v, actual: %v", Unreachable, testTarget.state)
}
if testTarget.lastError != errIngestChannelFull {
t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.lastError)
}
}
func TestTargetRecordScrapeHealth(t *testing.T) {
testTarget := target{
url: "http://example.url",
@ -96,12 +122,15 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
func TestTargetScrapeTimeout(t *testing.T) {
signal := make(chan bool, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-signal
w.Header().Set("Content-Type", `application/json; schema="prometheus/telemetry"; version=0.0.2`)
w.Write([]byte(`[]`))
}))
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
<-signal
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte{})
},
),
)
defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@ -137,10 +166,13 @@ func TestTargetScrapeTimeout(t *testing.T) {
}
func TestTargetScrape404(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
},
),
)
defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@ -179,3 +211,29 @@ func TestTargetRunScraperScrapes(t *testing.T) {
t.Errorf("Scrape occured after it was stopped.")
}
}
func BenchmarkScrape(b *testing.B) {
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
},
),
)
defer server.Close()
testTarget := NewTarget(
server.URL,
100*time.Millisecond,
clientmodel.LabelSet{"dings": "bums"},
)
ingester := nopIngester{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := testTarget.(*target).scrape(ingester); err != nil {
b.Fatal(err)
}
}
}