Merge pull request #518 from prometheus/beorn7/ingestion-tweaks

Next try to deal with backed-up ingestion.
juliusv 2015-02-10 14:59:13 +01:00
commit fd9ee9b009
4 changed files with 94 additions and 26 deletions

View file

@@ -52,7 +52,7 @@ var (
 	remoteTSDBUrl        = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout    = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
-	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 4096, "The capacity of the queue of samples to be stored.")
+	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
 	numMemoryChunks      = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

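The new default is 64*1024 queue slots, up from 4096, and the updated help string notes that each slot holds a whole slice of samples, so actual memory use depends on scrape sizes rather than on the slot count alone. As a hedged illustration (the flag name is taken from the diff above; the binary name and value are only examples of standard Go flag syntax), the capacity could still be lowered at startup:

    ./prometheus -storage.incoming-samples-queue-capacity=4096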
View file

@@ -14,11 +14,18 @@
 package retrieval

 import (
+	"errors"
+	"time"
+
 	"github.com/prometheus/client_golang/extraction"
 	clientmodel "github.com/prometheus/client_golang/model"
 )

+const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout.
+
+var errIngestChannelFull = errors.New("ingestion channel full")
+
 // MergeLabelsIngester merges a labelset ontop of a given extraction result and
 // passes the result on to another ingester. Label collisions are avoided by
 // appending a label prefix to any newly merged colliding labels.
@@ -42,8 +49,23 @@ func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {

 // ChannelIngester feeds results into a channel without modifying them.
 type ChannelIngester chan<- clientmodel.Samples

-// Ingest ingests the provided extraction result by sending it to i.
+// Ingest ingests the provided extraction result by sending it to its channel.
+// If the channel was not able to receive the samples within the ingestTimeout,
+// an error is returned. This is important to fail fast and to not pile up
+// ingestion requests in case of overload.
 func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
-	i <- s
-	return nil
+	// Since the regular case is that i is ready to receive, first try
+	// without setting a timeout so that we don't need to allocate a timer
+	// most of the time.
+	select {
+	case i <- s:
+		return nil
+	default:
+		select {
+		case i <- s:
+			return nil
+		case <-time.After(ingestTimeout):
+			return errIngestChannelFull
+		}
+	}
 }

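The rewritten Ingest uses a two-stage select: a non-blocking send is tried first, and only when the channel is busy does the code arm a timer, so no timer is allocated on the fast path (time.After allocates a fresh one-shot timer on every call). A minimal, self-contained sketch of the same pattern, with illustrative names rather than the identifiers from the diff:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var errChannelFull = errors.New("channel full")

    // sendWithTimeout tries a non-blocking send first; only when the channel
    // is not immediately ready does it fall back to a timer-bounded send.
    func sendWithTimeout(ch chan<- int, v int, timeout time.Duration) error {
        select {
        case ch <- v: // fast path: no timer allocated
            return nil
        default:
            select {
            case ch <- v:
                return nil
            case <-time.After(timeout):
                return errChannelFull
            }
        }
    }

    func main() {
        ch := make(chan int) // unbuffered and never read: the send must time out
        fmt.Println(sendWithTimeout(ch, 42, 10*time.Millisecond))
    }

Running the sketch prints "channel full" after roughly 10ms, mirroring how a saturated ingestion queue now fails fast instead of blocking the scraper.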
View file

@@ -272,18 +272,6 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
 			targetIntervalLength.WithLabelValues(interval.String()).Observe(
 				float64(took) / float64(time.Second), // Sub-second precision.
 			)
-			// Throttle the scrape if it took longer than interval - by
-			// sleeping for the time it took longer. This will make the
-			// actual scrape interval increase as long as a scrape takes
-			// longer than the interval we are aiming for.
-			time.Sleep(took - interval)
-			// After the sleep, we should check again if we have been stopped.
-			select {
-			case <-t.scraperStopping:
-				return
-			default:
-				// Do nothing.
-			}
 			t.scrape(ingester)
 		}
 	}

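The deleted block stretched the effective interval by sleeping for the overrun of a slow scrape (and re-checking scraperStopping afterwards). With it gone, overload is handled by the ingestion timeout instead of by throttling the scraper. A rough sketch of the resulting loop shape, assuming a ticker-driven RunScraper as the surrounding context suggests (identifiers paraphrased; this is not the actual Prometheus code):

    package sketch

    import "time"

    // runScraper shows the loop shape after this change: no compensating
    // sleep, so a slow scrape simply delays the next tick instead of
    // stretching the interval further and further.
    func runScraper(scrape func(), interval time.Duration, stop <-chan struct{}) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-stop:
                return
            case <-ticker.C:
                scrape()
            }
        }
    }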
View file

@@ -46,6 +46,32 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
 	}
 }

+func TestTargetScrapeWithFullChannel(t *testing.T) {
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+			},
+		),
+	)
+	defer server.Close()
+
+	testTarget := NewTarget(
+		server.URL,
+		100*time.Millisecond,
+		clientmodel.LabelSet{"dings": "bums"},
+	).(*target)
+
+	testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0.
+	if testTarget.state != Unreachable {
+		t.Errorf("Expected target state %v, actual: %v", Unreachable, testTarget.state)
+	}
+	if testTarget.lastError != errIngestChannelFull {
+		t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.lastError)
+	}
+}
+
 func TestTargetRecordScrapeHealth(t *testing.T) {
 	testTarget := target{
 		url: "http://example.url",
@@ -96,12 +122,15 @@ func TestTargetRecordScrapeHealth(t *testing.T) {

 func TestTargetScrapeTimeout(t *testing.T) {
 	signal := make(chan bool, 1)

-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		<-signal
-		w.Header().Set("Content-Type", `application/json; schema="prometheus/telemetry"; version=0.0.2`)
-		w.Write([]byte(`[]`))
-	}))
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				<-signal
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte{})
+			},
+		),
+	)
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@@ -137,10 +166,13 @@ func TestTargetScrapeTimeout(t *testing.T) {
 }

 func TestTargetScrape404(t *testing.T) {
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusNotFound)
-	}))
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusNotFound)
+			},
+		),
+	)
 	defer server.Close()

 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@@ -179,3 +211,29 @@ func TestTargetRunScraperScrapes(t *testing.T) {
 		t.Errorf("Scrape occured after it was stopped.")
 	}
 }
+
+func BenchmarkScrape(b *testing.B) {
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+			},
+		),
+	)
+	defer server.Close()
+
+	testTarget := NewTarget(
+		server.URL,
+		100*time.Millisecond,
+		clientmodel.LabelSet{"dings": "bums"},
+	)
+
+	ingester := nopIngester{}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if err := testTarget.(*target).scrape(ingester); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
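TestTargetScrapeWithFullChannel hands Ingest an unbuffered channel with no reader, which forces the send into the timeout branch, so the target must end up Unreachable with lastError set to errIngestChannelFull. In normal operation the channel is buffered (see the samplesQueueCapacity flag above) and drained by a storage goroutine; a hedged sketch of that wiring, with a stand-in type instead of clientmodel.Samples:

    package sketch

    // samples stands in for clientmodel.Samples in this sketch.
    type samples []float64

    // newSampleQueue returns the send side of a buffered queue drained by a
    // single consumer goroutine: the shape a ChannelIngester is meant to feed.
    func newSampleQueue(capacity int, store func(samples)) chan<- samples {
        ch := make(chan samples, capacity)
        go func() {
            for s := range ch {
                store(s)
            }
        }()
        return ch
    }

The benchmark runs with the standard Go tooling, e.g. go test -bench=Scrape in the retrieval package (package path assumed from the diff's package clause).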