Remove the remaining races, new and old.

Also, resolve a few other TODOs.

Change-Id: Icb39b5a5e8ca22ebcb48771cd8951c5d9e112691
Bjoern Rabenstein 2014-12-03 18:07:23 +01:00
parent c0e5ac35f0
commit fee88a7a77
9 changed files with 118 additions and 67 deletions
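In outline, the race fixes in the scraper below replace the single stopScraper channel with a two-channel handshake: StopScraper closes scraperStopping to request shutdown and then blocks on scraperStopped, which the scrape loop closes on its way out, so StopScraper only returns once the loop is really gone. A minimal, self-contained sketch of that handshake (simplified names, not the actual Prometheus types):

package main

import (
    "fmt"
    "time"
)

type scraper struct {
    stopping chan struct{} // closed by stop() to request shutdown
    stopped  chan struct{} // closed by run() once the loop has exited
}

func newScraper() *scraper {
    return &scraper{
        stopping: make(chan struct{}),
        stopped:  make(chan struct{}),
    }
}

// run is the single goroutine doing the periodic work.
func (s *scraper) run(interval time.Duration) {
    defer close(s.stopped) // signal "fully stopped", however we return
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-s.stopping:
            return
        case <-ticker.C:
            fmt.Println("scrape")
        }
    }
}

// stop requests a stop and blocks until run has actually returned.
func (s *scraper) stop() {
    close(s.stopping)
    <-s.stopped
}

func main() {
    s := newScraper()
    go s.run(10 * time.Millisecond)
    time.Sleep(35 * time.Millisecond)
    s.stop() // returns only after the scrape loop is gone
}

The real RunScraper below also drains newBaseLabels and closes scraperStopped explicitly rather than via defer, but the shutdown ordering is the same.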

View file

@@ -14,7 +14,7 @@ intervals, evaluate rule expressions, display the results, and trigger an
action if some condition is observed to be true.
TODO: The above description is somewhat esoteric. Rephrase it into
somethith that tells normal people how they will usually benefit from
something that tells normal people how they will usually benefit from
using Prometheus.
## Install

View file

@@ -19,6 +19,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/golang/glog"
@@ -136,14 +137,9 @@ type Target interface {
RunScraper(extraction.Ingester, time.Duration)
// Stop scraping, synchronous.
StopScraper()
// Do a single scrape.
scrape(ingester extraction.Ingester) error
}
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
//
// TODO: The implementation is not yet goroutine safe, but for the web status,
// methods are called concurrently.
type target struct {
// The current health state of the target.
state TargetState
@@ -151,8 +147,10 @@ type target struct {
lastError error
// The last time a scrape was attempted.
lastScrape time.Time
// Closing stopScraper signals that scraping should stop.
stopScraper chan struct{}
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Channel to queue base labels to be replaced.
newBaseLabels chan clientmodel.LabelSet
@@ -163,17 +161,25 @@ type target struct {
baseLabels clientmodel.LabelSet
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// Mutex protects lastError, lastScrape, state, and baseLabels. Writing
// the above must only happen in the goroutine running the RunScraper
// loop, and it must happen under the lock. In that way, no mutex lock
// is required for reading the above in the goroutine running the
// RunScraper loop, but only for reading in other goroutines.
sync.Mutex
}
// Furnish a reasonably configured target for querying.
func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target {
target := &target{
address: address,
Deadline: deadline,
baseLabels: baseLabels,
httpClient: utility.NewDeadlineClient(deadline),
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
address: address,
Deadline: deadline,
baseLabels: baseLabels,
httpClient: utility.NewDeadlineClient(deadline),
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
}
return target
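The comment on the embedded sync.Mutex above encodes a single-writer rule: only the RunScraper goroutine writes the guarded fields, always under the lock, so it may read them without locking, while every other goroutine must lock even for reads. A minimal sketch of that rule with a hypothetical counter field (illustrative only, not Prometheus code):

package main

import (
    "fmt"
    "sync"
)

type worker struct {
    sync.Mutex // guards count: written only by the loop goroutine, and only under the lock
    count      int
    done       chan struct{}
}

// loop is the single goroutine allowed to write count.
func (w *worker) loop() {
    for i := 0; i < 1000; i++ {
        w.Lock()
        w.count++ // write: hold the lock
        w.Unlock()
        _ = w.count // read inside the writer goroutine: no lock needed
    }
    close(w.done)
}

// Count may be called from any goroutine, so it locks even though it only reads.
func (w *worker) Count() int {
    w.Lock()
    defer w.Unlock()
    return w.count
}

func main() {
    w := &worker{done: make(chan struct{})}
    go w.loop()
    fmt.Println(w.Count()) // concurrent locked read from another goroutine
    <-w.done
    fmt.Println(w.Count())
}

Because there is exactly one writer, the unlocked read inside the writer goroutine cannot race with the locked reads elsewhere.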
@@ -213,6 +219,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
case <-t.newBaseLabels:
// Do nothing.
default:
close(t.scraperStopped)
return
}
}
@@ -221,7 +228,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
select {
case <-jitterTimer.C:
case <-t.stopScraper:
case <-t.scraperStopping:
jitterTimer.Stop()
return
}
@@ -230,32 +237,40 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
ticker := time.NewTicker(interval)
defer ticker.Stop()
t.Lock() // Writing t.lastScrape requires the lock.
t.lastScrape = time.Now()
t.Unlock()
t.scrape(ingester)
// Explanation of the contraption below:
//
// In case t.newBaseLabels or t.stopScraper have something to receive,
// In case t.newBaseLabels or t.scraperStopping have something to receive,
// we want to read from those channels rather than starting a new scrape
// (which might take very long). That's why the outer select has no
// ticker.C. Should neither t.newBaseLabels nor t.stopScraper have
// ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have
// anything to receive, we go into the inner select, where ticker.C is
// in the mix.
for {
select {
case newBaseLabels := <-t.newBaseLabels:
t.Lock() // Writing t.baseLabels requires the lock.
t.baseLabels = newBaseLabels
case <-t.stopScraper:
t.Unlock()
case <-t.scraperStopping:
return
default:
select {
case newBaseLabels := <-t.newBaseLabels:
t.Lock() // Writing t.baseLabels requires the lock.
t.baseLabels = newBaseLabels
case <-t.stopScraper:
t.Unlock()
case <-t.scraperStopping:
return
case <-ticker.C:
targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
t.Lock() // Write t.lastScrape requires locking.
t.lastScrape = time.Now()
t.Unlock()
t.scrape(ingester)
}
}
@@ -264,7 +279,8 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
// StopScraper implements Target.
func (t *target) StopScraper() {
close(t.stopScraper)
close(t.scraperStopping)
<-t.scraperStopped
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
@@ -278,6 +294,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
instance: t.Address(),
outcome: success,
}
t.Lock() // Writing t.state and t.lastError requires the lock.
if err == nil {
t.state = ALIVE
t.recordScrapeHealth(ingester, timestamp, true)
@@ -288,6 +305,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
}
targetOperationLatencies.With(labels).Observe(ms)
t.lastError = err
t.Unlock()
}(time.Now())
req, err := http.NewRequest("GET", t.Address(), nil)
@@ -310,8 +328,6 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
return err
}
// TODO: This is a wart; we need to handle this more gracefully down the
// road, especially once we have service discovery support.
baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.Address())}
for baseLabel, baseValue := range t.baseLabels {
baseLabels[baseLabel] = baseValue
@@ -331,16 +347,22 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) {
// LastError implements Target.
func (t *target) LastError() error {
t.Lock()
defer t.Unlock()
return t.lastError
}
// State implements Target.
func (t *target) State() TargetState {
t.Lock()
defer t.Unlock()
return t.state
}
// LastScrape implements Target.
func (t *target) LastScrape() time.Time {
t.Lock()
defer t.Unlock()
return t.lastScrape
}
@@ -365,6 +387,8 @@ func (t *target) GlobalAddress() string {
// BaseLabels implements Target.
func (t *target) BaseLabels() clientmodel.LabelSet {
t.Lock()
defer t.Unlock()
return t.baseLabels
}
@@ -375,5 +399,3 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) {
}
t.newBaseLabels <- newTarget.BaseLabels()
}
type targets []Target
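The "contraption" commented inside RunScraper above is a channel-priority idiom: the outer select deliberately omits ticker.C, and only its default branch falls through to an inner select that includes the ticker, so pending control messages (newBaseLabels, scraperStopping) always win over starting a potentially long scrape. A standalone sketch of the idiom (hypothetical names, nothing Prometheus-specific):

package main

import (
    "fmt"
    "time"
)

// runLoop prefers control messages and stop requests over starting new work.
func runLoop(control <-chan string, stop <-chan struct{}, work func()) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        // Outer select: only the high-priority channels, no ticker.
        select {
        case msg := <-control:
            fmt.Println("control:", msg)
        case <-stop:
            return
        default:
            // Inner select: same channels plus the ticker, so work only
            // starts when nothing higher-priority is pending.
            select {
            case msg := <-control:
                fmt.Println("control:", msg)
            case <-stop:
                return
            case <-ticker.C:
                work()
            }
        }
    }
}

func main() {
    control := make(chan string, 1)
    stop := make(chan struct{})
    go runLoop(control, stop, func() { fmt.Println("working (might take a while)") })
    control <- "new settings"
    time.Sleep(300 * time.Millisecond)
    close(stop)
    time.Sleep(10 * time.Millisecond) // crude: give the loop a moment to exit
}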

View file

@@ -100,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// scrape once without timeout
signal <- true
if err := testTarget.scrape(ingester); err != nil {
if err := testTarget.(*target).scrape(ingester); err != nil {
t.Fatal(err)
}
@@ -109,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again
signal <- true
if err := testTarget.scrape(ingester); err != nil {
if err := testTarget.(*target).scrape(ingester); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.scrape(ingester); err == nil {
if err := testTarget.(*target).scrape(ingester); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
@@ -122,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again without timeout
signal <- true
if err := testTarget.scrape(ingester); err != nil {
if err := testTarget.(*target).scrape(ingester); err != nil {
t.Fatal(err)
}
}
@@ -138,7 +138,7 @@ func TestTargetScrape404(t *testing.T) {
ingester := nopIngester{}
want := errors.New("server returned HTTP status 404 Not Found")
got := testTarget.scrape(ingester)
got := testTarget.(*target).scrape(ingester)
if got == nil || want.Error() != got.Error() {
t.Fatalf("want err %q, got %q", want, got)
}
@@ -146,10 +146,11 @@ func TestTargetScrape404(t *testing.T) {
func TestTargetRunScraperScrapes(t *testing.T) {
testTarget := target{
state: UNKNOWN,
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
stopScraper: make(chan struct{}),
state: UNKNOWN,
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))
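With scrape removed from the Target interface, test code that holds a Target value now has to assert down to the concrete *target to call it, hence the testTarget.(*target).scrape(...) changes above. The same shape in a tiny hypothetical example:

package main

import "fmt"

type Doer interface {
    Do() // the exported surface; the helper below is deliberately not part of it
}

type doer struct{}

func (d *doer) Do()           { fmt.Println("do") }
func (d *doer) helper() error { return nil } // reachable only through the concrete type

func main() {
    var d Doer = &doer{}
    // Same-package callers (such as the tests above) reach the helper by
    // asserting down to the concrete implementation.
    if err := d.(*doer).helper(); err != nil {
        fmt.Println(err)
    }
}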

View file

@@ -23,16 +23,18 @@ import (
"github.com/prometheus/prometheus/config"
)
// TargetManager manages all scrape targets. All methods are goroutine-safe.
type TargetManager interface {
AddTarget(job config.JobConfig, t Target)
ReplaceTargets(job config.JobConfig, newTargets []Target)
Remove(t Target)
AddTargetsFromConfig(config config.Config)
Stop()
Pools() map[string]*TargetPool
Pools() map[string]*TargetPool // Returns a copy of the name -> TargetPool mapping.
}
type targetManager struct {
sync.Mutex // Protects poolByJob.
poolsByJob map[string]*TargetPool
ingester extraction.Ingester
}
@@ -44,7 +46,7 @@ func NewTargetManager(ingester extraction.Ingester) TargetManager {
}
}
func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool {
targetPool, ok := m.poolsByJob[job.GetName()]
if !ok {
@@ -58,7 +60,6 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())
m.poolsByJob[job.GetName()] = targetPool
// TODO: Investigate whether this auto-goroutine creation is desired.
go targetPool.Run()
}
@@ -66,24 +67,32 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
}
func (m *targetManager) AddTarget(job config.JobConfig, t Target) {
targetPool := m.TargetPoolForJob(job)
m.Lock()
defer m.Unlock()
targetPool := m.targetPoolForJob(job)
targetPool.AddTarget(t)
m.poolsByJob[job.GetName()] = targetPool
}
func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) {
targetPool := m.TargetPoolForJob(job)
m.Lock()
defer m.Unlock()
targetPool := m.targetPoolForJob(job)
targetPool.ReplaceTargets(newTargets)
}
func (m targetManager) Remove(t Target) {
func (m *targetManager) Remove(t Target) {
panic("not implemented")
}
func (m *targetManager) AddTargetsFromConfig(config config.Config) {
for _, job := range config.Jobs() {
if job.SdName != nil {
m.TargetPoolForJob(job)
m.Lock()
m.targetPoolForJob(job)
m.Unlock()
continue
}
@@ -106,6 +115,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) {
}
func (m *targetManager) Stop() {
m.Lock()
defer m.Unlock()
glog.Info("Stopping target manager...")
var wg sync.WaitGroup
for j, p := range m.poolsByJob {
@@ -121,7 +133,13 @@ func (m *targetManager) Stop() {
glog.Info("Target manager stopped.")
}
// TODO: Not goroutine-safe. Only used in /status page for now.
func (m *targetManager) Pools() map[string]*TargetPool {
return m.poolsByJob
m.Lock()
defer m.Unlock()
result := make(map[string]*TargetPool, len(m.poolsByJob))
for k, v := range m.poolsByJob {
result[k] = v
}
return result
}
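Pools() returning a copy matters because handing out the internal map would let callers range over it while AddTarget or ReplaceTargets mutates it in another goroutine, which is a data race (the runtime may abort with "concurrent map read and map write"). A contrived sketch of the hazard and the snapshot fix (hypothetical registry type, not the actual manager):

package main

import "sync"

type registry struct {
    sync.Mutex
    pools map[string]int
}

// PoolsUnsafe hands out the internal map; callers iterating it race with Add.
func (r *registry) PoolsUnsafe() map[string]int {
    r.Lock()
    defer r.Unlock()
    return r.pools // the lock protects nothing once the caller holds the map
}

// Pools returns a snapshot copied under the lock, which callers may use freely.
func (r *registry) Pools() map[string]int {
    r.Lock()
    defer r.Unlock()
    out := make(map[string]int, len(r.pools))
    for k, v := range r.pools {
        out[k] = v
    }
    return out
}

func (r *registry) Add(name string) {
    r.Lock()
    defer r.Unlock()
    r.pools[name] = len(r.pools)
}

func main() {
    r := &registry{pools: map[string]int{}}
    done := make(chan struct{})
    go func() {
        for i := 0; i < 1000; i++ {
            r.Add("job")
        }
        close(done)
    }()
    for i := 0; i < 1000; i++ {
        for range r.Pools() { // safe: iterates a private copy
        }
    }
    <-done
}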

View file

@@ -122,9 +122,9 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
defer wg.Done()
glog.V(1).Infof("Stopping scraper for target %s...", k)
oldTarget.StopScraper()
delete(p.targetsByAddress, k)
glog.V(1).Infof("Scraper for target %s stopped.", k)
}(k, oldTarget)
delete(p.targetsByAddress, k)
}
}
wg.Wait()
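The ReplaceTargets change above applies the same single-writer idea to a map: the slow StopScraper calls still fan out into goroutines, but the delete on targetsByAddress moves back into the ranging loop so that only the calling goroutine ever writes the map. Roughly (hypothetical names):

package main

import (
    "sync"
    "time"
)

type stopper struct{ done chan struct{} }

func (s *stopper) Stop() { time.Sleep(10 * time.Millisecond); close(s.done) } // pretend this is slow

func replace(current map[string]*stopper, keep map[string]bool) {
    var wg sync.WaitGroup
    for k, s := range current {
        if keep[k] {
            continue
        }
        wg.Add(1)
        go func(s *stopper) { // slow work in parallel...
            defer wg.Done()
            s.Stop()
        }(s)
        delete(current, k) // ...but the map is only written by this goroutine
    }
    wg.Wait()
}

func main() {
    current := map[string]*stopper{
        "a": {done: make(chan struct{})},
        "b": {done: make(chan struct{})},
    }
    replace(current, map[string]bool{"a": true})
}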

View file

@@ -114,32 +114,36 @@ func TestTargetPool(t *testing.T) {
func TestTargetPoolReplaceTargets(t *testing.T) {
pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1))
oldTarget1 := &target{
address: "example1",
state: UNREACHABLE,
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
address: "example1",
state: UNREACHABLE,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
oldTarget2 := &target{
address: "example2",
state: UNREACHABLE,
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
address: "example2",
state: UNREACHABLE,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget1 := &target{
address: "example1",
state: ALIVE,
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
address: "example1",
state: ALIVE,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget2 := &target{
address: "example3",
state: ALIVE,
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
address: "example3",
state: ALIVE,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
pool.addTarget(oldTarget1)

View file

@@ -255,8 +255,7 @@ func dropCommonLabelsImpl(timestamp clientmodel.Timestamp, args []Node) interfac
}
common := clientmodel.LabelSet{}
for k, v := range vector[0].Metric {
// TODO(julius): Revisit this when https://github.com/prometheus/prometheus/issues/380
// is implemented.
// TODO(julius): Should we also drop common metric names?
if k == clientmodel.MetricNameLabel {
continue
}

View file

@@ -125,6 +125,13 @@ func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
}
func (cd *chunkDesc) getChunk() chunk {
cd.Lock()
defer cd.Unlock()
return cd.chunk
}
func (cd *chunkDesc) setChunk(c chunk) {
cd.Lock()
defer cd.Unlock()
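The new getChunk accessor lets callers, like the series iterator in the next file, take one locked snapshot of cd.chunk and nil-check that, instead of the racy two-step of isEvicted() followed by an unlocked read of cd.chunk. A minimal sketch of the snapshot-accessor pattern (hypothetical holder type, not the actual chunkDesc):

package main

import (
    "fmt"
    "sync"
)

type payload struct{ data string }

type holder struct {
    sync.Mutex
    p *payload // set to nil when evicted
}

// get returns a snapshot of the pointer taken under the lock; the caller
// nil-checks the snapshot instead of calling a separate "is evicted?" method
// and then reading the field in a second, unsynchronized step.
func (h *holder) get() *payload {
    h.Lock()
    defer h.Unlock()
    return h.p
}

func (h *holder) evict() {
    h.Lock()
    defer h.Unlock()
    h.p = nil
}

func main() {
    h := &holder{p: &payload{data: "chunk"}}
    go h.evict() // may run at any time
    if p := h.get(); p != nil {
        fmt.Println(p.data) // safe: p is a private snapshot, even if evict ran
    }
}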

View file

@@ -381,11 +381,11 @@ func (s *memorySeries) preloadChunksForRange(
func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
chunks := make([]chunk, 0, len(s.chunkDescs))
for i, cd := range s.chunkDescs {
if !cd.isEvicted() {
if chunk := cd.getChunk(); chunk != nil {
if i == len(s.chunkDescs)-1 && !s.headChunkPersisted {
s.headChunkUsedByIterator = true
}
chunks = append(chunks, cd.chunk)
chunks = append(chunks, chunk)
}
}