mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-14 15:27:47 -08:00
f1c57a95ed
* chore: revert TypeRequiresCT to private Signed-off-by: Manik Rana <manikrana54@gmail.com> * feat: init NewOpenMetricsParser with skipCT true Signed-off-by: Manik Rana <manikrana54@gmail.com> * refac: allow opt-in to OM CT ingestion Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: lint Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: use textparse interface to set om options Signed-off-by: Manik Rana <manikrana54@gmail.com> * fix: set skipOMSeries in test Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: gofumpt Signed-off-by: Manik Rana <manikrana54@gmail.com> * wip: add tests for OM CR parse Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: merge ct tests Signed-off-by: Manik Rana <manikrana54@gmail.com> * tests: add cases for OM text Signed-off-by: Manik Rana <manikrana54@gmail.com> * fix: check correct test cases Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: use both scrape protocols in config Signed-off-by: Manik Rana <manikrana54@gmail.com> * fix: fix inputs and output tests for OM Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: cleanup Signed-off-by: Manik Rana <manikrana54@gmail.com> * refac: rename skipOMSeries to skipOMCTSeries Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Signed-off-by: Manik Rana <Manikrana54@gmail.com> * fix: finish refac Signed-off-by: Manik Rana <manikrana54@gmail.com> * refac: move setup code outside test Signed-off-by: Manik Rana <manikrana54@gmail.com> * tests: verify _created lines create new metric in certain cases Signed-off-by: Manik Rana <manikrana54@gmail.com> * fix: post merge fixes Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: lint Signed-off-by: Manik Rana <manikrana54@gmail.com> * manager: Fixed CT OMText conversion bug; Refactored tests. Signed-off-by: bwplotka <bwplotka@gmail.com> * chore: lint Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: gofumpt Signed-off-by: Manik Rana <manikrana54@gmail.com> * chore: imports Signed-off-by: Manik Rana <manikrana54@gmail.com> --------- Signed-off-by: Manik Rana <manikrana54@gmail.com> Signed-off-by: Manik Rana <Manikrana54@gmail.com> Signed-off-by: bwplotka <bwplotka@gmail.com> Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Co-authored-by: bwplotka <bwplotka@gmail.com>
1511 lines
42 KiB
Go
1511 lines
42 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package scrape
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
dto "github.com/prometheus/client_model/go"
|
|
"github.com/prometheus/common/expfmt"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
"gopkg.in/yaml.v2"
|
|
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/discovery"
|
|
_ "github.com/prometheus/prometheus/discovery/file"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/relabel"
|
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
|
"github.com/prometheus/prometheus/util/runutil"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
func init() {
|
|
// This can be removed when the default validation scheme in common is updated.
|
|
model.NameValidationScheme = model.UTF8Validation
|
|
}
|
|
|
|
func TestPopulateLabels(t *testing.T) {
|
|
cases := []struct {
|
|
in labels.Labels
|
|
cfg *config.ScrapeConfig
|
|
res labels.Labels
|
|
resOrig labels.Labels
|
|
err string
|
|
}{
|
|
// Regular population of scrape config options.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
"custom": "value",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.InstanceLabel: "1.2.3.4:1000",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
"custom": "value",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
"custom": "value",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
},
|
|
// Pre-define/overwrite scrape config labels.
|
|
// Leave out port and expect it to be defaulted to scheme.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
model.SchemeLabel: "http",
|
|
model.MetricsPathLabel: "/custom",
|
|
model.JobLabel: "custom-job",
|
|
model.ScrapeIntervalLabel: "2s",
|
|
model.ScrapeTimeoutLabel: "2s",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
model.InstanceLabel: "1.2.3.4",
|
|
model.SchemeLabel: "http",
|
|
model.MetricsPathLabel: "/custom",
|
|
model.JobLabel: "custom-job",
|
|
model.ScrapeIntervalLabel: "2s",
|
|
model.ScrapeTimeoutLabel: "2s",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
model.SchemeLabel: "http",
|
|
model.MetricsPathLabel: "/custom",
|
|
model.JobLabel: "custom-job",
|
|
model.ScrapeIntervalLabel: "2s",
|
|
model.ScrapeTimeoutLabel: "2s",
|
|
}),
|
|
},
|
|
// Provide instance label. HTTPS port default for IPv6.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "[::1]",
|
|
model.InstanceLabel: "custom-instance",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "[::1]",
|
|
model.InstanceLabel: "custom-instance",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "[::1]",
|
|
model.InstanceLabel: "custom-instance",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
},
|
|
// Address label missing.
|
|
{
|
|
in: labels.FromStrings("custom", "value"),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "no address",
|
|
},
|
|
// Address label missing, but added in relabelling.
|
|
{
|
|
in: labels.FromStrings("custom", "host:1234"),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
RelabelConfigs: []*relabel.Config{
|
|
{
|
|
Action: relabel.Replace,
|
|
Regex: relabel.MustNewRegexp("(.*)"),
|
|
SourceLabels: model.LabelNames{"custom"},
|
|
Replacement: "${1}",
|
|
TargetLabel: string(model.AddressLabel),
|
|
},
|
|
},
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "host:1234",
|
|
model.InstanceLabel: "host:1234",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
"custom": "host:1234",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
"custom": "host:1234",
|
|
}),
|
|
},
|
|
// Address label missing, but added in relabelling.
|
|
{
|
|
in: labels.FromStrings("custom", "host:1234"),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
RelabelConfigs: []*relabel.Config{
|
|
{
|
|
Action: relabel.Replace,
|
|
Regex: relabel.MustNewRegexp("(.*)"),
|
|
SourceLabels: model.LabelNames{"custom"},
|
|
Replacement: "${1}",
|
|
TargetLabel: string(model.AddressLabel),
|
|
},
|
|
},
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "host:1234",
|
|
model.InstanceLabel: "host:1234",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
"custom": "host:1234",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
"custom": "host:1234",
|
|
}),
|
|
},
|
|
// Invalid UTF-8 in label.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
"custom": "\xbd",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "invalid label value for \"custom\": \"\\xbd\"",
|
|
},
|
|
// Invalid duration in interval label.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.ScrapeIntervalLabel: "2notseconds",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "error parsing scrape interval: unknown unit \"notseconds\" in duration \"2notseconds\"",
|
|
},
|
|
// Invalid duration in timeout label.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.ScrapeTimeoutLabel: "2notseconds",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "error parsing scrape timeout: unknown unit \"notseconds\" in duration \"2notseconds\"",
|
|
},
|
|
// 0 interval in timeout label.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.ScrapeIntervalLabel: "0s",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "scrape interval cannot be 0",
|
|
},
|
|
// 0 duration in timeout label.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.ScrapeTimeoutLabel: "0s",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "scrape timeout cannot be 0",
|
|
},
|
|
// Timeout less than interval.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:1000",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "2s",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.EmptyLabels(),
|
|
resOrig: labels.EmptyLabels(),
|
|
err: "scrape timeout cannot be greater than scrape interval (\"2s\" > \"1s\")",
|
|
},
|
|
// Don't attach default port.
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
model.InstanceLabel: "1.2.3.4",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
},
|
|
// verify that the default port is not removed (http).
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:80",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "http",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:80",
|
|
model.InstanceLabel: "1.2.3.4:80",
|
|
model.SchemeLabel: "http",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:80",
|
|
model.SchemeLabel: "http",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
},
|
|
// verify that the default port is not removed (https).
|
|
{
|
|
in: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:443",
|
|
}),
|
|
cfg: &config.ScrapeConfig{
|
|
Scheme: "https",
|
|
MetricsPath: "/metrics",
|
|
JobName: "job",
|
|
ScrapeInterval: model.Duration(time.Second),
|
|
ScrapeTimeout: model.Duration(time.Second),
|
|
},
|
|
res: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:443",
|
|
model.InstanceLabel: "1.2.3.4:443",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
resOrig: labels.FromMap(map[string]string{
|
|
model.AddressLabel: "1.2.3.4:443",
|
|
model.SchemeLabel: "https",
|
|
model.MetricsPathLabel: "/metrics",
|
|
model.JobLabel: "job",
|
|
model.ScrapeIntervalLabel: "1s",
|
|
model.ScrapeTimeoutLabel: "1s",
|
|
}),
|
|
},
|
|
}
|
|
for _, c := range cases {
|
|
in := c.in.Copy()
|
|
|
|
res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg)
|
|
if c.err != "" {
|
|
require.EqualError(t, err, c.err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
require.Equal(t, c.in, in)
|
|
testutil.RequireEqual(t, c.res, res)
|
|
testutil.RequireEqual(t, c.resOrig, orig)
|
|
}
|
|
}
|
|
|
|
func loadConfiguration(t testing.TB, c string) *config.Config {
|
|
t.Helper()
|
|
|
|
cfg := &config.Config{}
|
|
err := yaml.UnmarshalStrict([]byte(c), cfg)
|
|
require.NoError(t, err, "Unable to load YAML config.")
|
|
|
|
return cfg
|
|
}
|
|
|
|
func noopLoop() loop {
|
|
return &testLoop{
|
|
startFunc: func(interval, timeout time.Duration, errc chan<- error) {},
|
|
stopFunc: func() {},
|
|
}
|
|
}
|
|
|
|
func TestManagerApplyConfig(t *testing.T) {
|
|
// Valid initial configuration.
|
|
cfgText1 := `
|
|
scrape_configs:
|
|
- job_name: job1
|
|
static_configs:
|
|
- targets: ["foo:9090"]
|
|
`
|
|
// Invalid configuration.
|
|
cfgText2 := `
|
|
scrape_configs:
|
|
- job_name: job1
|
|
scheme: https
|
|
static_configs:
|
|
- targets: ["foo:9090"]
|
|
tls_config:
|
|
ca_file: /not/existing/ca/file
|
|
`
|
|
// Valid configuration.
|
|
cfgText3 := `
|
|
scrape_configs:
|
|
- job_name: job1
|
|
scheme: https
|
|
static_configs:
|
|
- targets: ["foo:9090"]
|
|
`
|
|
var (
|
|
cfg1 = loadConfiguration(t, cfgText1)
|
|
cfg2 = loadConfiguration(t, cfgText2)
|
|
cfg3 = loadConfiguration(t, cfgText3)
|
|
|
|
ch = make(chan struct{}, 1)
|
|
|
|
testRegistry = prometheus.NewRegistry()
|
|
)
|
|
|
|
opts := Options{}
|
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
|
require.NoError(t, err)
|
|
newLoop := func(scrapeLoopOptions) loop {
|
|
ch <- struct{}{}
|
|
return noopLoop()
|
|
}
|
|
sp := &scrapePool{
|
|
appendable: &nopAppendable{},
|
|
activeTargets: map[uint64]*Target{
|
|
1: {},
|
|
},
|
|
loops: map[uint64]loop{
|
|
1: noopLoop(),
|
|
},
|
|
newLoop: newLoop,
|
|
logger: nil,
|
|
config: cfg1.ScrapeConfigs[0],
|
|
client: http.DefaultClient,
|
|
metrics: scrapeManager.metrics,
|
|
symbolTable: labels.NewSymbolTable(),
|
|
}
|
|
scrapeManager.scrapePools = map[string]*scrapePool{
|
|
"job1": sp,
|
|
}
|
|
|
|
// Apply the initial configuration.
|
|
err = scrapeManager.ApplyConfig(cfg1)
|
|
require.NoError(t, err, "Unable to apply configuration.")
|
|
select {
|
|
case <-ch:
|
|
require.FailNow(t, "Reload happened.")
|
|
default:
|
|
}
|
|
|
|
// Apply a configuration for which the reload fails.
|
|
err = scrapeManager.ApplyConfig(cfg2)
|
|
require.Error(t, err, "Expecting error but got none.")
|
|
select {
|
|
case <-ch:
|
|
require.FailNow(t, "Reload happened.")
|
|
default:
|
|
}
|
|
|
|
// Apply a configuration for which the reload succeeds.
|
|
err = scrapeManager.ApplyConfig(cfg3)
|
|
require.NoError(t, err, "Unable to apply configuration.")
|
|
select {
|
|
case <-ch:
|
|
default:
|
|
require.FailNow(t, "Reload didn't happen.")
|
|
}
|
|
|
|
// Re-applying the same configuration shouldn't trigger a reload.
|
|
err = scrapeManager.ApplyConfig(cfg3)
|
|
require.NoError(t, err, "Unable to apply configuration.")
|
|
select {
|
|
case <-ch:
|
|
require.FailNow(t, "Reload happened.")
|
|
default:
|
|
}
|
|
}
|
|
|
|
func TestManagerTargetsUpdates(t *testing.T) {
|
|
opts := Options{}
|
|
testRegistry := prometheus.NewRegistry()
|
|
m, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
|
require.NoError(t, err)
|
|
|
|
ts := make(chan map[string][]*targetgroup.Group)
|
|
go m.Run(ts)
|
|
defer m.Stop()
|
|
|
|
tgSent := make(map[string][]*targetgroup.Group)
|
|
for x := 0; x < 10; x++ {
|
|
tgSent[strconv.Itoa(x)] = []*targetgroup.Group{
|
|
{
|
|
Source: strconv.Itoa(x),
|
|
},
|
|
}
|
|
|
|
select {
|
|
case ts <- tgSent:
|
|
case <-time.After(10 * time.Millisecond):
|
|
require.Fail(t, "Scrape manager's channel remained blocked after the set threshold.")
|
|
}
|
|
}
|
|
|
|
m.mtxScrape.Lock()
|
|
tsetActual := m.targetSets
|
|
m.mtxScrape.Unlock()
|
|
|
|
// Make sure all updates have been received.
|
|
require.Equal(t, tgSent, tsetActual)
|
|
|
|
select {
|
|
case <-m.triggerReload:
|
|
default:
|
|
require.Fail(t, "No scrape loops reload was triggered after targets update.")
|
|
}
|
|
}
|
|
|
|
func TestSetOffsetSeed(t *testing.T) {
|
|
getConfig := func(prometheus string) *config.Config {
|
|
cfgText := `
|
|
global:
|
|
external_labels:
|
|
prometheus: '` + prometheus + `'
|
|
`
|
|
|
|
cfg := &config.Config{}
|
|
err := yaml.UnmarshalStrict([]byte(cfgText), cfg)
|
|
require.NoError(t, err, "Unable to load YAML config cfgYaml.")
|
|
|
|
return cfg
|
|
}
|
|
|
|
opts := Options{}
|
|
testRegistry := prometheus.NewRegistry()
|
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
|
require.NoError(t, err)
|
|
|
|
// Load the first config.
|
|
cfg1 := getConfig("ha1")
|
|
err = scrapeManager.setOffsetSeed(cfg1.GlobalConfig.ExternalLabels)
|
|
require.NoError(t, err)
|
|
offsetSeed1 := scrapeManager.offsetSeed
|
|
|
|
require.NotZero(t, offsetSeed1, "Offset seed has to be a hash of uint64.")
|
|
|
|
// Load the first config.
|
|
cfg2 := getConfig("ha2")
|
|
require.NoError(t, scrapeManager.setOffsetSeed(cfg2.GlobalConfig.ExternalLabels))
|
|
offsetSeed2 := scrapeManager.offsetSeed
|
|
|
|
require.NotEqual(t, offsetSeed1, offsetSeed2, "Offset seed should not be the same on different set of external labels.")
|
|
}
|
|
|
|
func TestManagerScrapePools(t *testing.T) {
|
|
cfgText1 := `
|
|
scrape_configs:
|
|
- job_name: job1
|
|
static_configs:
|
|
- targets: ["foo:9090"]
|
|
- job_name: job2
|
|
static_configs:
|
|
- targets: ["foo:9091", "foo:9092"]
|
|
`
|
|
cfgText2 := `
|
|
scrape_configs:
|
|
- job_name: job1
|
|
static_configs:
|
|
- targets: ["foo:9090", "foo:9094"]
|
|
- job_name: job3
|
|
static_configs:
|
|
- targets: ["foo:9093"]
|
|
`
|
|
var (
|
|
cfg1 = loadConfiguration(t, cfgText1)
|
|
cfg2 = loadConfiguration(t, cfgText2)
|
|
testRegistry = prometheus.NewRegistry()
|
|
)
|
|
|
|
reload := func(scrapeManager *Manager, cfg *config.Config) {
|
|
newLoop := func(scrapeLoopOptions) loop {
|
|
return noopLoop()
|
|
}
|
|
scrapeManager.scrapePools = map[string]*scrapePool{}
|
|
for _, sc := range cfg.ScrapeConfigs {
|
|
_, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
sp := &scrapePool{
|
|
appendable: &nopAppendable{},
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{
|
|
1: noopLoop(),
|
|
},
|
|
newLoop: newLoop,
|
|
logger: nil,
|
|
config: sc,
|
|
client: http.DefaultClient,
|
|
cancel: cancel,
|
|
}
|
|
for _, c := range sc.ServiceDiscoveryConfigs {
|
|
staticConfig := c.(discovery.StaticConfig)
|
|
for _, group := range staticConfig {
|
|
for i := range group.Targets {
|
|
sp.activeTargets[uint64(i)] = &Target{}
|
|
}
|
|
}
|
|
}
|
|
scrapeManager.scrapePools[sc.JobName] = sp
|
|
}
|
|
}
|
|
|
|
opts := Options{}
|
|
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
|
require.NoError(t, err)
|
|
|
|
reload(scrapeManager, cfg1)
|
|
require.ElementsMatch(t, []string{"job1", "job2"}, scrapeManager.ScrapePools())
|
|
|
|
reload(scrapeManager, cfg2)
|
|
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
|
|
}
|
|
|
|
func setupScrapeManager(t *testing.T, honorTimestamps, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
|
|
app := &collectResultAppender{}
|
|
scrapeManager, err := NewManager(
|
|
&Options{
|
|
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
|
|
skipOffsetting: true,
|
|
},
|
|
log.NewLogfmtLogger(os.Stderr),
|
|
nil,
|
|
&collectResultAppendable{app},
|
|
prometheus.NewRegistry(),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
|
GlobalConfig: config.GlobalConfig{
|
|
// Disable regular scrapes.
|
|
ScrapeInterval: model.Duration(9999 * time.Minute),
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
|
|
},
|
|
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}},
|
|
}))
|
|
|
|
return app, scrapeManager
|
|
}
|
|
|
|
func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server {
|
|
once := sync.Once{}
|
|
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
fail := true
|
|
once.Do(func() {
|
|
fail = false
|
|
w.Header().Set("Content-Type", typ)
|
|
w.Write(toWrite)
|
|
})
|
|
|
|
if fail {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
}
|
|
}),
|
|
)
|
|
|
|
t.Cleanup(func() { server.Close() })
|
|
|
|
return server
|
|
}
|
|
|
|
// TestManagerCTZeroIngestion tests scrape manager for various CT cases.
|
|
func TestManagerCTZeroIngestion(t *testing.T) {
|
|
const (
|
|
// _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown"
|
|
expectedMetricName = "expected_metric_total"
|
|
expectedCreatedMetricName = "expected_metric_created"
|
|
expectedSampleValue = 17.0
|
|
)
|
|
|
|
for _, testFormat := range []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0} {
|
|
t.Run(fmt.Sprintf("format=%s", testFormat), func(t *testing.T) {
|
|
for _, testWithCT := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
|
|
for _, testCTZeroIngest := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
|
|
sampleTs := time.Now()
|
|
ctTs := time.Time{}
|
|
if testWithCT {
|
|
ctTs = sampleTs.Add(-2 * time.Minute)
|
|
}
|
|
|
|
// TODO(bwplotka): Add more types than just counter?
|
|
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
|
|
app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest)
|
|
|
|
// Perform the test.
|
|
doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded))
|
|
|
|
// Verify results.
|
|
// Verify what we got vs expectations around CT injection.
|
|
samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
|
|
if testWithCT && testCTZeroIngest {
|
|
require.Len(t, samples, 2)
|
|
require.Equal(t, 0.0, samples[0].f)
|
|
require.Equal(t, timestamp.FromTime(ctTs), samples[0].t)
|
|
require.Equal(t, expectedSampleValue, samples[1].f)
|
|
require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
|
|
} else {
|
|
require.Len(t, samples, 1)
|
|
require.Equal(t, expectedSampleValue, samples[0].f)
|
|
require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
|
|
}
|
|
|
|
// Verify what we got vs expectations around additional _created series for OM text.
|
|
// enableCTZeroInjection also kills that _created line.
|
|
createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
|
|
if testFormat == config.OpenMetricsText1_0_0 && testWithCT && !testCTZeroIngest {
|
|
// For OM Text, when counter has CT, and feature flag disabled we should see _created lines.
|
|
require.Len(t, createdSeriesSamples, 1)
|
|
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
|
|
// We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder,
|
|
// but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
|
|
// We can implement this, but we want to potentially get rid of OM 1.0 CT lines
|
|
require.Equal(t, float64(timestamppb.New(ctTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
|
|
} else {
|
|
require.Empty(t, createdSeriesSamples)
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName string, v float64, ts, ct time.Time) (encoded []byte) {
|
|
t.Helper()
|
|
|
|
counter := &dto.Counter{Value: proto.Float64(v)}
|
|
if !ct.IsZero() {
|
|
counter.CreatedTimestamp = timestamppb.New(ct)
|
|
}
|
|
ctrType := dto.MetricType_COUNTER
|
|
inputMetric := &dto.MetricFamily{
|
|
Name: proto.String(mName),
|
|
Type: &ctrType,
|
|
Metric: []*dto.Metric{{
|
|
TimestampMs: proto.Int64(timestamp.FromTime(ts)),
|
|
Counter: counter,
|
|
}},
|
|
}
|
|
switch format {
|
|
case config.PrometheusProto:
|
|
return protoMarshalDelimited(t, inputMetric)
|
|
case config.OpenMetricsText1_0_0:
|
|
buf := &bytes.Buffer{}
|
|
require.NoError(t, expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeOpenMetrics), expfmt.WithCreatedLines(), expfmt.WithUnit()).Encode(inputMetric))
|
|
_, _ = buf.WriteString("# EOF")
|
|
|
|
t.Log("produced OM text to expose:", buf.String())
|
|
return buf.Bytes()
|
|
default:
|
|
t.Fatalf("not implemented format: %v", format)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) {
|
|
t.Helper()
|
|
|
|
serverURL, err := url.Parse(server.URL)
|
|
require.NoError(t, err)
|
|
|
|
// Add fake target directly into tsets + reload
|
|
manager.updateTsets(map[string][]*targetgroup.Group{
|
|
"test": {{
|
|
Targets: []model.LabelSet{{
|
|
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
|
|
model.AddressLabel: model.LabelValue(serverURL.Host),
|
|
}},
|
|
}},
|
|
})
|
|
manager.reload()
|
|
|
|
// Wait for one scrape.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
|
defer cancel()
|
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
|
appender.mtx.Lock()
|
|
defer appender.mtx.Unlock()
|
|
|
|
// Check if scrape happened and grab the relevant samples.
|
|
if len(appender.resultFloats) > 0 {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("expected some float samples, got none")
|
|
}), "after 1 minute")
|
|
manager.Stop()
|
|
}
|
|
|
|
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
|
|
for _, f := range floats {
|
|
if f.metric.Get(model.MetricNameLabel) == metricName {
|
|
ret = append(ret, f)
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram,
|
|
// but in the form of dto.Histogram.
|
|
func generateTestHistogram(i int) *dto.Histogram {
|
|
helper := tsdbutil.GenerateTestHistogram(i)
|
|
h := &dto.Histogram{}
|
|
h.SampleCount = proto.Uint64(helper.Count)
|
|
h.SampleSum = proto.Float64(helper.Sum)
|
|
h.Schema = proto.Int32(helper.Schema)
|
|
h.ZeroThreshold = proto.Float64(helper.ZeroThreshold)
|
|
h.ZeroCount = proto.Uint64(helper.ZeroCount)
|
|
h.PositiveSpan = make([]*dto.BucketSpan, len(helper.PositiveSpans))
|
|
for i, span := range helper.PositiveSpans {
|
|
h.PositiveSpan[i] = &dto.BucketSpan{
|
|
Offset: proto.Int32(span.Offset),
|
|
Length: proto.Uint32(span.Length),
|
|
}
|
|
}
|
|
h.PositiveDelta = helper.PositiveBuckets
|
|
h.NegativeSpan = make([]*dto.BucketSpan, len(helper.NegativeSpans))
|
|
for i, span := range helper.NegativeSpans {
|
|
h.NegativeSpan[i] = &dto.BucketSpan{
|
|
Offset: proto.Int32(span.Offset),
|
|
Length: proto.Uint32(span.Length),
|
|
}
|
|
}
|
|
h.NegativeDelta = helper.NegativeBuckets
|
|
return h
|
|
}
|
|
|
|
func TestManagerCTZeroIngestionHistogram(t *testing.T) {
|
|
const mName = "expected_histogram"
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
inputHistSample *dto.Histogram
|
|
enableCTZeroIngestion bool
|
|
}{
|
|
{
|
|
name: "disabled with CT on histogram",
|
|
inputHistSample: func() *dto.Histogram {
|
|
h := generateTestHistogram(0)
|
|
h.CreatedTimestamp = timestamppb.Now()
|
|
return h
|
|
}(),
|
|
enableCTZeroIngestion: false,
|
|
},
|
|
{
|
|
name: "enabled with CT on histogram",
|
|
inputHistSample: func() *dto.Histogram {
|
|
h := generateTestHistogram(0)
|
|
h.CreatedTimestamp = timestamppb.Now()
|
|
return h
|
|
}(),
|
|
enableCTZeroIngestion: true,
|
|
},
|
|
{
|
|
name: "enabled without CT on histogram",
|
|
inputHistSample: func() *dto.Histogram {
|
|
h := generateTestHistogram(0)
|
|
return h
|
|
}(),
|
|
enableCTZeroIngestion: true,
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
app := &collectResultAppender{}
|
|
scrapeManager, err := NewManager(
|
|
&Options{
|
|
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
|
|
EnableNativeHistogramsIngestion: true,
|
|
skipOffsetting: true,
|
|
},
|
|
log.NewLogfmtLogger(os.Stderr),
|
|
nil,
|
|
&collectResultAppendable{app},
|
|
prometheus.NewRegistry(),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
|
GlobalConfig: config.GlobalConfig{
|
|
// Disable regular scrapes.
|
|
ScrapeInterval: model.Duration(9999 * time.Minute),
|
|
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
// Ensure the proto is chosen. We need proto as it's the only protocol
|
|
// with the CT parsing support.
|
|
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
|
|
},
|
|
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
|
}))
|
|
|
|
once := sync.Once{}
|
|
// Start fake HTTP target to that allow one scrape only.
|
|
server := httptest.NewServer(
|
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
fail := true // TODO(bwplotka): Kill or use?
|
|
once.Do(func() {
|
|
fail = false
|
|
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
|
|
|
|
ctrType := dto.MetricType_HISTOGRAM
|
|
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
|
|
Name: proto.String(mName),
|
|
Type: &ctrType,
|
|
Metric: []*dto.Metric{{Histogram: tc.inputHistSample}},
|
|
}))
|
|
})
|
|
|
|
if fail {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
}
|
|
}),
|
|
)
|
|
defer server.Close()
|
|
|
|
serverURL, err := url.Parse(server.URL)
|
|
require.NoError(t, err)
|
|
|
|
// Add fake target directly into tsets + reload. Normally users would use
|
|
// Manager.Run and wait for minimum 5s refresh interval.
|
|
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
|
|
"test": {{
|
|
Targets: []model.LabelSet{{
|
|
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
|
|
model.AddressLabel: model.LabelValue(serverURL.Host),
|
|
}},
|
|
}},
|
|
})
|
|
scrapeManager.reload()
|
|
|
|
var got []histogramSample
|
|
|
|
// Wait for one scrape.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
|
defer cancel()
|
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
|
app.mtx.Lock()
|
|
defer app.mtx.Unlock()
|
|
|
|
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
|
|
// and it's not worth waiting.
|
|
for _, h := range app.resultHistograms {
|
|
if h.metric.Get(model.MetricNameLabel) == mName {
|
|
got = append(got, h)
|
|
}
|
|
}
|
|
if len(app.resultHistograms) > 0 {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("expected some histogram samples, got none")
|
|
}), "after 1 minute")
|
|
scrapeManager.Stop()
|
|
|
|
// Check for zero samples, assuming we only injected always one histogram sample.
|
|
// Did it contain CT to inject? If yes, was CT zero enabled?
|
|
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
|
|
require.Len(t, got, 2)
|
|
// Zero sample.
|
|
require.Equal(t, histogram.Histogram{}, *got[0].h)
|
|
// Quick soft check to make sure it's the same sample or at least not zero.
|
|
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
|
|
return
|
|
}
|
|
|
|
// Expect only one, valid sample.
|
|
require.Len(t, got, 1)
|
|
// Quick soft check to make sure it's the same sample or at least not zero.
|
|
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUnregisterMetrics(t *testing.T) {
|
|
reg := prometheus.NewRegistry()
|
|
// Check that all metrics can be unregistered, allowing a second manager to be created.
|
|
for i := 0; i < 2; i++ {
|
|
opts := Options{}
|
|
manager, err := NewManager(&opts, nil, nil, nil, reg)
|
|
require.NotNil(t, manager)
|
|
require.NoError(t, err)
|
|
// Unregister all metrics.
|
|
manager.UnregisterMetrics()
|
|
}
|
|
}
|
|
|
|
func applyConfig(
|
|
t *testing.T,
|
|
config string,
|
|
scrapeManager *Manager,
|
|
discoveryManager *discovery.Manager,
|
|
) {
|
|
t.Helper()
|
|
|
|
cfg := loadConfiguration(t, config)
|
|
require.NoError(t, scrapeManager.ApplyConfig(cfg))
|
|
|
|
c := make(map[string]discovery.Configs)
|
|
scfgs, err := cfg.GetScrapeConfigs()
|
|
require.NoError(t, err)
|
|
for _, v := range scfgs {
|
|
c[v.JobName] = v.ServiceDiscoveryConfigs
|
|
}
|
|
require.NoError(t, discoveryManager.ApplyConfig(c))
|
|
}
|
|
|
|
func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
|
|
t.Helper()
|
|
|
|
reg := prometheus.NewRegistry()
|
|
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
|
require.NoError(t, err)
|
|
discoveryManager := discovery.NewManager(
|
|
ctx,
|
|
log.NewNopLogger(),
|
|
reg,
|
|
sdMetrics,
|
|
discovery.Updatert(100*time.Millisecond),
|
|
)
|
|
scrapeManager, err := NewManager(
|
|
&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)},
|
|
nil,
|
|
nil,
|
|
nopAppendable{},
|
|
prometheus.NewRegistry(),
|
|
)
|
|
require.NoError(t, err)
|
|
go discoveryManager.Run()
|
|
go scrapeManager.Run(discoveryManager.SyncCh())
|
|
return discoveryManager, scrapeManager
|
|
}
|
|
|
|
func writeIntoFile(t *testing.T, content, filePattern string) *os.File {
|
|
t.Helper()
|
|
|
|
file, err := os.CreateTemp("", filePattern)
|
|
require.NoError(t, err)
|
|
_, err = file.WriteString(content)
|
|
require.NoError(t, err)
|
|
return file
|
|
}
|
|
|
|
func requireTargets(
|
|
t *testing.T,
|
|
scrapeManager *Manager,
|
|
jobName string,
|
|
waitToAppear bool,
|
|
expectedTargets []string,
|
|
) {
|
|
t.Helper()
|
|
|
|
require.Eventually(t, func() bool {
|
|
targets, ok := scrapeManager.TargetsActive()[jobName]
|
|
if !ok {
|
|
if waitToAppear {
|
|
return false
|
|
}
|
|
t.Fatalf("job %s shouldn't be dropped", jobName)
|
|
}
|
|
if expectedTargets == nil {
|
|
return targets == nil
|
|
}
|
|
if len(targets) != len(expectedTargets) {
|
|
return false
|
|
}
|
|
sTargets := []string{}
|
|
for _, t := range targets {
|
|
sTargets = append(sTargets, t.String())
|
|
}
|
|
sort.Strings(expectedTargets)
|
|
sort.Strings(sTargets)
|
|
for i, t := range sTargets {
|
|
if t != expectedTargets[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}, 1*time.Second, 100*time.Millisecond)
|
|
}
|
|
|
|
// TestTargetDisappearsAfterProviderRemoved makes sure that when a provider is dropped, (only) its targets are dropped.
|
|
func TestTargetDisappearsAfterProviderRemoved(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
myJob := "my-job"
|
|
myJobSDTargetURL := "my:9876"
|
|
myJobStaticTargetURL := "my:5432"
|
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
|
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
|
|
|
|
baseConfig := `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
static_configs:
|
|
- targets: ['%s']
|
|
file_sd_configs:
|
|
- files: ['%s']
|
|
`
|
|
|
|
discoveryManager, scrapeManager := runManagers(t, ctx)
|
|
defer scrapeManager.Stop()
|
|
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(
|
|
baseConfig,
|
|
myJob,
|
|
myJobStaticTargetURL,
|
|
sDFile.Name(),
|
|
),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
// Make sure the jobs targets are taken into account
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
myJob,
|
|
true,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
|
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
|
|
},
|
|
)
|
|
|
|
// Apply a new config where a provider is removed
|
|
baseConfig = `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
static_configs:
|
|
- targets: ['%s']
|
|
`
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(
|
|
baseConfig,
|
|
myJob,
|
|
myJobStaticTargetURL,
|
|
),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
// Make sure the corresponding target was dropped
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
myJob,
|
|
false,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
|
|
},
|
|
)
|
|
|
|
// Apply a new config with no providers
|
|
baseConfig = `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
`
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(
|
|
baseConfig,
|
|
myJob,
|
|
),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
// Make sure the corresponding target was dropped
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
myJob,
|
|
false,
|
|
nil,
|
|
)
|
|
}
|
|
|
|
// TestOnlyProviderStaleTargetsAreDropped makes sure that when a job has only one provider with multiple targets
|
|
// and when the provider can no longer discover some of those targets, only those stale targets are dropped.
|
|
func TestOnlyProviderStaleTargetsAreDropped(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
jobName := "my-job"
|
|
jobTarget1URL := "foo:9876"
|
|
jobTarget2URL := "foo:5432"
|
|
|
|
sdFile1Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget1URL)
|
|
sdFile2Content := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTarget2URL)
|
|
sDFile1 := writeIntoFile(t, sdFile1Content, "*targets.json")
|
|
sDFile2 := writeIntoFile(t, sdFile2Content, "*targets.json")
|
|
|
|
baseConfig := `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
file_sd_configs:
|
|
- files: ['%s', '%s']
|
|
`
|
|
discoveryManager, scrapeManager := runManagers(t, ctx)
|
|
defer scrapeManager.Stop()
|
|
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), sDFile2.Name()),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
|
|
// Make sure the job's targets are taken into account
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
jobName,
|
|
true,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", jobTarget1URL),
|
|
fmt.Sprintf("http://%s/metrics", jobTarget2URL),
|
|
},
|
|
)
|
|
|
|
// Apply the same config for the same job but with a non existing file to make the provider
|
|
// unable to discover some targets
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(baseConfig, jobName, sDFile1.Name(), "/idontexistdoi.json"),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
|
|
// The old target should get dropped
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
jobName,
|
|
false,
|
|
[]string{fmt.Sprintf("http://%s/metrics", jobTarget1URL)},
|
|
)
|
|
}
|
|
|
|
// TestProviderStaleTargetsAreDropped makes sure that when a job has only one provider and when that provider
|
|
// should no longer discover targets, the targets of that provider are dropped.
|
|
// See: https://github.com/prometheus/prometheus/issues/12858
|
|
func TestProviderStaleTargetsAreDropped(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
jobName := "my-job"
|
|
jobTargetURL := "foo:9876"
|
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, jobTargetURL)
|
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
|
|
|
|
baseConfig := `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
file_sd_configs:
|
|
- files: ['%s']
|
|
`
|
|
discoveryManager, scrapeManager := runManagers(t, ctx)
|
|
defer scrapeManager.Stop()
|
|
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(baseConfig, jobName, sDFile.Name()),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
|
|
// Make sure the job's targets are taken into account
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
jobName,
|
|
true,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", jobTargetURL),
|
|
},
|
|
)
|
|
|
|
// Apply the same config for the same job but with a non existing file to make the provider
|
|
// unable to discover some targets
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(baseConfig, jobName, "/idontexistdoi.json"),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
|
|
// The old target should get dropped
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
jobName,
|
|
false,
|
|
nil,
|
|
)
|
|
}
|
|
|
|
// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers, when one of them should no
|
|
// longer discover targets, only the stale targets of that provider are dropped.
|
|
func TestOnlyStaleTargetsAreDropped(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
myJob := "my-job"
|
|
myJobSDTargetURL := "my:9876"
|
|
myJobStaticTargetURL := "my:5432"
|
|
otherJob := "other-job"
|
|
otherJobTargetURL := "other:1234"
|
|
|
|
sdFileContent := fmt.Sprintf(`[{"targets": ["%s"]}]`, myJobSDTargetURL)
|
|
sDFile := writeIntoFile(t, sdFileContent, "*targets.json")
|
|
|
|
baseConfig := `
|
|
scrape_configs:
|
|
- job_name: %s
|
|
static_configs:
|
|
- targets: ['%s']
|
|
file_sd_configs:
|
|
- files: ['%s']
|
|
- job_name: %s
|
|
static_configs:
|
|
- targets: ['%s']
|
|
`
|
|
|
|
discoveryManager, scrapeManager := runManagers(t, ctx)
|
|
defer scrapeManager.Stop()
|
|
|
|
// Apply the initial config with an existing file
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(
|
|
baseConfig,
|
|
myJob,
|
|
myJobStaticTargetURL,
|
|
sDFile.Name(),
|
|
otherJob,
|
|
otherJobTargetURL,
|
|
),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
// Make sure the jobs targets are taken into account
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
myJob,
|
|
true,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", myJobSDTargetURL),
|
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
|
|
},
|
|
)
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
otherJob,
|
|
true,
|
|
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
|
|
)
|
|
|
|
// Apply the same config with a non existing file for myJob
|
|
applyConfig(
|
|
t,
|
|
fmt.Sprintf(
|
|
baseConfig,
|
|
myJob,
|
|
myJobStaticTargetURL,
|
|
"/idontexistdoi.json",
|
|
otherJob,
|
|
otherJobTargetURL,
|
|
),
|
|
scrapeManager,
|
|
discoveryManager,
|
|
)
|
|
|
|
// Only the SD target should get dropped for myJob
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
myJob,
|
|
false,
|
|
[]string{
|
|
fmt.Sprintf("http://%s/metrics", myJobStaticTargetURL),
|
|
},
|
|
)
|
|
// The otherJob should keep its target
|
|
requireTargets(
|
|
t,
|
|
scrapeManager,
|
|
otherJob,
|
|
false,
|
|
[]string{fmt.Sprintf("http://%s/metrics", otherJobTargetURL)},
|
|
)
|
|
}
|