Merge branch 'dev-2.0' of github.com:prometheus/prometheus into dev-2.0

Fabian Reinartz 2017-09-12 12:01:09 +02:00
commit 63c246f924
3 changed files with 115 additions and 14 deletions

pkg/pool/pool.go (new file)

@@ -0,0 +1,75 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pool

import "sync"

// BytesPool is a bucketed pool for variably sized byte slices.
type BytesPool struct {
    buckets []sync.Pool
    sizes   []int
}

// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
// increasing by the given factor.
func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
    if minSize < 1 {
        panic("invalid minimum pool size")
    }
    if maxSize < 1 {
        panic("invalid maximum pool size")
    }
    if factor < 1 {
        panic("invalid factor")
    }
    var sizes []int

    for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
        sizes = append(sizes, s)
    }
    p := &BytesPool{
        buckets: make([]sync.Pool, len(sizes)),
        sizes:   sizes,
    }
    return p
}

// Get returns a new byte slice that fits the given size.
func (p *BytesPool) Get(sz int) []byte {
    for i, bktSize := range p.sizes {
        if sz > bktSize {
            continue
        }
        b, ok := p.buckets[i].Get().([]byte)
        if !ok {
            b = make([]byte, 0, bktSize)
        }
        return b
    }
    return make([]byte, 0, sz)
}

// Put returns a byte slice to the right bucket in the pool.
func (p *BytesPool) Put(b []byte) {
    for i, bktSize := range p.sizes {
        if cap(b) > bktSize {
            continue
        }
        p.buckets[i].Put(b[:0])
        return
    }
}
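
For context, here is a minimal usage sketch of the pool above (illustrative, not part of the commit): Get hands out a zero-length slice with at least the requested capacity, and Put returns it, rewound to length zero, to the bucket matching its capacity so a later Get can reuse the allocation.

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/pkg/pool"
)

func main() {
    // Buckets with capacities 1024, 2048, 4096, ..., 1 MiB (factor 2).
    p := pool.NewBytesPool(1024, 1<<20, 2)

    b := p.Get(4096) // len(b) == 0, cap(b) >= 4096
    b = append(b, "scrape body"...)
    fmt.Println(len(b), cap(b)) // 11 4096

    p.Put(b) // recycled into the 4096 bucket for the next Get
}

Note that sync.Pool gives no retention guarantee; a Get after garbage collection may simply allocate a fresh slice, which is exactly the fallback the code above takes.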

retrieval/scrape.go

@@ -34,6 +34,7 @@ import (
     "github.com/prometheus/prometheus/config"
     "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/prometheus/prometheus/pkg/pool"
     "github.com/prometheus/prometheus/pkg/textparse"
     "github.com/prometheus/prometheus/pkg/timestamp"
     "github.com/prometheus/prometheus/pkg/value"
@@ -145,13 +146,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
         logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
     }

+    buffers := pool.NewBytesPool(163, 100e6, 3)
+
     newLoop := func(
         ctx context.Context,
         s scraper,
         app, reportApp func() storage.Appender,
         l log.Logger,
     ) loop {
-        return newScrapeLoop(ctx, s, app, reportApp, l)
+        return newScrapeLoop(ctx, s, app, reportApp, buffers, l)
     }

     return &scrapePool{
@@ -470,9 +473,11 @@ type refEntry struct {
 }

 type scrapeLoop struct {
     scraper scraper
     l       log.Logger
     cache   *scrapeCache
+    lastScrapeSize int
+    buffers        *pool.BytesPool

     appender       func() storage.Appender
     reportAppender func() storage.Appender
@@ -573,16 +578,22 @@ func newScrapeLoop(
     ctx context.Context,
     sc scraper,
     app, reportApp func() storage.Appender,
+    buffers *pool.BytesPool,
     l log.Logger,
 ) *scrapeLoop {
     if l == nil {
         l = log.Base()
     }
+    if buffers == nil {
+        buffers = pool.NewBytesPool(10e3, 100e6, 3)
+    }
     sl := &scrapeLoop{
         scraper:        sc,
         appender:       app,
         cache:          newScrapeCache(),
         reportAppender: reportApp,
+        buffers:        buffers,
+        lastScrapeSize: 16000,
         stopped:        make(chan struct{}),
         ctx:            ctx,
         l:              l,
@@ -631,12 +642,20 @@ mainLoop:
                 time.Since(last).Seconds(),
             )
         }

+        b := sl.buffers.Get(sl.lastScrapeSize)
+        buf := bytes.NewBuffer(b)
+
         scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
         cancel()
-        var b []byte
         if scrapeErr == nil {
             b = buf.Bytes()
+            // NOTE: There were issues with misbehaving clients in the past
+            // that occasionally returned empty results. We don't want those
+            // to falsely reset our buffer size.
+            if len(b) > 0 {
+                sl.lastScrapeSize = len(b)
+            }
         } else {
             sl.l.With("err", scrapeErr.Error()).Debug("scrape failed")
             if errc != nil {
@@ -656,6 +675,8 @@ mainLoop:
             }
         }

+        sl.buffers.Put(b)
+
         if scrapeErr == nil {
             scrapeErr = appErr
         }
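
Taken together, the hunks above give the scrape loop a simple buffer lifecycle: each iteration takes a pooled slice sized by the previous scrape, wraps it in a bytes.Buffer for the scraper to write into, and returns the backing slice to the pool once the samples have been appended. A condensed sketch using the names from the diff (not a compilable excerpt):

    b := sl.buffers.Get(sl.lastScrapeSize) // pooled slice: len 0, cap >= last scrape size
    buf := bytes.NewBuffer(b)

    scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
    if scrapeErr == nil {
        b = buf.Bytes()
        if len(b) > 0 { // empty bodies must not shrink the size estimate
            sl.lastScrapeSize = len(b)
        }
    }
    // ... samples are appended from b ...
    sl.buffers.Put(b)

If a scrape outgrows the buffer, bytes.Buffer reallocates as it writes and buf.Bytes() returns the larger slice, so the grown allocation is what gets pooled and lastScrapeSize adapts for the next round.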

retrieval/scrape_test.go

@@ -313,7 +313,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {

 func TestScrapeLoopStopBeforeRun(t *testing.T) {
     scraper := &testScraper{}
-    sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
+    sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, nil)

     // The scrape pool synchronizes on stopping scrape loops. However, new scrape
     // loops are started asynchronously. Thus it's possible, that a loop is stopped
@@ -371,7 +371,7 @@ func TestScrapeLoopStop(t *testing.T) {
     )
     defer close(signal)

-    sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil)
+    sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil, nil)

     // Succeed once, several failures, then stop.
     scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -428,7 +428,7 @@ func TestScrapeLoopRun(t *testing.T) {
     defer close(signal)

     ctx, cancel := context.WithCancel(context.Background())
-    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
+    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)

     // The loop must terminate during the initial offset if the context
     // is canceled.
@@ -466,7 +466,7 @@ func TestScrapeLoopRun(t *testing.T) {
     }

     ctx, cancel = context.WithCancel(context.Background())
-    sl = newScrapeLoop(ctx, scraper, app, reportApp, nil)
+    sl = newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)

     go func() {
         sl.run(time.Second, 100*time.Millisecond, errc)
@@ -511,7 +511,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
     defer close(signal)

     ctx, cancel := context.WithCancel(context.Background())
-    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
+    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)

     // Succeed once, several failures, then stop.
     scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -561,7 +561,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
     defer close(signal)

     ctx, cancel := context.WithCancel(context.Background())
-    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil)
+    sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)

     // Succeed once, several failures, then stop.
     scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -608,6 +608,7 @@ func TestScrapeLoopAppend(t *testing.T) {
         func() storage.Appender { return app },
         func() storage.Appender { return nopAppender{} },
         nil,
+        nil,
     )
     now := time.Now()
     _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
@@ -645,6 +646,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
         func() storage.Appender { return app },
         func() storage.Appender { return nopAppender{} },
         nil,
+        nil,
     )

     now := time.Now()
@@ -688,6 +690,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
         func() storage.Appender { return app },
         func() storage.Appender { return nopAppender{} },
         nil,
+        nil,
     )

     now := time.Now()
@@ -780,7 +783,7 @@ func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) {
         defer close(signal)

         ctx, cancel := context.WithCancel(context.Background())
-        sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil)
+        sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil, nil)

         // Setup a series to be stale, then 3 samples, then stop.
         scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@@ -831,7 +834,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
     )

     ctx, cancel := context.WithCancel(context.Background())
-    sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil)
+    sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil)

     scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
         cancel()
@@ -853,7 +856,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
     )

     ctx, cancel := context.WithCancel(context.Background())
-    sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil)
+    sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil)

     scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
         cancel()
@@ -895,6 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
         func() storage.Appender { return app },
         func() storage.Appender { return nopAppender{} },
         nil,
+        nil,
     )

     now := time.Unix(1, 0)
@@ -925,6 +929,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
         },
         func() storage.Appender { return nopAppender{} },
         nil,
+        nil,
     )

     now := time.Now().Add(20 * time.Minute)