scrape: Add global jitter for HA server (#5181)

* scrape: Add global jitter for HA server

Covers issue in https://github.com/prometheus/prometheus/pull/4926#issuecomment-449039848
where the HA setup become a problem for targets unable to be scraped simultaneously.
The new jitter per server relies on the hostname and external labels which necessarily to be uniq.

As before, scrape offset will be calculated with regard the absolute time, so even
restart/reload doesn't change scrape time per scrape target + prometheus instance.

Use fqdn if possible, otherwise fall back to the hostname. It adds extra random seed
to calculate server hash to be distinguish on machines with the same hostname, but
different DC.

Signed-off-by: Aleksei Semiglazov <xjewer@gmail.com>
This commit is contained in:
xjewer 2019-03-12 10:46:15 +00:00 committed by Brian Brazil
parent 83c46fd549
commit 0d1a69353e
6 changed files with 143 additions and 16 deletions

View file

@ -14,7 +14,11 @@
package scrape
import (
"encoding"
"fmt"
"hash/fnv"
"net"
"os"
"reflect"
"sync"
"time"
@ -22,6 +26,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/storage"
@ -54,6 +59,7 @@ type Manager struct {
append Appendable
graceShut chan struct{}
jitterSeed uint64 // Global jitterSeed seed is used to spread scrape workload across HA setup.
mtxScrape sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
@ -111,7 +117,7 @@ func (m *Manager) reload() {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
sp, err := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
@ -131,6 +137,20 @@ func (m *Manager) reload() {
wg.Wait()
}
// setJitterSeed calculates a global jitterSeed per server relying on extra label set.
func (m *Manager) setJitterSeed(labels model.LabelSet) error {
h := fnv.New64a()
hostname, err := getFqdn()
if err != nil {
return err
}
if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil {
return err
}
m.jitterSeed = h.Sum64()
return nil
}
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
m.mtxScrape.Lock()
@ -159,6 +179,10 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
}
m.scrapeConfigs = c
if err := m.setJitterSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}
// Cleanup and reload pool if the configuration has changed.
var failed bool
for name, sp := range m.scrapePools {
@ -216,3 +240,45 @@ func (m *Manager) TargetsDropped() map[string][]*Target {
}
return targets
}
// getFqdn returns a fqdn if it's possible, otherwise falls back a hostname.
func getFqdn() (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", err
}
ips, err := net.LookupIP(hostname)
if err != nil {
return hostname, err
}
lookup := func(ipStr encoding.TextMarshaler) (string, error) {
ip, err := ipStr.MarshalText()
if err != nil {
return "", err
}
hosts, err := net.LookupAddr(string(ip))
if err != nil || len(hosts) == 0 {
return "", err
}
return hosts[0], nil
}
for _, addr := range ips {
if ip := addr.To4(); ip != nil {
if fqdn, err := lookup(ip); err == nil {
return fqdn, nil
}
}
if ip := addr.To16(); ip != nil {
if fqdn, err := lookup(ip); err == nil {
return fqdn, nil
}
}
}
return hostname, nil
}

View file

@ -19,14 +19,12 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/util/testutil"
yaml "gopkg.in/yaml.v2"
)
@ -368,3 +366,44 @@ func TestManagerTargetsUpdates(t *testing.T) {
t.Error("No scrape loops reload was triggered after targets update.")
}
}
func TestSetJitter(t *testing.T) {
getConfig := func(prometheus string) *config.Config {
cfgText := `
global:
external_labels:
prometheus: '` + prometheus + `'
`
cfg := &config.Config{}
if err := yaml.UnmarshalStrict([]byte(cfgText), cfg); err != nil {
t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
}
return cfg
}
scrapeManager := NewManager(nil, nil)
// Load the first config.
cfg1 := getConfig("ha1")
if err := scrapeManager.setJitterSeed(cfg1.GlobalConfig.ExternalLabels); err != nil {
t.Error(err)
}
jitter1 := scrapeManager.jitterSeed
if jitter1 == 0 {
t.Error("Jitter has to be a hash of uint64")
}
// Load the first config.
cfg2 := getConfig("ha2")
if err := scrapeManager.setJitterSeed(cfg2.GlobalConfig.ExternalLabels); err != nil {
t.Error(err)
}
jitter2 := scrapeManager.jitterSeed
if jitter1 == jitter2 {
t.Error("Jitter should not be the same on different set of external labels")
}
}

View file

