mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Timeout sample appends
This commit is contained in:
parent
47aa0d536c
commit
25bf5fdaf5
|
@ -150,7 +150,7 @@ type Target struct {
|
|||
// Closing scraperStopped signals that scraping has been stopped.
|
||||
scraperStopped chan struct{}
|
||||
// Channel to buffer ingested samples.
|
||||
ingestedSamples chan model.Samples
|
||||
ingestedSamples chan model.Vector
|
||||
|
||||
// Mutex protects the members below.
|
||||
sync.RWMutex
|
||||
|
@ -376,6 +376,26 @@ func (t *Target) StopScraper() {
|
|||
log.Debugf("Scraper for target %v stopped.", t)
|
||||
}
|
||||
|
||||
func (t *Target) ingest(s model.Vector) error {
|
||||
t.RLock()
|
||||
deadline := t.deadline
|
||||
t.RUnlock()
|
||||
// Since the regular case is that ingestedSamples is ready to receive,
|
||||
// first try without setting a timeout so that we don't need to allocate
|
||||
// a timer most of the time.
|
||||
select {
|
||||
case t.ingestedSamples <- s:
|
||||
return nil
|
||||
default:
|
||||
select {
|
||||
case t.ingestedSamples <- s:
|
||||
return nil
|
||||
case <-time.After(deadline / 10):
|
||||
return errIngestChannelFull
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
|
||||
|
||||
func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
||||
|
@ -422,13 +442,27 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
|
|||
},
|
||||
}
|
||||
|
||||
var samples model.Vector
|
||||
t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap)
|
||||
|
||||
for {
|
||||
if err = sdec.Decode(&samples); err != nil {
|
||||
break
|
||||
go func() {
|
||||
for {
|
||||
// TODO(fabxc): Changex the SampleAppender interface to return an error
|
||||
// so we can proceed based on the status and don't leak goroutines trying
|
||||
// to append a single sample after dropping all the other ones.
|
||||
//
|
||||
// This will also allow use to reuse this vector and save allocations.
|
||||
var samples model.Vector
|
||||
if err = sdec.Decode(&samples); err != nil {
|
||||
break
|
||||
}
|
||||
if err = t.ingest(samples); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
close(t.ingestedSamples)
|
||||
}()
|
||||
|
||||
for samples := range t.ingestedSamples {
|
||||
for _, s := range samples {
|
||||
if honorLabels {
|
||||
// Merge the metric with the baseLabels for labels not already set in the
|
||||
|
|
Loading…
Reference in a new issue