diff --git a/main.go b/main.go index 7bc4af2b56..eaa4f5bca8 100644 --- a/main.go +++ b/main.go @@ -86,8 +86,6 @@ func (p *prometheus) interruptHandler() { glog.Warning("Received SIGINT/SIGTERM; Exiting gracefully...") p.Close() - - os.Exit(0) } func (p *prometheus) Close() { @@ -138,7 +136,6 @@ func main() { if err != nil { glog.Fatal("Error opening memory series storage: ", err) } - defer memStorage.Close() registry.MustRegister(memStorage) var remoteTSDBQueue *remote.TSDBQueueManager @@ -220,7 +217,6 @@ func main() { storage: memStorage, remoteTSDBQueue: remoteTSDBQueue, } - defer prometheus.Close() webService := &web.WebService{ StatusHandler: prometheusStatus, diff --git a/retrieval/target.go b/retrieval/target.go index 2d5d4e282b..9140923d66 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -126,10 +126,12 @@ type Target interface { GlobalAddress() string // Return the target's base labels. BaseLabels() clientmodel.LabelSet - // Merge a new externally supplied target definition (e.g. with changed base - // labels) into an old target definition for the same endpoint. Preserve - // remaining information - like health state - from the old target. - Merge(newTarget Target) + // SetBaseLabelsFrom queues a replacement of the current base labels by + // the labels of the given target. The method returns immediately after + // queuing. The actual replacement of the base labels happens + // asynchronously (but most likely before the next scrape for the target + // begins). + SetBaseLabelsFrom(Target) // Scrape target at the specified interval. RunScraper(extraction.Ingester, time.Duration) // Stop scraping, synchronous. @@ -149,6 +151,8 @@ type target struct { // Channel to signal RunScraper should stop, holds a channel // to notify once stopped. stopScraper chan bool + // Channel to queue base labels to be replaced. + newBaseLabels chan clientmodel.LabelSet address string // What is the deadline for the HTTP or HTTPS against this endpoint. @@ -162,11 +166,12 @@ type target struct { // Furnish a reasonably configured target for querying. func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { target := &target{ - address: address, - Deadline: deadline, - baseLabels: baseLabels, - httpClient: utility.NewDeadlineClient(deadline), - stopScraper: make(chan bool), + address: address, + Deadline: deadline, + baseLabels: baseLabels, + httpClient: utility.NewDeadlineClient(deadline), + stopScraper: make(chan bool), + newBaseLabels: make(chan clientmodel.LabelSet, 1), } return target @@ -197,6 +202,7 @@ func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clie }) } +// RunScraper implements Target. func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) { jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) select { @@ -217,12 +223,15 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second)) t.lastScrape = time.Now() t.scrape(ingester) + case newBaseLabels := <-t.newBaseLabels: + t.baseLabels = newBaseLabels case <-t.stopScraper: return } } } +// StopScraper implements Target. func (t *target) StopScraper() { t.stopScraper <- true } @@ -270,8 +279,8 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { return err } - // XXX: This is a wart; we need to handle this more gracefully down the - // road, especially once we have service discovery support. + // TODO: This is a wart; we need to handle this more gracefully down the + // road, especially once we have service discovery support. baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.Address())} for baseLabel, baseValue := range t.baseLabels { baseLabels[baseLabel] = baseValue @@ -289,22 +298,27 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { return processor.ProcessSingle(resp.Body, i, processOptions) } +// LastError implements Target. func (t *target) LastError() error { return t.lastError } +// State implements Target. func (t *target) State() TargetState { return t.state } +// LastScrape implements Target. func (t *target) LastScrape() time.Time { return t.lastScrape } +// Address implements Target. func (t *target) Address() string { return t.address } +// GlobalAddress implements Target. func (t *target) GlobalAddress() string { address := t.address hostname, err := os.Hostname() @@ -318,18 +332,17 @@ func (t *target) GlobalAddress() string { return address } +// BaseLabels implements Target. func (t *target) BaseLabels() clientmodel.LabelSet { return t.baseLabels } -// Merge a new externally supplied target definition (e.g. with changed base -// labels) into an old target definition for the same endpoint. Preserve -// remaining information - like health state - from the old target. -func (t *target) Merge(newTarget Target) { +// SetBaseLabelsFrom implements Target. +func (t *target) SetBaseLabelsFrom(newTarget Target) { if t.Address() != newTarget.Address() { panic("targets don't refer to the same endpoint") } - t.baseLabels = newTarget.BaseLabels() + t.newBaseLabels <- newTarget.BaseLabels() } type targets []Target diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 2b161b45e4..f1c4b0c607 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -108,7 +108,7 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) { newTargetAddresses.Add(newTarget.Address()) oldTarget, ok := p.targetsByAddress[newTarget.Address()] if ok { - oldTarget.Merge(newTarget) + oldTarget.SetBaseLabelsFrom(newTarget) } else { p.targetsByAddress[newTarget.Address()] = newTarget go newTarget.RunScraper(p.ingester, p.interval) diff --git a/storage/local/storage.go b/storage/local/storage.go index 1c132b38b5..ff74e370c5 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -249,8 +249,8 @@ func (s *memorySeriesStorage) Close() error { stopped := make(chan bool) glog.Info("Waiting for storage to stop serving...") s.stopServing <- stopped - glog.Info("Serving stopped.") <-stopped + glog.Info("Serving stopped.") glog.Info("Stopping persist loop...") close(s.persistQueue) @@ -276,6 +276,7 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { for { select { case <-stop: + glog.Info("Purging loop stopped.") return case <-purgeTicker.C: glog.Info("Purging old series data...")