Merge pull request #416 from prometheus/bjoern/deal-with-complicated-todos

Remove the remaining races, new and old.
Björn Rabenstein 2014-12-08 17:44:46 +01:00
commit 08d4dcf223
8 changed files with 119 additions and 69 deletions

View file

@@ -19,6 +19,7 @@ import (
     "net/http"
     "os"
     "strings"
+    "sync"
     "time"

     "github.com/golang/glog"
@@ -136,14 +137,9 @@ type Target interface {
     RunScraper(extraction.Ingester, time.Duration)
     // Stop scraping, synchronous.
     StopScraper()
-    // Do a single scrape.
-    scrape(ingester extraction.Ingester) error
 }

 // target is a Target that refers to a singular HTTP or HTTPS endpoint.
-//
-// TODO: The implementation is not yet goroutine safe, but for the web status,
-// methods are called concurrently.
 type target struct {
     // The current health state of the target.
     state TargetState
@@ -151,8 +147,10 @@ type target struct {
     lastError error
     // The last time a scrape was attempted.
     lastScrape time.Time
-    // Closing stopScraper signals that scraping should stop.
-    stopScraper chan struct{}
+    // Closing scraperStopping signals that scraping should stop.
+    scraperStopping chan struct{}
+    // Closing scraperStopped signals that scraping has been stopped.
+    scraperStopped chan struct{}
     // Channel to queue base labels to be replaced.
     newBaseLabels chan clientmodel.LabelSet
@@ -163,17 +161,25 @@ type target struct {
     baseLabels clientmodel.LabelSet
     // The HTTP client used to scrape the target's endpoint.
     httpClient *http.Client
+
+    // Mutex protects lastError, lastScrape, state, and baseLabels. Writing
+    // the above must only happen in the goroutine running the RunScraper
+    // loop, and it must happen under the lock. In that way, no mutex lock
+    // is required for reading the above in the goroutine running the
+    // RunScraper loop, but only for reading in other goroutines.
+    sync.Mutex
 }

 // Furnish a reasonably configured target for querying.
 func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
     target := &target{
         address:       address,
         Deadline:      deadline,
         baseLabels:    baseLabels,
         httpClient:    utility.NewDeadlineClient(deadline),
-        stopScraper:   make(chan struct{}),
-        newBaseLabels: make(chan clientmodel.LabelSet, 1),
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
+        newBaseLabels:   make(chan clientmodel.LabelSet, 1),
     }

     return target
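
The comment introduced above pins down a single-writer locking discipline: every write to the protected fields happens in the RunScraper goroutine and under the mutex, so that goroutine may read them lock-free, while all other goroutines must take the lock even for reads. A minimal sketch of that discipline, using hypothetical names rather than the actual target fields:

package sketch

import (
    "sync"
    "time"
)

type worker struct {
    sync.Mutex
    last time.Time // written only by the loop goroutine, under the lock
}

// loop is the sole writer. It must lock for writes, but may then read
// `last` without the lock: no other goroutine ever writes the field.
func (w *worker) loop(stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        default:
            w.Lock()
            w.last = time.Now()
            w.Unlock()
            _ = time.Since(w.last)       // lock-free read, safe only here
            time.Sleep(time.Millisecond) // simulated work
        }
    }
}

// Last can run on any goroutine, so it must take the lock to read.
func (w *worker) Last() time.Time {
    w.Lock()
    defer w.Unlock()
    return w.last
}
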
@@ -213,6 +219,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
         case <-t.newBaseLabels:
             // Do nothing.
         default:
+            close(t.scraperStopped)
             return
         }
     }
@@ -221,7 +228,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
     jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
     select {
     case <-jitterTimer.C:
-    case <-t.stopScraper:
+    case <-t.scraperStopping:
         jitterTimer.Stop()
         return
     }
@@ -230,32 +237,40 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {
     ticker := time.NewTicker(interval)
     defer ticker.Stop()

+    t.Lock() // Writing t.lastScrape requires the lock.
     t.lastScrape = time.Now()
+    t.Unlock()
     t.scrape(ingester)

     // Explanation of the contraption below:
     //
-    // In case t.newBaseLabels or t.stopScraper have something to receive,
+    // In case t.newBaseLabels or t.scraperStopping have something to receive,
     // we want to read from those channels rather than starting a new scrape
     // (which might take very long). That's why the outer select has no
-    // ticker.C. Should neither t.newBaseLabels nor t.stopScraper have
+    // ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have
     // anything to receive, we go into the inner select, where ticker.C is
     // in the mix.
     for {
         select {
         case newBaseLabels := <-t.newBaseLabels:
+            t.Lock() // Writing t.baseLabels requires the lock.
             t.baseLabels = newBaseLabels
-        case <-t.stopScraper:
+            t.Unlock()
+        case <-t.scraperStopping:
             return
         default:
             select {
             case newBaseLabels := <-t.newBaseLabels:
+                t.Lock() // Writing t.baseLabels requires the lock.
                 t.baseLabels = newBaseLabels
-            case <-t.stopScraper:
+                t.Unlock()
+            case <-t.scraperStopping:
                 return
             case <-ticker.C:
                 targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
+                t.Lock() // Writing t.lastScrape requires the lock.
                 t.lastScrape = time.Now()
+                t.Unlock()
                 t.scrape(ingester)
             }
         }
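
The "contraption" comment deserves a standalone sketch: the outer select deliberately omits ticker.C so that a pending stop or label update always wins over starting a potentially slow scrape; only when neither is pending does the default branch fall through to the inner select, where the ticker joins the race. A reduced version with illustrative names:

package sketch

import "time"

func runLoop(update <-chan string, stopping <-chan struct{}, interval time.Duration, work func()) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-update: // a pending update beats the ticker
        case <-stopping: // a pending stop beats the ticker
            return
        default: // nothing urgent: now the ticker may fire
            select {
            case <-update:
            case <-stopping:
                return
            case <-ticker.C:
                work() // the potentially long operation
            }
        }
    }
}
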
@@ -264,7 +279,8 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) {

 // StopScraper implements Target.
 func (t *target) StopScraper() {
-    close(t.stopScraper)
+    close(t.scraperStopping)
+    <-t.scraperStopped
 }

 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
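
The channel pair is what makes StopScraper genuinely synchronous: closing scraperStopping asks the scraper loop to exit, and the receive on scraperStopped blocks until the loop has closed that channel on its way out (in the real code this happens inside a deferred drain of newBaseLabels; the plain defer below is a simplification):

package sketch

type scraper struct {
    stopping chan struct{} // closed by Stop to request shutdown
    stopped  chan struct{} // closed by run once it has exited
}

func (s *scraper) run(work func()) {
    defer close(s.stopped) // runs on every exit path
    for {
        select {
        case <-s.stopping:
            return
        default:
            work()
        }
    }
}

// Stop requests shutdown and blocks until run has returned.
func (s *scraper) Stop() {
    close(s.stopping)
    <-s.stopped
}
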
@@ -278,16 +294,17 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
             instance: t.Address(),
             outcome:  success,
         }
+        t.Lock() // Writing t.state and t.lastError requires the lock.
         if err == nil {
             t.state = ALIVE
-            t.recordScrapeHealth(ingester, timestamp, true)
             labels[outcome] = failure
         } else {
             t.state = UNREACHABLE
-            t.recordScrapeHealth(ingester, timestamp, false)
         }
-        targetOperationLatencies.With(labels).Observe(ms)
         t.lastError = err
+        t.Unlock()
+        targetOperationLatencies.With(labels).Observe(ms)
+        t.recordScrapeHealth(ingester, timestamp, err == nil)
     }(time.Now())

     req, err := http.NewRequest("GET", t.Address(), nil)
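
Note how this rewrite also shrinks the critical section: only the field writes happen under the lock, while the latency observation and the scrape-health sample move after Unlock, and the two recordScrapeHealth calls collapse into one with err == nil. A generic sketch of that shape (probe, observe, and record are stand-ins, not Prometheus APIs):

package sketch

import "sync"

type probe struct {
    sync.Mutex
    healthy bool
    lastErr error
}

func (p *probe) finish(err error, observe func(), record func(ok bool)) {
    p.Lock()
    p.healthy = err == nil
    p.lastErr = err
    p.Unlock() // release before any instrumentation or ingestion

    observe()          // e.g. a latency histogram
    record(err == nil) // e.g. a scrape health sample
}
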
@@ -310,8 +327,6 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
         return err
     }

-    // TODO: This is a wart; we need to handle this more gracefully down the
-    // road, especially once we have service discovery support.
     baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.Address())}
     for baseLabel, baseValue := range t.baseLabels {
         baseLabels[baseLabel] = baseValue
@@ -331,16 +346,22 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {

 // LastError implements Target.
 func (t *target) LastError() error {
+    t.Lock()
+    defer t.Unlock()
     return t.lastError
 }

 // State implements Target.
 func (t *target) State() TargetState {
+    t.Lock()
+    defer t.Unlock()
     return t.state
 }

 // LastScrape implements Target.
 func (t *target) LastScrape() time.Time {
+    t.Lock()
+    defer t.Unlock()
     return t.lastScrape
 }
@@ -365,6 +386,8 @@ func (t *target) GlobalAddress() string {

 // BaseLabels implements Target.
 func (t *target) BaseLabels() clientmodel.LabelSet {
+    t.Lock()
+    defer t.Unlock()
     return t.baseLabels
 }
@@ -375,5 +398,3 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) {
     }
     t.newBaseLabels <- newTarget.BaseLabels()
 }
-
-type targets []Target

View file

@@ -100,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) {

     // scrape once without timeout
     signal <- true
-    if err := testTarget.scrape(ingester); err != nil {
+    if err := testTarget.(*target).scrape(ingester); err != nil {
         t.Fatal(err)
     }
@@ -109,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) {

     // now scrape again
     signal <- true
-    if err := testTarget.scrape(ingester); err != nil {
+    if err := testTarget.(*target).scrape(ingester); err != nil {
         t.Fatal(err)
     }

     // now timeout
-    if err := testTarget.scrape(ingester); err == nil {
+    if err := testTarget.(*target).scrape(ingester); err == nil {
         t.Fatal("expected scrape to timeout")
     } else {
         signal <- true // let handler continue
@@ -122,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) {

     // now scrape again without timeout
     signal <- true
-    if err := testTarget.scrape(ingester); err != nil {
+    if err := testTarget.(*target).scrape(ingester); err != nil {
         t.Fatal(err)
     }
 }
@@ -138,7 +138,7 @@ func TestTargetScrape404(t *testing.T) {
     ingester := nopIngester{}

     want := errors.New("server returned HTTP status 404 Not Found")
-    got := testTarget.scrape(ingester)
+    got := testTarget.(*target).scrape(ingester)
     if got == nil || want.Error() != got.Error() {
         t.Fatalf("want err %q, got %q", want, got)
     }
@@ -146,10 +146,11 @@ func TestTargetScrape404(t *testing.T) {

 func TestTargetRunScraperScrapes(t *testing.T) {
     testTarget := target{
-        state:       UNKNOWN,
-        address:     "bad schema",
-        httpClient:  utility.NewDeadlineClient(0),
-        stopScraper: make(chan struct{}),
+        state:           UNKNOWN,
+        address:         "bad schema",
+        httpClient:      utility.NewDeadlineClient(0),
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
     }
     go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))

View file

@@ -23,16 +23,18 @@ import (
     "github.com/prometheus/prometheus/config"
 )

+// TargetManager manages all scrape targets. All methods are goroutine-safe.
 type TargetManager interface {
     AddTarget(job config.JobConfig, t Target)
     ReplaceTargets(job config.JobConfig, newTargets []Target)
     Remove(t Target)
     AddTargetsFromConfig(config config.Config)
     Stop()
-    Pools() map[string]*TargetPool
+    Pools() map[string]*TargetPool // Returns a copy of the name -> TargetPool mapping.
 }

 type targetManager struct {
+    sync.Mutex // Protects poolsByJob.
     poolsByJob map[string]*TargetPool
     ingester   extraction.Ingester
 }
@@ -44,7 +46,7 @@ func NewTargetManager(ingester extraction.Ingester) TargetManager {
     }
 }

-func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
+func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool {
     targetPool, ok := m.poolsByJob[job.GetName()]

     if !ok {
@@ -58,7 +60,6 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
         glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())
         m.poolsByJob[job.GetName()] = targetPool
-        // TODO: Investigate whether this auto-goroutine creation is desired.
         go targetPool.Run()
     }
@@ -66,24 +67,32 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
 }

 func (m *targetManager) AddTarget(job config.JobConfig, t Target) {
-    targetPool := m.TargetPoolForJob(job)
+    m.Lock()
+    defer m.Unlock()
+
+    targetPool := m.targetPoolForJob(job)
     targetPool.AddTarget(t)
     m.poolsByJob[job.GetName()] = targetPool
 }

 func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) {
-    targetPool := m.TargetPoolForJob(job)
+    m.Lock()
+    defer m.Unlock()
+
+    targetPool := m.targetPoolForJob(job)
     targetPool.ReplaceTargets(newTargets)
 }

-func (m targetManager) Remove(t Target) {
+func (m *targetManager) Remove(t Target) {
     panic("not implemented")
 }

 func (m *targetManager) AddTargetsFromConfig(config config.Config) {
     for _, job := range config.Jobs() {
         if job.SdName != nil {
-            m.TargetPoolForJob(job)
+            m.Lock()
+            m.targetPoolForJob(job)
+            m.Unlock()
             continue
         }
@@ -106,6 +115,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) {
 }

 func (m *targetManager) Stop() {
+    m.Lock()
+    defer m.Unlock()
+
     glog.Info("Stopping target manager...")
     var wg sync.WaitGroup
     for j, p := range m.poolsByJob {
@@ -121,7 +133,13 @@ func (m *targetManager) Stop() {
     glog.Info("Target manager stopped.")
 }

-// TODO: Not goroutine-safe. Only used in /status page for now.
 func (m *targetManager) Pools() map[string]*TargetPool {
-    return m.poolsByJob
+    m.Lock()
+    defer m.Unlock()
+
+    result := make(map[string]*TargetPool, len(m.poolsByJob))
+    for k, v := range m.poolsByJob {
+        result[k] = v
+    }
+    return result
 }
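
Pools previously handed out the live map, which the /status page could iterate while the manager was mutating it; Go maps do not tolerate concurrent iteration and writes. The snapshot is shallow, so callers own the map but still share the *TargetPool values. A hypothetical caller (assuming the fmt import and a TargetManager value named manager):

for job, pool := range manager.Pools() {
    // This map is the caller's private copy, so ranging needs no lock,
    // though the *TargetPool values themselves remain shared.
    fmt.Printf("job %q -> %v\n", job, pool)
}
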

View file

@@ -122,9 +122,9 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
                 defer wg.Done()
                 glog.V(1).Infof("Stopping scraper for target %s...", k)
                 oldTarget.StopScraper()
-                delete(p.targetsByAddress, k)
                 glog.V(1).Infof("Scraper for target %s stopped.", k)
             }(k, oldTarget)
+            delete(p.targetsByAddress, k)
         }
     }
     wg.Wait()
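
The race removed here: the old code deleted map entries from inside the spawned goroutines, so several goroutines mutated p.targetsByAddress concurrently with each other and with the ranging loop. The fix keeps every map write on the loop's own goroutine, which is safe (the Go spec explicitly allows deleting the current key during range), while the slow StopScraper calls still run in parallel and are joined by wg.Wait(). The pattern in isolation, with stopper as a stand-in interface:

package sketch

import "sync"

type stopper interface{ StopScraper() }

func stopObsolete(targets map[string]stopper, obsolete func(string) bool) {
    var wg sync.WaitGroup
    for k, t := range targets {
        if obsolete(k) {
            wg.Add(1)
            go func(t stopper) {
                defer wg.Done()
                t.StopScraper() // slow and synchronous; fine to parallelize
            }(t)
            delete(targets, k) // map writes stay on this goroutine
        }
    }
    wg.Wait()
}
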

View file

@@ -114,32 +114,36 @@ func TestTargetPool(t *testing.T) {
 func TestTargetPoolReplaceTargets(t *testing.T) {
     pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
     oldTarget1 := &target{
-        address:       "example1",
-        state:         UNREACHABLE,
-        stopScraper:   make(chan struct{}),
-        newBaseLabels: make(chan clientmodel.LabelSet, 1),
-        httpClient:    &http.Client{},
+        address:         "example1",
+        state:           UNREACHABLE,
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
+        newBaseLabels:   make(chan clientmodel.LabelSet, 1),
+        httpClient:      &http.Client{},
     }
     oldTarget2 := &target{
-        address:       "example2",
-        state:         UNREACHABLE,
-        stopScraper:   make(chan struct{}),
-        newBaseLabels: make(chan clientmodel.LabelSet, 1),
-        httpClient:    &http.Client{},
+        address:         "example2",
+        state:           UNREACHABLE,
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
+        newBaseLabels:   make(chan clientmodel.LabelSet, 1),
+        httpClient:      &http.Client{},
     }
     newTarget1 := &target{
-        address:       "example1",
-        state:         ALIVE,
-        stopScraper:   make(chan struct{}),
-        newBaseLabels: make(chan clientmodel.LabelSet, 1),
-        httpClient:    &http.Client{},
+        address:         "example1",
+        state:           ALIVE,
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
+        newBaseLabels:   make(chan clientmodel.LabelSet, 1),
+        httpClient:      &http.Client{},
     }
     newTarget2 := &target{
-        address:       "example3",
-        state:         ALIVE,
-        stopScraper:   make(chan struct{}),
-        newBaseLabels: make(chan clientmodel.LabelSet, 1),
-        httpClient:    &http.Client{},
+        address:         "example3",
+        state:           ALIVE,
+        scraperStopping: make(chan struct{}),
+        scraperStopped:  make(chan struct{}),
+        newBaseLabels:   make(chan clientmodel.LabelSet, 1),
+        httpClient:      &http.Client{},
     }

     pool.addTarget(oldTarget1)

View file

@@ -255,8 +255,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interface{} {
     }
     common := clientmodel.LabelSet{}
     for k, v := range vector[0].Metric {
-        // TODO(julius): Revisit this when https://github.com/prometheus/prometheus/issues/380
-        // is implemented.
+        // TODO(julius): Should we also drop common metric names?
         if k == clientmodel.MetricNameLabel {
             continue
         }

View file

@@ -125,6 +125,13 @@ func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
     return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
 }

+func (cd *chunkDesc) getChunk() chunk {
+    cd.Lock()
+    defer cd.Unlock()
+
+    return cd.chunk
+}
+
 func (cd *chunkDesc) setChunk(c chunk) {
     cd.Lock()
     defer cd.Unlock()

View file

@@ -381,11 +381,11 @@ func (s *memorySeries) preloadChunksForRange(

 func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
     chunks := make([]chunk, 0, len(s.chunkDescs))
     for i, cd := range s.chunkDescs {
-        if !cd.isEvicted() {
+        if chunk := cd.getChunk(); chunk != nil {
             if i == len(s.chunkDescs)-1 && !s.headChunkPersisted {
                 s.headChunkUsedByIterator = true
             }
-            chunks = append(chunks, cd.chunk)
+            chunks = append(chunks, chunk)
         }
     }
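
The isEvicted check was a check-then-act race: the chunk could be evicted between the check and the read of cd.chunk, handing the iterator a nil chunk. getChunk performs a single locked read, and the returned local value stays valid for the caller regardless of what eviction does afterwards. The two shapes side by side (use is a placeholder):

// Racy check-then-act: eviction can strike between the test and the read.
if !cd.isEvicted() {
    use(cd.chunk) // may observe nil by now
}

// Fixed: one locked read; the local value cannot change underneath us.
if c := cd.getChunk(); c != nil {
    use(c)
}
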