mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
b50f9c1c84
* scrape: add label limits per scrape Add three new limits to the scrape configuration to provide some mechanism to defend against unbound number of labels and excessive label lengths. If any of these limits are broken by a sample from a scrape, the whole scrape will fail. For all of these configuration options, a zero value means no limit. The `label_limit` configuration will provide a mechanism to bound the number of labels per-scrape of a certain sample to a user defined limit. This limit will be tested against the sample labels plus the discovery labels, but it will exclude the __name__ from the count since it is a mandatory Prometheus label to which applying constraints isn't meaningful. The `label_name_length_limit` and `label_value_length_limit` will prevent having labels of excessive lengths. These limits also skip the __name__ label for the same reasons as the `label_limit` option and will also make the scrape fail if any sample has a label name/value length that exceed the predefined limits. Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com> * scrape: add metrics and alert to label limits Add three gauge, one for each label limit to easily access the limit set by a certain scrape target. Also add a counter to count the number of targets that exceeded the label limits and thus were dropped. This is useful for the `PrometheusLabelLimitHit` alert that will notify the users that scraping some targets failed because they had samples exceeding the label limits defined in the scrape configuration. Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com> * scrape: apply label limits to __name__ label Apply limits to the __name__ label that was previously skipped and truncate the label names and values in the error messages as they can be very very long. 
Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com> * scrape: remove label limits gauges and refactor Remove `prometheus_target_scrape_pool_label_limit`, `prometheus_target_scrape_pool_label_name_length_limit`, and `prometheus_target_scrape_pool_label_value_length_limit` as they are not really useful since we don't have the information on the labels in it. Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>
2562 lines
65 KiB
Go
2562 lines
65 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package scrape
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/pkg/errors"
|
|
dto "github.com/prometheus/client_model/go"
|
|
config_util "github.com/prometheus/common/config"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/pkg/relabel"
|
|
"github.com/prometheus/prometheus/pkg/textparse"
|
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
|
"github.com/prometheus/prometheus/pkg/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/teststorage"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
testutil.TolerantVerifyLeak(m)
|
|
}
|
|
|
|
func TestNewScrapePool(t *testing.T) {
|
|
var (
|
|
app = &nopAppendable{}
|
|
cfg = &config.ScrapeConfig{}
|
|
sp, _ = newScrapePool(cfg, app, 0, nil)
|
|
)
|
|
|
|
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
|
|
t.Fatalf("Wrong sample appender")
|
|
}
|
|
if sp.config != cfg {
|
|
t.Fatalf("Wrong scrape config")
|
|
}
|
|
if sp.newLoop == nil {
|
|
t.Fatalf("newLoop function not initialized")
|
|
}
|
|
}
|
|
|
|
func TestDroppedTargetsList(t *testing.T) {
|
|
var (
|
|
app = &nopAppendable{}
|
|
cfg = &config.ScrapeConfig{
|
|
JobName: "dropMe",
|
|
ScrapeInterval: model.Duration(1),
|
|
RelabelConfigs: []*relabel.Config{
|
|
{
|
|
Action: relabel.Drop,
|
|
Regex: relabel.MustNewRegexp("dropMe"),
|
|
SourceLabels: model.LabelNames{"job"},
|
|
},
|
|
},
|
|
}
|
|
tgs = []*targetgroup.Group{
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: "127.0.0.1:9090"},
|
|
},
|
|
},
|
|
}
|
|
sp, _ = newScrapePool(cfg, app, 0, nil)
|
|
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", job=\"dropMe\"}"
|
|
expectedLength = 1
|
|
)
|
|
sp.Sync(tgs)
|
|
sp.Sync(tgs)
|
|
if len(sp.droppedTargets) != expectedLength {
|
|
t.Fatalf("Length of dropped targets exceeded expected length, expected %v, got %v", expectedLength, len(sp.droppedTargets))
|
|
}
|
|
if sp.droppedTargets[0].DiscoveredLabels().String() != expectedLabelSetString {
|
|
t.Fatalf("Got %v, expected %v", sp.droppedTargets[0].DiscoveredLabels().String(), expectedLabelSetString)
|
|
}
|
|
}
|
|
|
|
// TestDiscoveredLabelsUpdate checks that DiscoveredLabels are updated
|
|
// even when new labels don't affect the target `hash`.
|
|
func TestDiscoveredLabelsUpdate(t *testing.T) {
|
|
|
|
sp := &scrapePool{}
|
|
// These are used when syncing so need this to avoid a panic.
|
|
sp.config = &config.ScrapeConfig{
|
|
ScrapeInterval: model.Duration(1),
|
|
ScrapeTimeout: model.Duration(1),
|
|
}
|
|
sp.activeTargets = make(map[uint64]*Target)
|
|
t1 := &Target{
|
|
discoveredLabels: labels.Labels{
|
|
labels.Label{
|
|
Name: "label",
|
|
Value: "name",
|
|
},
|
|
},
|
|
}
|
|
sp.activeTargets[t1.hash()] = t1
|
|
|
|
t2 := &Target{
|
|
discoveredLabels: labels.Labels{
|
|
labels.Label{
|
|
Name: "labelNew",
|
|
Value: "nameNew",
|
|
},
|
|
},
|
|
}
|
|
sp.sync([]*Target{t2})
|
|
|
|
require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
|
|
}
|
|
|
|
type testLoop struct {
|
|
startFunc func(interval, timeout time.Duration, errc chan<- error)
|
|
stopFunc func()
|
|
forcedErr error
|
|
forcedErrMtx sync.Mutex
|
|
runOnce bool
|
|
}
|
|
|
|
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|
if l.runOnce {
|
|
panic("loop must be started only once")
|
|
}
|
|
l.runOnce = true
|
|
l.startFunc(interval, timeout, errc)
|
|
}
|
|
|
|
func (l *testLoop) disableEndOfRunStalenessMarkers() {
|
|
}
|
|
|
|
func (l *testLoop) setForcedError(err error) {
|
|
l.forcedErrMtx.Lock()
|
|
defer l.forcedErrMtx.Unlock()
|
|
l.forcedErr = err
|
|
}
|
|
|
|
func (l *testLoop) getForcedError() error {
|
|
l.forcedErrMtx.Lock()
|
|
defer l.forcedErrMtx.Unlock()
|
|
return l.forcedErr
|
|
}
|
|
|
|
func (l *testLoop) stop() {
|
|
l.stopFunc()
|
|
}
|
|
|
|
func (l *testLoop) getCache() *scrapeCache {
|
|
return nil
|
|
}
|
|
|
|
func TestScrapePoolStop(t *testing.T) {
|
|
sp := &scrapePool{
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
cancel: func() {},
|
|
client: http.DefaultClient,
|
|
}
|
|
var mtx sync.Mutex
|
|
stopped := map[uint64]bool{}
|
|
numTargets := 20
|
|
|
|
// Stopping the scrape pool must call stop() on all scrape loops,
|
|
// clean them and the respective targets up. It must wait until each loop's
|
|
// stop function returned before returning itself.
|
|
|
|
for i := 0; i < numTargets; i++ {
|
|
t := &Target{
|
|
labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
|
|
}
|
|
l := &testLoop{}
|
|
l.stopFunc = func() {
|
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
|
|
|
mtx.Lock()
|
|
stopped[t.hash()] = true
|
|
mtx.Unlock()
|
|
}
|
|
|
|
sp.activeTargets[t.hash()] = t
|
|
sp.loops[t.hash()] = l
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
stopTime := time.Now()
|
|
|
|
go func() {
|
|
sp.stop()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("scrapeLoop.stop() did not return as expected")
|
|
case <-done:
|
|
// This should have taken at least as long as the last target slept.
|
|
if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
|
|
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
|
}
|
|
}
|
|
|
|
mtx.Lock()
|
|
require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops")
|
|
mtx.Unlock()
|
|
|
|
require.Equal(t, 0, len(sp.activeTargets), "Targets were not cleared on stopping: %d left", len(sp.activeTargets))
|
|
require.Equal(t, 0, len(sp.loops), "Loops were not cleared on stopping: %d left", len(sp.loops))
|
|
}
|
|
|
|
func TestScrapePoolReload(t *testing.T) {
|
|
var mtx sync.Mutex
|
|
numTargets := 20
|
|
|
|
stopped := map[uint64]bool{}
|
|
|
|
reloadCfg := &config.ScrapeConfig{
|
|
ScrapeInterval: model.Duration(3 * time.Second),
|
|
ScrapeTimeout: model.Duration(2 * time.Second),
|
|
}
|
|
// On starting to run, new loops created on reload check whether their preceding
|
|
// equivalents have been stopped.
|
|
newLoop := func(opts scrapeLoopOptions) loop {
|
|
l := &testLoop{}
|
|
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
|
|
require.Equal(t, 3*time.Second, interval, "Unexpected scrape interval")
|
|
require.Equal(t, 2*time.Second, timeout, "Unexpected scrape timeout")
|
|
|
|
mtx.Lock()
|
|
targetScraper := opts.scraper.(*targetScraper)
|
|
require.True(t, stopped[targetScraper.hash()], "Scrape loop for %v not stopped yet", targetScraper)
|
|
mtx.Unlock()
|
|
}
|
|
return l
|
|
}
|
|
sp := &scrapePool{
|
|
appendable: &nopAppendable{},
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
newLoop: newLoop,
|
|
logger: nil,
|
|
client: http.DefaultClient,
|
|
}
|
|
|
|
// Reloading a scrape pool with a new scrape configuration must stop all scrape
|
|
// loops and start new ones. A new loop must not be started before the preceding
|
|
// one terminated.
|
|
|
|
for i := 0; i < numTargets; i++ {
|
|
t := &Target{
|
|
labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
|
|
}
|
|
l := &testLoop{}
|
|
l.stopFunc = func() {
|
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
|
|
|
mtx.Lock()
|
|
stopped[t.hash()] = true
|
|
mtx.Unlock()
|
|
}
|
|
|
|
sp.activeTargets[t.hash()] = t
|
|
sp.loops[t.hash()] = l
|
|
}
|
|
done := make(chan struct{})
|
|
|
|
beforeTargets := map[uint64]*Target{}
|
|
for h, t := range sp.activeTargets {
|
|
beforeTargets[h] = t
|
|
}
|
|
|
|
reloadTime := time.Now()
|
|
|
|
go func() {
|
|
sp.reload(reloadCfg)
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("scrapeLoop.reload() did not return as expected")
|
|
case <-done:
|
|
// This should have taken at least as long as the last target slept.
|
|
if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
|
|
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
|
}
|
|
}
|
|
|
|
mtx.Lock()
|
|
require.Equal(t, numTargets, len(stopped), "Unexpected number of stopped loops")
|
|
mtx.Unlock()
|
|
|
|
require.Equal(t, sp.activeTargets, beforeTargets, "Reloading affected target states unexpectedly")
|
|
require.Equal(t, numTargets, len(sp.loops), "Unexpected number of stopped loops after reload")
|
|
}
|
|
|
|
func TestScrapePoolTargetLimit(t *testing.T) {
|
|
var wg sync.WaitGroup
|
|
// On starting to run, new loops created on reload check whether their preceding
|
|
// equivalents have been stopped.
|
|
newLoop := func(opts scrapeLoopOptions) loop {
|
|
wg.Add(1)
|
|
l := &testLoop{
|
|
startFunc: func(interval, timeout time.Duration, errc chan<- error) {
|
|
wg.Done()
|
|
},
|
|
stopFunc: func() {},
|
|
}
|
|
return l
|
|
}
|
|
sp := &scrapePool{
|
|
appendable: &nopAppendable{},
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
newLoop: newLoop,
|
|
logger: nil,
|
|
client: http.DefaultClient,
|
|
}
|
|
|
|
var tgs = []*targetgroup.Group{}
|
|
for i := 0; i < 50; i++ {
|
|
tgs = append(tgs,
|
|
&targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: model.LabelValue(fmt.Sprintf("127.0.0.1:%d", 9090+i))},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
|
|
var limit uint
|
|
reloadWithLimit := func(l uint) {
|
|
limit = l
|
|
require.NoError(t, sp.reload(&config.ScrapeConfig{
|
|
ScrapeInterval: model.Duration(3 * time.Second),
|
|
ScrapeTimeout: model.Duration(2 * time.Second),
|
|
TargetLimit: l,
|
|
}))
|
|
}
|
|
|
|
var targets int
|
|
loadTargets := func(n int) {
|
|
targets = n
|
|
sp.Sync(tgs[:n])
|
|
}
|
|
|
|
validateIsRunning := func() {
|
|
wg.Wait()
|
|
for _, l := range sp.loops {
|
|
require.True(t, l.(*testLoop).runOnce, "loop should be running")
|
|
}
|
|
}
|
|
|
|
validateErrorMessage := func(shouldErr bool) {
|
|
for _, l := range sp.loops {
|
|
lerr := l.(*testLoop).getForcedError()
|
|
if shouldErr {
|
|
require.NotNil(t, lerr, "error was expected for %d targets with a limit of %d", targets, limit)
|
|
require.Equal(t, fmt.Sprintf("target_limit exceeded (number of targets: %d, limit: %d)", targets, limit), lerr.Error())
|
|
} else {
|
|
require.Equal(t, nil, lerr)
|
|
}
|
|
}
|
|
}
|
|
|
|
reloadWithLimit(0)
|
|
loadTargets(50)
|
|
validateIsRunning()
|
|
|
|
// Simulate an initial config with a limit.
|
|
sp.config.TargetLimit = 30
|
|
limit = 30
|
|
loadTargets(50)
|
|
validateIsRunning()
|
|
validateErrorMessage(true)
|
|
|
|
reloadWithLimit(50)
|
|
validateIsRunning()
|
|
validateErrorMessage(false)
|
|
|
|
reloadWithLimit(40)
|
|
validateIsRunning()
|
|
validateErrorMessage(true)
|
|
|
|
loadTargets(30)
|
|
validateIsRunning()
|
|
validateErrorMessage(false)
|
|
|
|
loadTargets(40)
|
|
validateIsRunning()
|
|
validateErrorMessage(false)
|
|
|
|
loadTargets(41)
|
|
validateIsRunning()
|
|
validateErrorMessage(true)
|
|
|
|
reloadWithLimit(51)
|
|
validateIsRunning()
|
|
validateErrorMessage(false)
|
|
|
|
tgs = append(tgs,
|
|
&targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
|
|
},
|
|
},
|
|
&targetgroup.Group{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: model.LabelValue("127.0.0.1:1090")},
|
|
},
|
|
},
|
|
)
|
|
|
|
sp.Sync(tgs)
|
|
validateIsRunning()
|
|
validateErrorMessage(false)
|
|
}
|
|
|
|
func TestScrapePoolAppender(t *testing.T) {
|
|
cfg := &config.ScrapeConfig{}
|
|
app := &nopAppendable{}
|
|
sp, _ := newScrapePool(cfg, app, 0, nil)
|
|
|
|
loop := sp.newLoop(scrapeLoopOptions{
|
|
target: &Target{},
|
|
})
|
|
appl, ok := loop.(*scrapeLoop)
|
|
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
|
|
|
|
wrapped := appl.appender(context.Background())
|
|
|
|
tl, ok := wrapped.(*timeLimitAppender)
|
|
require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
|
|
|
|
_, ok = tl.Appender.(nopAppender)
|
|
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
|
|
|
|
loop = sp.newLoop(scrapeLoopOptions{
|
|
target: &Target{},
|
|
sampleLimit: 100,
|
|
})
|
|
appl, ok = loop.(*scrapeLoop)
|
|
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
|
|
|
|
wrapped = appl.appender(context.Background())
|
|
|
|
sl, ok := wrapped.(*limitAppender)
|
|
require.True(t, ok, "Expected limitAppender but got %T", wrapped)
|
|
|
|
tl, ok = sl.Appender.(*timeLimitAppender)
|
|
require.True(t, ok, "Expected limitAppender but got %T", sl.Appender)
|
|
|
|
_, ok = tl.Appender.(nopAppender)
|
|
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
|
|
}
|
|
|
|
func TestScrapePoolRaces(t *testing.T) {
|
|
interval, _ := model.ParseDuration("500ms")
|
|
timeout, _ := model.ParseDuration("1s")
|
|
newConfig := func() *config.ScrapeConfig {
|
|
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
|
|
}
|
|
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil)
|
|
tgts := []*targetgroup.Group{
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: "127.0.0.1:9090"},
|
|
{model.AddressLabel: "127.0.0.2:9090"},
|
|
{model.AddressLabel: "127.0.0.3:9090"},
|
|
{model.AddressLabel: "127.0.0.4:9090"},
|
|
{model.AddressLabel: "127.0.0.5:9090"},
|
|
{model.AddressLabel: "127.0.0.6:9090"},
|
|
{model.AddressLabel: "127.0.0.7:9090"},
|
|
{model.AddressLabel: "127.0.0.8:9090"},
|
|
},
|
|
},
|
|
}
|
|
|
|
sp.Sync(tgts)
|
|
active := sp.ActiveTargets()
|
|
dropped := sp.DroppedTargets()
|
|
expectedActive, expectedDropped := len(tgts[0].Targets), 0
|
|
|
|
require.Equal(t, expectedActive, len(active), "Invalid number of active targets")
|
|
require.Equal(t, expectedDropped, len(dropped), "Invalid number of dropped targets")
|
|
|
|
for i := 0; i < 20; i++ {
|
|
time.Sleep(time.Duration(10 * time.Millisecond))
|
|
sp.reload(newConfig())
|
|
}
|
|
sp.stop()
|
|
}
|
|
|
|
func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
|
|
var wg sync.WaitGroup
|
|
newLoop := func(opts scrapeLoopOptions) loop {
|
|
wg.Add(1)
|
|
l := &testLoop{
|
|
startFunc: func(interval, timeout time.Duration, errc chan<- error) {
|
|
wg.Done()
|
|
},
|
|
stopFunc: func() {},
|
|
}
|
|
return l
|
|
}
|
|
sp := &scrapePool{
|
|
appendable: &nopAppendable{},
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
newLoop: newLoop,
|
|
logger: nil,
|
|
client: http.DefaultClient,
|
|
}
|
|
|
|
tgs := []*targetgroup.Group{
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
|
|
},
|
|
},
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
|
|
},
|
|
},
|
|
}
|
|
|
|
require.NoError(t, sp.reload(&config.ScrapeConfig{
|
|
ScrapeInterval: model.Duration(3 * time.Second),
|
|
ScrapeTimeout: model.Duration(2 * time.Second),
|
|
}))
|
|
sp.Sync(tgs)
|
|
|
|
require.Equal(t, 1, len(sp.loops))
|
|
|
|
wg.Wait()
|
|
for _, l := range sp.loops {
|
|
require.True(t, l.(*testLoop).runOnce, "loop should be running")
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopStopBeforeRun(t *testing.T) {
|
|
scraper := &testScraper{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
nil, nil, 0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
|
|
// loops are started asynchronously. Thus it's possible, that a loop is stopped
|
|
// again before having started properly.
|
|
// Stopping not-yet-started loops must block until the run method was called and exited.
|
|
// The run method must exit immediately.
|
|
|
|
stopDone := make(chan struct{})
|
|
go func() {
|
|
sl.stop()
|
|
close(stopDone)
|
|
}()
|
|
|
|
select {
|
|
case <-stopDone:
|
|
t.Fatalf("Stopping terminated before run exited successfully")
|
|
case <-time.After(500 * time.Millisecond):
|
|
}
|
|
|
|
// Running the scrape loop must exit before calling the scraper even once.
|
|
scraper.scrapeFunc = func(context.Context, io.Writer) error {
|
|
t.Fatalf("scraper was called for terminated scrape loop")
|
|
return nil
|
|
}
|
|
|
|
runDone := make(chan struct{})
|
|
go func() {
|
|
sl.run(1, 0, nil)
|
|
close(runDone)
|
|
}()
|
|
|
|
select {
|
|
case <-runDone:
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatalf("Running terminated scrape loop did not exit")
|
|
}
|
|
|
|
select {
|
|
case <-stopDone:
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatalf("Stopping did not terminate after running exited")
|
|
}
|
|
}
|
|
|
|
// nopMutator is a label mutator that returns its input unchanged.
func nopMutator(l labels.Labels) labels.Labels {
	return l
}
|
|
|
|
func TestScrapeLoopStop(t *testing.T) {
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
appender = &collectResultAppender{}
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
)
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
// Terminate loop after 2 scrapes.
|
|
numScrapes := 0
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
numScrapes++
|
|
if numScrapes == 2 {
|
|
go sl.stop()
|
|
<-sl.ctx.Done()
|
|
}
|
|
w.Write([]byte("metric_a 42\n"))
|
|
return ctx.Err()
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
|
|
// We expected 1 actual sample for each scrape plus 5 for report samples.
|
|
// At least 2 scrapes were made, plus the final stale markers.
|
|
if len(appender.result) < 6*3 || len(appender.result)%6 != 0 {
|
|
t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.result))
|
|
}
|
|
// All samples in a scrape must have the same timestamp.
|
|
var ts int64
|
|
for i, s := range appender.result {
|
|
if i%6 == 0 {
|
|
ts = s.t
|
|
} else if s.t != ts {
|
|
t.Fatalf("Unexpected multiple timestamps within single scrape")
|
|
}
|
|
}
|
|
// All samples from the last scrape must be stale markers.
|
|
for _, s := range appender.result[len(appender.result)-5:] {
|
|
if !value.IsStaleNaN(s.v) {
|
|
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopRun(t *testing.T) {
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
errc = make(chan error)
|
|
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
// The loop must terminate during the initial offset if the context
|
|
// is canceled.
|
|
scraper.offsetDur = time.Hour
|
|
|
|
go func() {
|
|
sl.run(time.Second, time.Hour, errc)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
// Wait to make sure we are actually waiting on the offset.
|
|
time.Sleep(1 * time.Second)
|
|
|
|
cancel()
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Cancellation during initial offset failed")
|
|
case err := <-errc:
|
|
t.Fatalf("Unexpected error: %s", err)
|
|
}
|
|
|
|
// The provided timeout must cause cancellation of the context passed down to the
|
|
// scraper. The scraper has to respect the context.
|
|
scraper.offsetDur = 0
|
|
|
|
block := make(chan struct{})
|
|
scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
|
|
select {
|
|
case <-block:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
sl = newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
go func() {
|
|
sl.run(time.Second, 100*time.Millisecond, errc)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case err := <-errc:
|
|
if err != context.DeadlineExceeded {
|
|
t.Fatalf("Expected timeout error but got: %s", err)
|
|
}
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("Expected timeout error but got none")
|
|
}
|
|
|
|
// We already caught the timeout error and are certainly in the loop.
|
|
// Let the scrapes returns immediately to cause no further timeout errors
|
|
// and check whether canceling the parent context terminates the loop.
|
|
close(block)
|
|
cancel()
|
|
|
|
select {
|
|
case <-signal:
|
|
// Loop terminated as expected.
|
|
case err := <-errc:
|
|
t.Fatalf("Unexpected error: %s", err)
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("Loop did not terminate on context cancellation")
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopForcedErr(t *testing.T) {
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
errc = make(chan error)
|
|
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return &nopAppender{} }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
forcedErr := fmt.Errorf("forced err")
|
|
sl.setForcedError(forcedErr)
|
|
|
|
scraper.scrapeFunc = func(context.Context, io.Writer) error {
|
|
t.Fatalf("should not be scraped")
|
|
return nil
|
|
}
|
|
|
|
go func() {
|
|
sl.run(time.Second, time.Hour, errc)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case err := <-errc:
|
|
if err != forcedErr {
|
|
t.Fatalf("Expected forced error but got: %s", err)
|
|
}
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("Expected forced error but got none")
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape not stopped")
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopMetadata(t *testing.T) {
|
|
var (
|
|
signal = make(chan struct{})
|
|
scraper = &testScraper{}
|
|
cache = newScrapeCache()
|
|
)
|
|
defer close(signal)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return nopAppender{} },
|
|
cache,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
defer cancel()
|
|
|
|
slApp := sl.appender(ctx)
|
|
total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
|
|
# HELP test_metric some help text
|
|
# UNIT test_metric metric
|
|
test_metric 1
|
|
# TYPE test_metric_no_help gauge
|
|
# HELP test_metric_no_type other help text
|
|
# EOF`), "application/openmetrics-text", time.Now())
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
require.Equal(t, 1, total)
|
|
|
|
md, ok := cache.GetMetadata("test_metric")
|
|
require.True(t, ok, "expected metadata to be present")
|
|
require.Equal(t, textparse.MetricTypeCounter, md.Type, "unexpected metric type")
|
|
require.Equal(t, "some help text", md.Help)
|
|
require.Equal(t, "metric", md.Unit)
|
|
|
|
md, ok = cache.GetMetadata("test_metric_no_help")
|
|
require.True(t, ok, "expected metadata to be present")
|
|
require.Equal(t, textparse.MetricTypeGauge, md.Type, "unexpected metric type")
|
|
require.Equal(t, "", md.Help)
|
|
require.Equal(t, "", md.Unit)
|
|
|
|
md, ok = cache.GetMetadata("test_metric_no_type")
|
|
require.True(t, ok, "expected metadata to be present")
|
|
require.Equal(t, textparse.MetricTypeUnknown, md.Type, "unexpected metric type")
|
|
require.Equal(t, "other help text", md.Help)
|
|
require.Equal(t, "", md.Unit)
|
|
}
|
|
|
|
func TestScrapeLoopSeriesAdded(t *testing.T) {
|
|
// Need a full storage for correct Add/AddFast semantics.
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
&testScraper{},
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
s.Appender,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
defer cancel()
|
|
|
|
slApp := sl.appender(ctx)
|
|
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
require.Equal(t, 1, total)
|
|
require.Equal(t, 1, added)
|
|
require.Equal(t, 1, seriesAdded)
|
|
|
|
slApp = sl.appender(ctx)
|
|
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
|
|
require.NoError(t, slApp.Commit())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, total)
|
|
require.Equal(t, 1, added)
|
|
require.Equal(t, 0, seriesAdded)
|
|
}
|
|
|
|
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
|
|
appender := &collectResultAppender{}
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
// Succeed once, several failures, then stop.
|
|
numScrapes := 0
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
numScrapes++
|
|
|
|
if numScrapes == 1 {
|
|
w.Write([]byte("metric_a 42\n"))
|
|
return nil
|
|
} else if numScrapes == 5 {
|
|
cancel()
|
|
}
|
|
return errors.New("scrape failed")
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
|
|
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
|
|
// each scrape successful or not.
|
|
require.Equal(t, 27, len(appender.result), "Appended samples not as expected")
|
|
require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected")
|
|
require.True(t, value.IsStaleNaN(appender.result[6].v),
|
|
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v))
|
|
}
|
|
|
|
func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
|
|
appender := &collectResultAppender{}
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
numScrapes = 0
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
// Succeed once, several failures, then stop.
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
numScrapes++
|
|
|
|
if numScrapes == 1 {
|
|
w.Write([]byte("metric_a 42\n"))
|
|
return nil
|
|
} else if numScrapes == 2 {
|
|
w.Write([]byte("7&-\n"))
|
|
return nil
|
|
} else if numScrapes == 3 {
|
|
cancel()
|
|
}
|
|
return errors.New("scrape failed")
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
|
|
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
|
|
// each scrape successful or not.
|
|
require.Equal(t, 17, len(appender.result), "Appended samples not as expected")
|
|
require.Equal(t, 42.0, appender.result[0].v, "Appended first sample not as expected")
|
|
require.True(t, value.IsStaleNaN(appender.result[6].v),
|
|
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[6].v))
|
|
}
|
|
|
|
func TestScrapeLoopCache(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
appender := &collectResultAppender{}
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
numScrapes := 0
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
if numScrapes == 1 || numScrapes == 2 {
|
|
if _, ok := sl.cache.series["metric_a"]; !ok {
|
|
t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
|
|
}
|
|
if _, ok := sl.cache.series["metric_b"]; !ok {
|
|
t.Errorf("metric_b missing from cache after scrape %d", numScrapes)
|
|
}
|
|
} else if numScrapes == 3 {
|
|
if _, ok := sl.cache.series["metric_a"]; !ok {
|
|
t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
|
|
}
|
|
if _, ok := sl.cache.series["metric_b"]; ok {
|
|
t.Errorf("metric_b present in cache after scrape %d", numScrapes)
|
|
}
|
|
}
|
|
|
|
numScrapes++
|
|
|
|
if numScrapes == 1 {
|
|
w.Write([]byte("metric_a 42\nmetric_b 43\n"))
|
|
return nil
|
|
} else if numScrapes == 3 {
|
|
w.Write([]byte("metric_a 44\n"))
|
|
return nil
|
|
} else if numScrapes == 4 {
|
|
cancel()
|
|
}
|
|
return fmt.Errorf("scrape failed")
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
|
|
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
|
|
// each scrape successful or not.
|
|
require.Equal(t, 26, len(appender.result), "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
sapp := s.Appender(context.Background())
|
|
|
|
appender := &collectResultAppender{next: sapp}
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
scraper = &testScraper{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
numScrapes := 0
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
numScrapes++
|
|
if numScrapes < 5 {
|
|
s := ""
|
|
for i := 0; i < 500; i++ {
|
|
s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
|
|
}
|
|
w.Write([]byte(fmt.Sprintf(s + "&")))
|
|
} else {
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
|
|
if len(sl.cache.series) > 2000 {
|
|
t.Fatalf("More than 2000 series cached. Got: %d", len(sl.cache.series))
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopAppend(t *testing.T) {
|
|
tests := []struct {
|
|
title string
|
|
honorLabels bool
|
|
scrapeLabels string
|
|
discoveryLabels []string
|
|
expLset labels.Labels
|
|
expValue float64
|
|
}{
|
|
{
|
|
// When "honor_labels" is not set
|
|
// label name collision is handler by adding a prefix.
|
|
title: "Label name collision",
|
|
honorLabels: false,
|
|
scrapeLabels: `metric{n="1"} 0`,
|
|
discoveryLabels: []string{"n", "2"},
|
|
expLset: labels.FromStrings("__name__", "metric", "exported_n", "1", "n", "2"),
|
|
expValue: 0,
|
|
}, {
|
|
// When "honor_labels" is not set
|
|
// exported label from discovery don't get overwritten
|
|
title: "Label name collision",
|
|
honorLabels: false,
|
|
scrapeLabels: `metric 0`,
|
|
discoveryLabels: []string{"n", "2", "exported_n", "2"},
|
|
expLset: labels.FromStrings("__name__", "metric", "n", "2", "exported_n", "2"),
|
|
expValue: 0,
|
|
}, {
|
|
// Labels with no value need to be removed as these should not be ingested.
|
|
title: "Delete Empty labels",
|
|
honorLabels: false,
|
|
scrapeLabels: `metric{n=""} 0`,
|
|
discoveryLabels: nil,
|
|
expLset: labels.FromStrings("__name__", "metric"),
|
|
expValue: 0,
|
|
}, {
|
|
// Honor Labels should ignore labels with the same name.
|
|
title: "Honor Labels",
|
|
honorLabels: true,
|
|
scrapeLabels: `metric{n1="1" n2="2"} 0`,
|
|
discoveryLabels: []string{"n1", "0"},
|
|
expLset: labels.FromStrings("__name__", "metric", "n1", "1", "n2", "2"),
|
|
expValue: 0,
|
|
}, {
|
|
title: "Stale - NaN",
|
|
honorLabels: false,
|
|
scrapeLabels: `metric NaN`,
|
|
discoveryLabels: nil,
|
|
expLset: labels.FromStrings("__name__", "metric"),
|
|
expValue: float64(value.NormalNaN),
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
app := &collectResultAppender{}
|
|
|
|
discoveryLabels := &Target{
|
|
labels: labels.FromStrings(test.discoveryLabels...),
|
|
}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateSampleLabels(l, discoveryLabels, test.honorLabels, nil)
|
|
},
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateReportSampleLabels(l, discoveryLabels)
|
|
},
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
expected := []sample{
|
|
{
|
|
metric: test.expLset,
|
|
t: timestamp.FromTime(now),
|
|
v: test.expValue,
|
|
},
|
|
}
|
|
|
|
// When the expected value is NaN
|
|
// DeepEqual will report NaNs as being different,
|
|
// so replace it with the expected one.
|
|
if test.expValue == float64(value.NormalNaN) {
|
|
app.result[0].v = expected[0].v
|
|
}
|
|
|
|
t.Logf("Test:%s", test.title)
|
|
require.Equal(t, expected, app.result)
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|
// collectResultAppender's AddFast always returns ErrNotFound if we don't give it a next.
|
|
app := &collectResultAppender{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
fakeRef := uint64(1)
|
|
expValue := float64(1)
|
|
metric := `metric{n="1"} 1`
|
|
p := textparse.New([]byte(metric), "")
|
|
|
|
var lset labels.Labels
|
|
p.Next()
|
|
mets := p.Metric(&lset)
|
|
hash := lset.Hash()
|
|
|
|
// Create a fake entry in the cache
|
|
sl.cache.addRef(mets, fakeRef, lset, hash)
|
|
now := time.Now()
|
|
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(metric), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
expected := []sample{
|
|
{
|
|
metric: lset,
|
|
t: timestamp.FromTime(now),
|
|
v: expValue,
|
|
},
|
|
}
|
|
|
|
require.Equal(t, expected, app.result)
|
|
}
|
|
|
|
func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|
resApp := &collectResultAppender{}
|
|
app := &limitAppender{Appender: resApp, limit: 1}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
if l.Has("deleteme") {
|
|
return nil
|
|
}
|
|
return l
|
|
},
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
// Get the value of the Counter before performing the append.
|
|
beforeMetric := dto.Metric{}
|
|
err := targetScrapeSampleLimit.Write(&beforeMetric)
|
|
require.NoError(t, err)
|
|
|
|
beforeMetricValue := beforeMetric.GetCounter().GetValue()
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
|
|
if err != errSampleLimit {
|
|
t.Fatalf("Did not see expected sample limit error: %s", err)
|
|
}
|
|
require.NoError(t, slApp.Rollback())
|
|
require.Equal(t, 3, total)
|
|
require.Equal(t, 3, added)
|
|
require.Equal(t, 1, seriesAdded)
|
|
|
|
// Check that the Counter has been incremented a single time for the scrape,
|
|
// not multiple times for each sample.
|
|
metric := dto.Metric{}
|
|
err = targetScrapeSampleLimit.Write(&metric)
|
|
require.NoError(t, err)
|
|
|
|
value := metric.GetCounter().GetValue()
|
|
change := value - beforeMetricValue
|
|
require.Equal(t, 1.0, change, "Unexpected change of sample limit metric: %f", change)
|
|
|
|
// And verify that we got the samples that fit under the limit.
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
|
|
t: timestamp.FromTime(now),
|
|
v: 1,
|
|
},
|
|
}
|
|
require.Equal(t, want, resApp.rolledbackResult, "Appended samples not as expected")
|
|
|
|
now = time.Now()
|
|
slApp = sl.appender(context.Background())
|
|
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
|
|
if err != errSampleLimit {
|
|
t.Fatalf("Did not see expected sample limit error: %s", err)
|
|
}
|
|
require.NoError(t, slApp.Rollback())
|
|
require.Equal(t, 9, total)
|
|
require.Equal(t, 6, added)
|
|
require.Equal(t, 0, seriesAdded)
|
|
}
|
|
|
|
func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
|
// This is a regression test for the scrape loop cache not properly maintaining
|
|
// IDs when the string representation of a metric changes across a scrape. Thus
|
|
// we use a real storage appender here.
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
capp := &collectResultAppender{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { capp.next = s.Appender(ctx); return capp },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
slApp = sl.appender(context.Background())
|
|
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
// DeepEqual will report NaNs as being different, so replace with a different value.
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
|
t: timestamp.FromTime(now),
|
|
v: 1,
|
|
},
|
|
{
|
|
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
|
t: timestamp.FromTime(now.Add(time.Minute)),
|
|
v: 2,
|
|
},
|
|
}
|
|
require.Equal(t, want, capp.result, "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoopAppendStaleness(t *testing.T) {
|
|
app := &collectResultAppender{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
slApp = sl.appender(context.Background())
|
|
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
ingestedNaN := math.Float64bits(app.result[1].v)
|
|
require.Equal(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected")
|
|
|
|
// DeepEqual will report NaNs as being different, so replace with a different value.
|
|
app.result[1].v = 42
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
|
|
t: timestamp.FromTime(now),
|
|
v: 1,
|
|
},
|
|
{
|
|
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
|
|
t: timestamp.FromTime(now.Add(time.Second)),
|
|
v: 42,
|
|
},
|
|
}
|
|
require.Equal(t, want, app.result, "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
|
|
app := &collectResultAppender{}
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
slApp = sl.appender(context.Background())
|
|
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
|
|
t: 1000,
|
|
v: 1,
|
|
},
|
|
}
|
|
require.Equal(t, want, app.result, "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoopAppendExemplar(t *testing.T) {
|
|
tests := []struct {
|
|
title string
|
|
scrapeText string
|
|
discoveryLabels []string
|
|
samples []sample
|
|
exemplars []exemplar.Exemplar
|
|
}{
|
|
{
|
|
title: "Metric without exemplars",
|
|
scrapeText: "metric_total{n=\"1\"} 0\n# EOF",
|
|
discoveryLabels: []string{"n", "2"},
|
|
samples: []sample{{
|
|
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
|
|
v: 0,
|
|
}},
|
|
},
|
|
{
|
|
title: "Metric with exemplars",
|
|
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0\n# EOF",
|
|
discoveryLabels: []string{"n", "2"},
|
|
samples: []sample{{
|
|
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
|
|
v: 0,
|
|
}},
|
|
exemplars: []exemplar.Exemplar{
|
|
{Labels: labels.FromStrings("a", "abc"), Value: 1},
|
|
},
|
|
}, {
|
|
title: "Metric with exemplars and TS",
|
|
scrapeText: "metric_total{n=\"1\"} 0 # {a=\"abc\"} 1.0 10000\n# EOF",
|
|
discoveryLabels: []string{"n", "2"},
|
|
samples: []sample{{
|
|
metric: labels.FromStrings("__name__", "metric_total", "exported_n", "1", "n", "2"),
|
|
v: 0,
|
|
}},
|
|
exemplars: []exemplar.Exemplar{
|
|
{Labels: labels.FromStrings("a", "abc"), Value: 1, Ts: 10000000, HasTs: true},
|
|
},
|
|
}, {
|
|
title: "Two metrics and exemplars",
|
|
scrapeText: `metric_total{n="1"} 1 # {t="1"} 1.0 10000
|
|
metric_total{n="2"} 2 # {t="2"} 2.0 20000
|
|
# EOF`,
|
|
samples: []sample{{
|
|
metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
|
|
v: 1,
|
|
}, {
|
|
metric: labels.FromStrings("__name__", "metric_total", "n", "2"),
|
|
v: 2,
|
|
}},
|
|
exemplars: []exemplar.Exemplar{
|
|
{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
|
|
{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.title, func(t *testing.T) {
|
|
app := &collectResultAppender{}
|
|
|
|
discoveryLabels := &Target{
|
|
labels: labels.FromStrings(test.discoveryLabels...),
|
|
}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateSampleLabels(l, discoveryLabels, false, nil)
|
|
},
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateReportSampleLabels(l, discoveryLabels)
|
|
},
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
|
|
for i := range test.samples {
|
|
test.samples[i].t = timestamp.FromTime(now)
|
|
}
|
|
|
|
// We need to set the timestamp for expected exemplars that does not have a timestamp.
|
|
for i := range test.exemplars {
|
|
if test.exemplars[i].Ts == 0 {
|
|
test.exemplars[i].Ts = timestamp.FromTime(now)
|
|
}
|
|
}
|
|
|
|
_, _, _, err := sl.append(app, []byte(test.scrapeText), "application/openmetrics-text", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, app.Commit())
|
|
require.Equal(t, test.samples, app.result)
|
|
require.Equal(t, test.exemplars, app.resultExemplars)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
|
|
scrapeText := []string{`metric_total{n="1"} 1 # {t="1"} 1.0 10000
|
|
# EOF`, `metric_total{n="1"} 2 # {t="2"} 2.0 20000
|
|
# EOF`}
|
|
samples := []sample{{
|
|
metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
|
|
v: 1,
|
|
}, {
|
|
metric: labels.FromStrings("__name__", "metric_total", "n", "1"),
|
|
v: 2,
|
|
}}
|
|
exemplars := []exemplar.Exemplar{
|
|
{Labels: labels.FromStrings("t", "1"), Value: 1, Ts: 10000000, HasTs: true},
|
|
{Labels: labels.FromStrings("t", "2"), Value: 2, Ts: 20000000, HasTs: true},
|
|
}
|
|
discoveryLabels := &Target{
|
|
labels: labels.FromStrings(),
|
|
}
|
|
|
|
app := &collectResultAppender{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateSampleLabels(l, discoveryLabels, false, nil)
|
|
},
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateReportSampleLabels(l, discoveryLabels)
|
|
},
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
|
|
for i := range samples {
|
|
ts := now.Add(time.Second * time.Duration(i))
|
|
samples[i].t = timestamp.FromTime(ts)
|
|
}
|
|
|
|
// We need to set the timestamp for expected exemplars that does not have a timestamp.
|
|
for i := range exemplars {
|
|
if exemplars[i].Ts == 0 {
|
|
ts := now.Add(time.Second * time.Duration(i))
|
|
exemplars[i].Ts = timestamp.FromTime(ts)
|
|
}
|
|
}
|
|
|
|
for i, st := range scrapeText {
|
|
_, _, _, err := sl.append(app, []byte(st), "application/openmetrics-text", timestamp.Time(samples[i].t))
|
|
require.NoError(t, err)
|
|
require.NoError(t, app.Commit())
|
|
}
|
|
|
|
require.Equal(t, samples, app.result)
|
|
require.Equal(t, exemplars, app.resultExemplars)
|
|
}
|
|
|
|
func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
|
|
var (
|
|
scraper = &testScraper{}
|
|
appender = &collectResultAppender{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
cancel()
|
|
return errors.New("scrape failed")
|
|
}
|
|
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value")
|
|
}
|
|
|
|
func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
|
|
var (
|
|
scraper = &testScraper{}
|
|
appender = &collectResultAppender{}
|
|
app = func(ctx context.Context) storage.Appender { return appender }
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
app,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
cancel()
|
|
w.Write([]byte("a{l=\"\xff\"} 1\n"))
|
|
return nil
|
|
}
|
|
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
require.Equal(t, 0.0, appender.result[0].v, "bad 'up' value")
|
|
}
|
|
|
|
type errorAppender struct {
|
|
collectResultAppender
|
|
}
|
|
|
|
func (app *errorAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
|
switch lset.Get(model.MetricNameLabel) {
|
|
case "out_of_order":
|
|
return 0, storage.ErrOutOfOrderSample
|
|
case "amend":
|
|
return 0, storage.ErrDuplicateSampleForTimestamp
|
|
case "out_of_bounds":
|
|
return 0, storage.ErrOutOfBounds
|
|
default:
|
|
return app.collectResultAppender.Append(ref, lset, t, v)
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
|
|
app := &errorAppender{}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Unix(1, 0)
|
|
slApp := sl.appender(context.Background())
|
|
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings(model.MetricNameLabel, "normal"),
|
|
t: timestamp.FromTime(now),
|
|
v: 1,
|
|
},
|
|
}
|
|
require.Equal(t, want, app.result, "Appended samples not as expected")
|
|
require.Equal(t, 4, total)
|
|
require.Equal(t, 4, added)
|
|
require.Equal(t, 1, seriesAdded)
|
|
}
|
|
|
|
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
|
|
app := &collectResultAppender{}
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender {
|
|
return &timeLimitAppender{
|
|
Appender: app,
|
|
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
|
|
}
|
|
},
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now().Add(20 * time.Minute)
|
|
slApp := sl.appender(context.Background())
|
|
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
require.Equal(t, 1, total)
|
|
require.Equal(t, 1, added)
|
|
require.Equal(t, 0, seriesAdded)
|
|
|
|
}
|
|
|
|
func TestTargetScraperScrapeOK(t *testing.T) {
|
|
const (
|
|
configTimeout = 1500 * time.Millisecond
|
|
expectedTimeout = "1.500000"
|
|
)
|
|
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
accept := r.Header.Get("Accept")
|
|
if !strings.HasPrefix(accept, "application/openmetrics-text;") {
|
|
t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept)
|
|
}
|
|
|
|
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
|
|
if timeout != expectedTimeout {
|
|
t.Errorf("Expected scrape timeout header %q, got %q", expectedTimeout, timeout)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
|
|
w.Write([]byte("metric_a 1\nmetric_b 2\n"))
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
serverURL, err := url.Parse(server.URL)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ts := &targetScraper{
|
|
Target: &Target{
|
|
labels: labels.FromStrings(
|
|
model.SchemeLabel, serverURL.Scheme,
|
|
model.AddressLabel, serverURL.Host,
|
|
),
|
|
},
|
|
client: http.DefaultClient,
|
|
timeout: configTimeout,
|
|
}
|
|
var buf bytes.Buffer
|
|
|
|
contentType, err := ts.scrape(context.Background(), &buf)
|
|
require.NoError(t, err)
|
|
require.Equal(t, "text/plain; version=0.0.4", contentType)
|
|
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
|
|
}
|
|
|
|
func TestTargetScrapeScrapeCancel(t *testing.T) {
|
|
block := make(chan struct{})
|
|
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
<-block
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
serverURL, err := url.Parse(server.URL)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ts := &targetScraper{
|
|
Target: &Target{
|
|
labels: labels.FromStrings(
|
|
model.SchemeLabel, serverURL.Scheme,
|
|
model.AddressLabel, serverURL.Host,
|
|
),
|
|
},
|
|
client: http.DefaultClient,
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
errc := make(chan error, 1)
|
|
|
|
go func() {
|
|
time.Sleep(1 * time.Second)
|
|
cancel()
|
|
}()
|
|
|
|
go func() {
|
|
_, err := ts.scrape(ctx, ioutil.Discard)
|
|
if err == nil {
|
|
errc <- errors.New("Expected error but got nil")
|
|
} else if ctx.Err() != context.Canceled {
|
|
errc <- errors.Errorf("Expected context cancellation error but got: %s", ctx.Err())
|
|
} else {
|
|
close(errc)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape function did not return unexpectedly")
|
|
case err := <-errc:
|
|
require.NoError(t, err)
|
|
}
|
|
// If this is closed in a defer above the function the test server
|
|
// doesn't terminate and the test doesn't complete.
|
|
close(block)
|
|
}
|
|
|
|
func TestTargetScrapeScrapeNotFound(t *testing.T) {
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
serverURL, err := url.Parse(server.URL)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ts := &targetScraper{
|
|
Target: &Target{
|
|
labels: labels.FromStrings(
|
|
model.SchemeLabel, serverURL.Scheme,
|
|
model.AddressLabel, serverURL.Host,
|
|
),
|
|
},
|
|
client: http.DefaultClient,
|
|
}
|
|
|
|
_, err = ts.scrape(context.Background(), ioutil.Discard)
|
|
require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
|
|
}
|
|
|
|
// testScraper is a scraper stand-in for tests. Its methods return the values
// stored in its fields, and scraping can be redirected to a custom function.
type testScraper struct {
	// offsetDur is what offset returns.
	offsetDur time.Duration

	// Arguments of the most recent Report call.
	lastStart    time.Time
	lastDuration time.Duration
	lastError    error

	// scrapeErr is returned by scrape when scrapeFunc is nil; otherwise
	// scrapeFunc's result is returned.
	scrapeErr  error
	scrapeFunc func(context.Context, io.Writer) error
}

// offset ignores its arguments and returns the configured offsetDur.
func (ts *testScraper) offset(interval time.Duration, jitterSeed uint64) time.Duration {
	return ts.offsetDur
}

// Report records its three arguments in the last* fields.
func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) {
	ts.lastStart, ts.lastDuration, ts.lastError = start, duration, err
}

// scrape delegates to scrapeFunc when one is set and otherwise returns
// scrapeErr. The returned content type is always empty.
func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
	if ts.scrapeFunc == nil {
		return "", ts.scrapeErr
	}
	return "", ts.scrapeFunc(ctx, w)
}
|
|
|
|
func TestScrapeLoop_RespectTimestamps(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
app := s.Appender(context.Background())
|
|
|
|
capp := &collectResultAppender{next: app}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return capp },
|
|
nil, 0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
|
t: 0,
|
|
v: 1,
|
|
},
|
|
}
|
|
require.Equal(t, want, capp.result, "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
app := s.Appender(context.Background())
|
|
|
|
capp := &collectResultAppender{next: app}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return capp },
|
|
nil, 0,
|
|
false,
|
|
nil,
|
|
)
|
|
|
|
now := time.Now()
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
want := []sample{
|
|
{
|
|
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
|
t: timestamp.FromTime(now),
|
|
v: 1,
|
|
},
|
|
}
|
|
require.Equal(t, want, capp.result, "Appended samples not as expected")
|
|
}
|
|
|
|
func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
&testScraper{},
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
s.Appender,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
defer cancel()
|
|
|
|
// We add a good and a bad metric to check that both are discarded.
|
|
slApp := sl.appender(ctx)
|
|
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
|
|
require.Error(t, err)
|
|
require.NoError(t, slApp.Rollback())
|
|
|
|
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
|
require.NoError(t, err)
|
|
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
|
require.Equal(t, false, series.Next(), "series found in tsdb")
|
|
require.NoError(t, series.Err())
|
|
|
|
// We add a good metric to check that it is recorded.
|
|
slApp = sl.appender(ctx)
|
|
_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
|
require.NoError(t, err)
|
|
series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
|
|
require.Equal(t, true, series.Next(), "series not found in tsdb")
|
|
require.NoError(t, series.Err())
|
|
require.Equal(t, false, series.Next(), "more than one series found in tsdb")
|
|
}
|
|
|
|
func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
app := s.Appender(context.Background())
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(context.Background(),
|
|
&testScraper{},
|
|
nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
if l.Has("drop") {
|
|
return labels.Labels{}
|
|
}
|
|
return l
|
|
},
|
|
nopMutator,
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
defer cancel()
|
|
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
|
|
require.Error(t, err)
|
|
require.NoError(t, slApp.Rollback())
|
|
require.Equal(t, errNameLabelMandatory, err)
|
|
|
|
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
|
require.NoError(t, err)
|
|
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
|
require.Equal(t, false, series.Next(), "series found in tsdb")
|
|
require.NoError(t, series.Err())
|
|
}
|
|
|
|
func TestReusableConfig(t *testing.T) {
|
|
variants := []*config.ScrapeConfig{
|
|
{
|
|
JobName: "prometheus",
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
},
|
|
{
|
|
JobName: "httpd",
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
MetricsPath: "/metrics",
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
{
|
|
JobName: "prometheus",
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
SampleLimit: 1000,
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
}
|
|
|
|
match := [][]int{
|
|
{0, 2},
|
|
{4, 5},
|
|
{4, 6},
|
|
{4, 7},
|
|
{5, 6},
|
|
{5, 7},
|
|
{6, 7},
|
|
}
|
|
noMatch := [][]int{
|
|
{1, 2},
|
|
{0, 4},
|
|
{3, 4},
|
|
}
|
|
|
|
for i, m := range match {
|
|
require.Equal(t, true, reusableCache(variants[m[0]], variants[m[1]]), "match test %d", i)
|
|
require.Equal(t, true, reusableCache(variants[m[1]], variants[m[0]]), "match test %d", i)
|
|
require.Equal(t, true, reusableCache(variants[m[1]], variants[m[1]]), "match test %d", i)
|
|
require.Equal(t, true, reusableCache(variants[m[0]], variants[m[0]]), "match test %d", i)
|
|
}
|
|
for i, m := range noMatch {
|
|
require.Equal(t, false, reusableCache(variants[m[0]], variants[m[1]]), "not match test %d", i)
|
|
require.Equal(t, false, reusableCache(variants[m[1]], variants[m[0]]), "not match test %d", i)
|
|
}
|
|
}
|
|
|
|
func TestReuseScrapeCache(t *testing.T) {
|
|
var (
|
|
app = &nopAppendable{}
|
|
cfg = &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
MetricsPath: "/metrics",
|
|
}
|
|
sp, _ = newScrapePool(cfg, app, 0, nil)
|
|
t1 = &Target{
|
|
discoveredLabels: labels.Labels{
|
|
labels.Label{
|
|
Name: "labelNew",
|
|
Value: "nameNew",
|
|
},
|
|
},
|
|
}
|
|
proxyURL, _ = url.Parse("http://localhost:2128")
|
|
)
|
|
defer sp.stop()
|
|
sp.sync([]*Target{t1})
|
|
|
|
steps := []struct {
|
|
keep bool
|
|
newConfig *config.ScrapeConfig
|
|
}{
|
|
{
|
|
keep: true,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
MetricsPath: "/metrics",
|
|
},
|
|
},
|
|
{
|
|
keep: false,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
},
|
|
{
|
|
keep: true,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
SampleLimit: 400,
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
},
|
|
{
|
|
keep: false,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
HonorTimestamps: true,
|
|
SampleLimit: 400,
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
},
|
|
{
|
|
keep: true,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
HonorTimestamps: true,
|
|
SampleLimit: 400,
|
|
HTTPClientConfig: config_util.HTTPClientConfig{
|
|
ProxyURL: config_util.URL{URL: proxyURL},
|
|
},
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
},
|
|
{
|
|
keep: false,
|
|
newConfig: &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
HonorTimestamps: true,
|
|
HonorLabels: true,
|
|
SampleLimit: 400,
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
ScrapeTimeout: model.Duration(15 * time.Second),
|
|
MetricsPath: "/metrics2",
|
|
},
|
|
},
|
|
}
|
|
|
|
cacheAddr := func(sp *scrapePool) map[uint64]string {
|
|
r := make(map[uint64]string)
|
|
for fp, l := range sp.loops {
|
|
r[fp] = fmt.Sprintf("%p", l.getCache())
|
|
}
|
|
return r
|
|
}
|
|
|
|
for i, s := range steps {
|
|
initCacheAddr := cacheAddr(sp)
|
|
sp.reload(s.newConfig)
|
|
for fp, newCacheAddr := range cacheAddr(sp) {
|
|
if s.keep {
|
|
require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are not the same", i)
|
|
} else {
|
|
require.NotEqual(t, initCacheAddr[fp], newCacheAddr, "step %d: old cache and new cache are the same", i)
|
|
}
|
|
}
|
|
initCacheAddr = cacheAddr(sp)
|
|
sp.reload(s.newConfig)
|
|
for fp, newCacheAddr := range cacheAddr(sp) {
|
|
require.Equal(t, initCacheAddr[fp], newCacheAddr, "step %d: reloading the exact config invalidates the cache", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestScrapeAddFast(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
&testScraper{},
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
s.Appender,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
defer cancel()
|
|
|
|
slApp := sl.appender(ctx)
|
|
_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
|
|
// Poison the cache. There is just one entry, and one series in the
|
|
// storage. Changing the ref will create a 'not found' error.
|
|
for _, v := range sl.getCache().series {
|
|
v.ref++
|
|
}
|
|
|
|
slApp = sl.appender(ctx)
|
|
_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
}
|
|
|
|
func TestReuseCacheRace(t *testing.T) {
|
|
var (
|
|
app = &nopAppendable{}
|
|
cfg = &config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
ScrapeInterval: model.Duration(5 * time.Second),
|
|
MetricsPath: "/metrics",
|
|
}
|
|
sp, _ = newScrapePool(cfg, app, 0, nil)
|
|
t1 = &Target{
|
|
discoveredLabels: labels.Labels{
|
|
labels.Label{
|
|
Name: "labelNew",
|
|
Value: "nameNew",
|
|
},
|
|
},
|
|
}
|
|
)
|
|
defer sp.stop()
|
|
sp.sync([]*Target{t1})
|
|
|
|
start := time.Now()
|
|
for i := uint(1); i > 0; i++ {
|
|
if time.Since(start) > 5*time.Second {
|
|
break
|
|
}
|
|
sp.reload(&config.ScrapeConfig{
|
|
JobName: "Prometheus",
|
|
ScrapeTimeout: model.Duration(1 * time.Millisecond),
|
|
ScrapeInterval: model.Duration(1 * time.Millisecond),
|
|
MetricsPath: "/metrics",
|
|
SampleLimit: i,
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCheckAddError(t *testing.T) {
|
|
var appErrs appendErrors
|
|
sl := scrapeLoop{l: log.NewNopLogger()}
|
|
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs)
|
|
require.Equal(t, 1, appErrs.numOutOfOrder)
|
|
}
|
|
|
|
func TestScrapeReportSingleAppender(t *testing.T) {
|
|
s := teststorage.New(t)
|
|
defer s.Close()
|
|
|
|
var (
|
|
signal = make(chan struct{}, 1)
|
|
scraper = &testScraper{}
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sl := newScrapeLoop(ctx,
|
|
scraper,
|
|
nil, nil,
|
|
nopMutator,
|
|
nopMutator,
|
|
s.Appender,
|
|
nil,
|
|
0,
|
|
true,
|
|
nil,
|
|
)
|
|
|
|
numScrapes := 0
|
|
|
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
|
numScrapes++
|
|
if numScrapes%4 == 0 {
|
|
return fmt.Errorf("scrape failed")
|
|
}
|
|
w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n"))
|
|
return nil
|
|
}
|
|
|
|
go func() {
|
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
|
signal <- struct{}{}
|
|
}()
|
|
|
|
start := time.Now()
|
|
for time.Since(start) < 3*time.Second {
|
|
q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano())
|
|
require.NoError(t, err)
|
|
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
|
|
|
|
c := 0
|
|
for series.Next() {
|
|
i := series.At().Iterator()
|
|
for i.Next() {
|
|
c++
|
|
}
|
|
}
|
|
|
|
require.Equal(t, 0, c%9, "Appended samples not as expected: %d", c)
|
|
q.Close()
|
|
}
|
|
cancel()
|
|
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("Scrape wasn't stopped.")
|
|
}
|
|
}
|
|
|
|
func TestScrapeLoopLabelLimit(t *testing.T) {
|
|
tests := []struct {
|
|
title string
|
|
scrapeLabels string
|
|
discoveryLabels []string
|
|
labelLimits labelLimits
|
|
expectErr bool
|
|
}{
|
|
{
|
|
title: "Valid number of labels",
|
|
scrapeLabels: `metric{l1="1", l2="2"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelLimit: 5},
|
|
expectErr: false,
|
|
}, {
|
|
title: "Too many labels",
|
|
scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4", l5="5", l6="6"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelLimit: 5},
|
|
expectErr: true,
|
|
}, {
|
|
title: "Too many labels including discovery labels",
|
|
scrapeLabels: `metric{l1="1", l2="2", l3="3", l4="4"} 0`,
|
|
discoveryLabels: []string{"l5", "5", "l6", "6"},
|
|
labelLimits: labelLimits{labelLimit: 5},
|
|
expectErr: true,
|
|
}, {
|
|
title: "Valid labels name length",
|
|
scrapeLabels: `metric{l1="1", l2="2"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelNameLengthLimit: 10},
|
|
expectErr: false,
|
|
}, {
|
|
title: "Label name too long",
|
|
scrapeLabels: `metric{label_name_too_long="0"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelNameLengthLimit: 10},
|
|
expectErr: true,
|
|
}, {
|
|
title: "Discovery label name too long",
|
|
scrapeLabels: `metric{l1="1", l2="2"} 0`,
|
|
discoveryLabels: []string{"label_name_too_long", "0"},
|
|
labelLimits: labelLimits{labelNameLengthLimit: 10},
|
|
expectErr: true,
|
|
}, {
|
|
title: "Valid labels value length",
|
|
scrapeLabels: `metric{l1="1", l2="2"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelValueLengthLimit: 10},
|
|
expectErr: false,
|
|
}, {
|
|
title: "Label value too long",
|
|
scrapeLabels: `metric{l1="label_value_too_long"} 0`,
|
|
discoveryLabels: nil,
|
|
labelLimits: labelLimits{labelValueLengthLimit: 10},
|
|
expectErr: true,
|
|
}, {
|
|
title: "Discovery label value too long",
|
|
scrapeLabels: `metric{l1="1", l2="2"} 0`,
|
|
discoveryLabels: []string{"l1", "label_value_too_long"},
|
|
labelLimits: labelLimits{labelValueLengthLimit: 10},
|
|
expectErr: true,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
app := &collectResultAppender{}
|
|
|
|
discoveryLabels := &Target{
|
|
labels: labels.FromStrings(test.discoveryLabels...),
|
|
}
|
|
|
|
sl := newScrapeLoop(context.Background(),
|
|
nil, nil, nil,
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateSampleLabels(l, discoveryLabels, false, nil)
|
|
},
|
|
func(l labels.Labels) labels.Labels {
|
|
return mutateReportSampleLabels(l, discoveryLabels)
|
|
},
|
|
func(ctx context.Context) storage.Appender { return app },
|
|
nil,
|
|
0,
|
|
true,
|
|
&test.labelLimits,
|
|
)
|
|
|
|
slApp := sl.appender(context.Background())
|
|
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", time.Now())
|
|
|
|
t.Logf("Test:%s", test.title)
|
|
if test.expectErr {
|
|
require.Error(t, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
require.NoError(t, slApp.Commit())
|
|
}
|
|
}
|
|
}
|