Enable protobuf negotiation only when histograms are enabled

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2022-10-12 13:18:25 +05:30
parent bde500e690
commit 3cbf87b83d
No known key found for this signature in database
GPG key ID: F056451B52F1DC34
4 changed files with 66 additions and 34 deletions

View file

@ -198,6 +198,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.") level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
case "native-histograms": case "native-histograms":
c.tsdb.EnableNativeHistograms = true c.tsdb.EnableNativeHistograms = true
c.scrape.EnableProtobufNegotiation = true
level.Info(logger).Log("msg", "Experimental native histogram support enabled.") level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
case "": case "":
continue continue

View file

@ -132,6 +132,9 @@ type Options struct {
// Option to enable the experimental in-memory metadata storage and append // Option to enable the experimental in-memory metadata storage and append
// metadata to the WAL. // metadata to the WAL.
EnableMetadataStorage bool EnableMetadataStorage bool
// Option to enable protobuf negotiation with the client. Note that the client can already
// send protobuf without needing to enable this.
EnableProtobufNegotiation bool
// Option to increase the interval used by scrape manager to throttle target groups updates. // Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration DiscoveryReloadInterval model.Duration

View file

@ -243,6 +243,8 @@ type scrapePool struct {
newLoop func(scrapeLoopOptions) loop newLoop func(scrapeLoopOptions) loop
noDefaultPort bool noDefaultPort bool
enableProtobufNegotiation bool
} }
type labelLimits struct { type labelLimits struct {
@ -293,6 +295,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
logger: logger, logger: logger,
httpOpts: options.HTTPClientOptions, httpOpts: options.HTTPClientOptions,
noDefaultPort: options.NoDefaultPort, noDefaultPort: options.NoDefaultPort,
enableProtobufNegotiation: options.EnableProtobufNegotiation,
} }
sp.newLoop = func(opts scrapeLoopOptions) loop { sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache. // Update the targets retrieval function for metadata to a new scrape cache.
@ -433,8 +436,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
t := sp.activeTargets[fp] t := sp.activeTargets[fp]
interval, timeout, err := t.intervalAndTimeout(interval, timeout) interval, timeout, err := t.intervalAndTimeout(interval, timeout)
acceptHeader := scrapeAcceptHeader
if sp.enableProtobufNegotiation {
acceptHeader = scrapeAcceptHeaderWithProtobuf
}
var ( var (
s = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit} s = &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
newLoop = sp.newLoop(scrapeLoopOptions{ newLoop = sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
@ -537,8 +544,11 @@ func (sp *scrapePool) sync(targets []*Target) {
// for every target. // for every target.
var err error var err error
interval, timeout, err = t.intervalAndTimeout(interval, timeout) interval, timeout, err = t.intervalAndTimeout(interval, timeout)
acceptHeader := scrapeAcceptHeader
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit} if sp.enableProtobufNegotiation {
acceptHeader = scrapeAcceptHeaderWithProtobuf
}
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
l := sp.newLoop(scrapeLoopOptions{ l := sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
@ -757,11 +767,15 @@ type targetScraper struct {
buf *bufio.Reader buf *bufio.Reader
bodySizeLimit int64 bodySizeLimit int64
acceptHeader string
} }
var errBodySizeLimit = errors.New("body size limit exceeded") var errBodySizeLimit = errors.New("body size limit exceeded")
const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1` const (
scrapeAcceptHeader = `encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
scrapeAcceptHeaderWithProtobuf = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text;version=1.0.0;q=0.75,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
)
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
@ -771,7 +785,7 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
if err != nil { if err != nil {
return "", err return "", err
} }
req.Header.Add("Accept", acceptHeader) req.Header.Add("Accept", s.acceptHeader)
req.Header.Add("Accept-Encoding", "gzip") req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", UserAgent) req.Header.Set("User-Agent", UserAgent)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64)) req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", strconv.FormatFloat(s.timeout.Seconds(), 'f', -1, 64))

View file

@ -2147,12 +2147,16 @@ func TestTargetScraperScrapeOK(t *testing.T) {
expectedTimeout = "1.5" expectedTimeout = "1.5"
) )
var protobufParsing bool
server := httptest.NewServer( server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if protobufParsing {
accept := r.Header.Get("Accept") accept := r.Header.Get("Accept")
if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") { if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept) t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
} }
}
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds") timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
if timeout != expectedTimeout { if timeout != expectedTimeout {
@ -2170,6 +2174,7 @@ func TestTargetScraperScrapeOK(t *testing.T) {
panic(err) panic(err)
} }
runTest := func(acceptHeader string) {
ts := &targetScraper{ ts := &targetScraper{
Target: &Target{ Target: &Target{
labels: labels.FromStrings( labels: labels.FromStrings(
@ -2179,6 +2184,7 @@ func TestTargetScraperScrapeOK(t *testing.T) {
}, },
client: http.DefaultClient, client: http.DefaultClient,
timeout: configTimeout, timeout: configTimeout,
acceptHeader: acceptHeader,
} }
var buf bytes.Buffer var buf bytes.Buffer
@ -2186,6 +2192,11 @@ func TestTargetScraperScrapeOK(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "text/plain; version=0.0.4", contentType) require.Equal(t, "text/plain; version=0.0.4", contentType)
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String()) require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
}
runTest(scrapeAcceptHeader)
protobufParsing = true
runTest(scrapeAcceptHeaderWithProtobuf)
} }
func TestTargetScrapeScrapeCancel(t *testing.T) { func TestTargetScrapeScrapeCancel(t *testing.T) {
@ -2211,6 +2222,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
), ),
}, },
client: http.DefaultClient, client: http.DefaultClient,
acceptHeader: scrapeAcceptHeader,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -2264,6 +2276,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
), ),
}, },
client: http.DefaultClient, client: http.DefaultClient,
acceptHeader: scrapeAcceptHeader,
} }
_, err = ts.scrape(context.Background(), io.Discard) _, err = ts.scrape(context.Background(), io.Discard)
@ -2305,6 +2318,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
}, },
client: http.DefaultClient, client: http.DefaultClient,
bodySizeLimit: bodySizeLimit, bodySizeLimit: bodySizeLimit,
acceptHeader: scrapeAcceptHeader,
} }
var buf bytes.Buffer var buf bytes.Buffer