@ -173,7 +173,7 @@ const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) (*scrapePool, error) {
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
targetScrapePools.Inc()
if logger == nil {
logger = log.NewNopLogger()
@ -219,6 +219,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
return appender(app, opts.limit)
},
cache,
jitterSeed,
)
}
@ -489,7 +490,7 @@ func appender(app storage.Appender, limit int) storage.Appender {
type scraper interface {
scrape(ctx context.Context, w io.Writer) (string, error)
report(start time.Time, dur time.Duration, err error)
offset(interval time.Duration) time.Duration
offset(interval time.Duration, jitterSeed uint64) time.Duration
}
// targetScraper implements the scraper interface for a target.
@ -580,6 +581,7 @@ type scrapeLoop struct {
cache *scrapeCache
lastScrapeSize int
buffers *pool.Pool
jitterSeed uint64
appender func() storage.Appender
sampleMutator labelsMutator
@ -798,6 +800,7 @@ func newScrapeLoop(ctx context.Context,
reportSampleMutator labelsMutator,
appender func() storage.Appender,
cache *scrapeCache,
jitterSeed uint64,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
@ -816,6 +819,7 @@ func newScrapeLoop(ctx context.Context,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
jitterSeed: jitterSeed,
l: l,
ctx: ctx,
}
@ -826,7 +830,7 @@ func newScrapeLoop(ctx context.Context,
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
select {
case <-time.After(sl.scraper.offset(interval)):
case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
// Continue after a scraping offset.
case <-sl.scrapeCtx.Done():
close(sl.stopped)

View file

@ -48,7 +48,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp, _ = newScrapePool(cfg, app, nil)
sp, _ = newScrapePool(cfg, app, 0, nil)
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -83,7 +83,7 @@ func TestDroppedTargetsList(t *testing.T) {
},
},
}
sp, _ = newScrapePool(cfg, app, nil)
sp, _ = newScrapePool(cfg, app, 0, nil)
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __metrics_path__=\"\", __scheme__=\"\", job=\"dropMe\"}"
expectedLength = 1
)
@ -305,7 +305,7 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp, _ := newScrapePool(cfg, app, nil)
sp, _ := newScrapePool(cfg, app, 0, nil)
loop := sp.newLoop(scrapeLoopOptions{
target: &Target{},
@ -353,7 +353,7 @@ func TestScrapePoolRaces(t *testing.T) {
newConfig := func() *config.ScrapeConfig {
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
}
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, nil)
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil)
tgts := []*targetgroup.Group{
{
Targets: []model.LabelSet{
@ -395,7 +395,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
nil, nil,
nopMutator,
nopMutator,
nil, nil,
nil, nil, 0,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
@ -459,6 +459,7 @@ func TestScrapeLoopStop(t *testing.T) {
nopMutator,
app,
nil,
0,
)
// Terminate loop after 2 scrapes.
@ -524,6 +525,7 @@ func TestScrapeLoopRun(t *testing.T) {
nopMutator,
app,
nil,
0,
)
// The loop must terminate during the initial offset if the context
@ -569,6 +571,7 @@ func TestScrapeLoopRun(t *testing.T) {
nopMutator,
app,
nil,
0,
)
go func() {
@ -617,6 +620,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
nopMutator,
func() storage.Appender { return nopAppender{} },
cache,
0,
)
defer cancel()
@ -666,6 +670,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
nopMutator,
app,
nil,
0,
)
// Succeed once, several failures, then stop.
numScrapes := 0
@ -724,6 +729,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
nopMutator,
app,
nil,
0,
)
// Succeed once, several failures, then stop.
@ -828,6 +834,7 @@ func TestScrapeLoopAppend(t *testing.T) {
},
func() storage.Appender { return app },
nil,
0,
)
now := time.Now()
@ -867,6 +874,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
nopMutator,
func() storage.Appender { return app },
nil,
0,
)
// Get the value of the Counter before performing the append.
@ -927,6 +935,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
nopMutator,
func() storage.Appender { return capp },
nil,
0,
)
now := time.Now()
@ -966,6 +975,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
nopMutator,
func() storage.Appender { return app },
nil,
0,
)
now := time.Now()
@ -1011,6 +1021,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
nopMutator,
func() storage.Appender { return app },
nil,
0,
)
now := time.Now()
@ -1050,6 +1061,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
nopMutator,
app,
nil,
0,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1079,6 +1091,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
nopMutator,
app,
nil,
0,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1125,6 +1138,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
nopMutator,
func() storage.Appender { return app },
nil,
0,
)
now := time.Unix(1, 0)
@ -1158,6 +1172,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
}
},
nil,
0,
)
now := time.Now().Add(20 * time.Minute)
@ -1322,7 +1337,7 @@ type testScraper struct {
scrapeFunc func(context.Context, io.Writer) error
}
func (ts *testScraper) offset(interval time.Duration) time.Duration {
func (ts *testScraper) offset(interval time.Duration, jitterSeed uint64) time.Duration {
return ts.offsetDur
}

View file

@ -125,12 +125,14 @@ func (t *Target) hash() uint64 {
}
// offset returns the time until the next scrape cycle for the target.
func (t *Target) offset(interval time.Duration) time.Duration {
// It includes the global server jitterSeed for scrapes from multiple Prometheus to try to be at different times.
func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration {
now := time.Now().UnixNano()
// Base is a pinned to absolute time, no matter how often offset is called.
var (
base = int64(interval) - now%int64(interval)
offset = t.hash() % uint64(interval)
offset = (t.hash() ^ jitterSeed) % uint64(interval)
next = base + int64(offset)
)

View file

@ -47,6 +47,7 @@ func TestTargetLabels(t *testing.T) {
func TestTargetOffset(t *testing.T) {
interval := 10 * time.Second
jitter := uint64(0)
offsets := make([]time.Duration, 10000)
@ -55,7 +56,7 @@ func TestTargetOffset(t *testing.T) {
target := newTestTarget("example.com:80", 0, labels.FromStrings(
"label", fmt.Sprintf("%d", i),
))
offsets[i] = target.offset(interval)
offsets[i] = target.offset(interval, jitter)
}
// Put the offsets into buckets and validate that they are all