mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #15637 from prometheus/fix15538-2
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
config: Remove validation GetScrapeConfigs; require using config.Load.
This commit is contained in:
commit
fec7ca052a
|
@ -593,12 +593,14 @@ func main() {
|
||||||
logger.Error(fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "file", absPath, "err", err)
|
logger.Error(fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "file", absPath, "err", err)
|
||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
// Get scrape configs to validate dynamically loaded scrape_config_files.
|
||||||
|
// They can change over time, but do the extra validation on startup for better experience.
|
||||||
if _, err := cfgFile.GetScrapeConfigs(); err != nil {
|
if _, err := cfgFile.GetScrapeConfigs(); err != nil {
|
||||||
absPath, pathErr := filepath.Abs(cfg.configFile)
|
absPath, pathErr := filepath.Abs(cfg.configFile)
|
||||||
if pathErr != nil {
|
if pathErr != nil {
|
||||||
absPath = cfg.configFile
|
absPath = cfg.configFile
|
||||||
}
|
}
|
||||||
logger.Error(fmt.Sprintf("Error loading scrape config files from config (--config.file=%q)", cfg.configFile), "file", absPath, "err", err)
|
logger.Error(fmt.Sprintf("Error loading dynamic scrape config files from config (--config.file=%q)", cfg.configFile), "file", absPath, "err", err)
|
||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
if cfg.tsdb.EnableExemplarStorage {
|
if cfg.tsdb.EnableExemplarStorage {
|
||||||
|
|
|
@ -117,11 +117,12 @@ func Load(s string, logger *slog.Logger) (*Config, error) {
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported OTLP translation strategy %q", cfg.OTLPConfig.TranslationStrategy)
|
return nil, fmt.Errorf("unsupported OTLP translation strategy %q", cfg.OTLPConfig.TranslationStrategy)
|
||||||
}
|
}
|
||||||
|
cfg.loaded = true
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadFile parses the given YAML file into a Config.
|
// LoadFile parses and validates the given YAML file into a read-only Config.
|
||||||
|
// Callers should never write to or shallow copy the returned Config.
|
||||||
func LoadFile(filename string, agentMode bool, logger *slog.Logger) (*Config, error) {
|
func LoadFile(filename string, agentMode bool, logger *slog.Logger) (*Config, error) {
|
||||||
content, err := os.ReadFile(filename)
|
content, err := os.ReadFile(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -270,9 +271,12 @@ type Config struct {
|
||||||
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
|
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
|
||||||
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
|
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
|
||||||
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
|
OTLPConfig OTLPConfig `yaml:"otlp,omitempty"`
|
||||||
|
|
||||||
|
loaded bool // Certain methods require configuration to use Load validation.
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetDirectory joins any relative file paths with dir.
|
// SetDirectory joins any relative file paths with dir.
|
||||||
|
// This method writes to config, and it's not concurrency safe.
|
||||||
func (c *Config) SetDirectory(dir string) {
|
func (c *Config) SetDirectory(dir string) {
|
||||||
c.GlobalConfig.SetDirectory(dir)
|
c.GlobalConfig.SetDirectory(dir)
|
||||||
c.AlertingConfig.SetDirectory(dir)
|
c.AlertingConfig.SetDirectory(dir)
|
||||||
|
@ -302,24 +306,26 @@ func (c Config) String() string {
|
||||||
return string(b)
|
return string(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetScrapeConfigs returns the scrape configurations.
|
// GetScrapeConfigs returns the read-only, validated scrape configurations including
|
||||||
|
// the ones from the scrape_config_files.
|
||||||
|
// This method does not write to config, and it's concurrency safe (the pointer receiver is for efficiency).
|
||||||
|
// This method also assumes the Config was created by Load or LoadFile function, it returns error
|
||||||
|
// if it was not. We can't re-validate or apply globals here due to races,
|
||||||
|
// read more https://github.com/prometheus/prometheus/issues/15538.
|
||||||
func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
|
func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
|
||||||
scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))
|
if !c.loaded {
|
||||||
|
// Programmatic error, we warn before more confusing errors would happen due to lack of the globalization.
|
||||||
|
return nil, errors.New("scrape config cannot be fetched, main config was not validated and loaded correctly; should not happen")
|
||||||
|
}
|
||||||
|
|
||||||
|
scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))
|
||||||
jobNames := map[string]string{}
|
jobNames := map[string]string{}
|
||||||
for i, scfg := range c.ScrapeConfigs {
|
for i, scfg := range c.ScrapeConfigs {
|
||||||
// We do these checks for library users that would not call validate in
|
|
||||||
// Unmarshal.
|
|
||||||
if err := scfg.Validate(c.GlobalConfig); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok := jobNames[scfg.JobName]; ok {
|
|
||||||
return nil, fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
|
|
||||||
}
|
|
||||||
jobNames[scfg.JobName] = "main config file"
|
jobNames[scfg.JobName] = "main config file"
|
||||||
scfgs[i] = scfg
|
scfgs[i] = scfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-read and validate the dynamic scrape config rules.
|
||||||
for _, pat := range c.ScrapeConfigFiles {
|
for _, pat := range c.ScrapeConfigFiles {
|
||||||
fs, err := filepath.Glob(pat)
|
fs, err := filepath.Glob(pat)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -355,6 +361,7 @@ func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||||
|
// NOTE: This method should not be used outside of this package. Use Load or LoadFile instead.
|
||||||
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
*c = DefaultConfig
|
*c = DefaultConfig
|
||||||
// We want to set c to the defaults and then overwrite it with the input.
|
// We want to set c to the defaults and then overwrite it with the input.
|
||||||
|
@ -391,18 +398,18 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do global overrides and validate unique names.
|
// Do global overrides and validation.
|
||||||
jobNames := map[string]struct{}{}
|
jobNames := map[string]struct{}{}
|
||||||
for _, scfg := range c.ScrapeConfigs {
|
for _, scfg := range c.ScrapeConfigs {
|
||||||
if err := scfg.Validate(c.GlobalConfig); err != nil {
|
if err := scfg.Validate(c.GlobalConfig); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := jobNames[scfg.JobName]; ok {
|
if _, ok := jobNames[scfg.JobName]; ok {
|
||||||
return fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
|
return fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
|
||||||
}
|
}
|
||||||
jobNames[scfg.JobName] = struct{}{}
|
jobNames[scfg.JobName] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
rwNames := map[string]struct{}{}
|
rwNames := map[string]struct{}{}
|
||||||
for _, rwcfg := range c.RemoteWriteConfigs {
|
for _, rwcfg := range c.RemoteWriteConfigs {
|
||||||
if rwcfg == nil {
|
if rwcfg == nil {
|
||||||
|
|
|
@ -18,6 +18,8 @@ package config
|
||||||
const ruleFilesConfigFile = "testdata/rules_abs_path.good.yml"
|
const ruleFilesConfigFile = "testdata/rules_abs_path.good.yml"
|
||||||
|
|
||||||
var ruleFilesExpectedConf = &Config{
|
var ruleFilesExpectedConf = &Config{
|
||||||
|
loaded: true,
|
||||||
|
|
||||||
GlobalConfig: DefaultGlobalConfig,
|
GlobalConfig: DefaultGlobalConfig,
|
||||||
Runtime: DefaultRuntimeConfig,
|
Runtime: DefaultRuntimeConfig,
|
||||||
RuleFiles: []string{
|
RuleFiles: []string{
|
||||||
|
|
|
@ -87,6 +87,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var expectedConf = &Config{
|
var expectedConf = &Config{
|
||||||
|
loaded: true,
|
||||||
GlobalConfig: GlobalConfig{
|
GlobalConfig: GlobalConfig{
|
||||||
ScrapeInterval: model.Duration(15 * time.Second),
|
ScrapeInterval: model.Duration(15 * time.Second),
|
||||||
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
|
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
|
||||||
|
@ -1512,10 +1513,10 @@ func TestYAMLRoundtrip(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
out, err := yaml.Marshal(want)
|
out, err := yaml.Marshal(want)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
got := &Config{}
|
|
||||||
require.NoError(t, yaml.UnmarshalStrict(out, got))
|
got, err := Load(string(out), promslog.NewNopLogger())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, want, got)
|
require.Equal(t, want, got)
|
||||||
}
|
}
|
||||||
|
@ -1525,10 +1526,10 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
out, err := yaml.Marshal(want)
|
out, err := yaml.Marshal(want)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
got := &Config{}
|
|
||||||
require.NoError(t, yaml.UnmarshalStrict(out, got))
|
got, err := Load(string(out), promslog.NewNopLogger())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.True(t, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit)
|
require.True(t, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit)
|
||||||
require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
|
require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
|
||||||
|
@ -2219,6 +2220,7 @@ func TestEmptyConfig(t *testing.T) {
|
||||||
c, err := Load("", promslog.NewNopLogger())
|
c, err := Load("", promslog.NewNopLogger())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
exp := DefaultConfig
|
exp := DefaultConfig
|
||||||
|
exp.loaded = true
|
||||||
require.Equal(t, exp, *c)
|
require.Equal(t, exp, *c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2268,6 +2270,7 @@ func TestEmptyGlobalBlock(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
exp := DefaultConfig
|
exp := DefaultConfig
|
||||||
exp.Runtime = DefaultRuntimeConfig
|
exp.Runtime = DefaultRuntimeConfig
|
||||||
|
exp.loaded = true
|
||||||
require.Equal(t, exp, *c)
|
require.Equal(t, exp, *c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2548,3 +2551,18 @@ func TestScrapeProtocolHeader(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regression test against https://github.com/prometheus/prometheus/issues/15538
|
||||||
|
func TestGetScrapeConfigs_Loaded(t *testing.T) {
|
||||||
|
t.Run("without load", func(t *testing.T) {
|
||||||
|
c := &Config{}
|
||||||
|
_, err := c.GetScrapeConfigs()
|
||||||
|
require.EqualError(t, err, "scrape config cannot be fetched, main config was not validated and loaded correctly; should not happen")
|
||||||
|
})
|
||||||
|
t.Run("with load", func(t *testing.T) {
|
||||||
|
c, err := Load("", promslog.NewNopLogger())
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = c.GetScrapeConfigs()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,8 @@ package config
|
||||||
const ruleFilesConfigFile = "testdata/rules_abs_path_windows.good.yml"
|
const ruleFilesConfigFile = "testdata/rules_abs_path_windows.good.yml"
|
||||||
|
|
||||||
var ruleFilesExpectedConf = &Config{
|
var ruleFilesExpectedConf = &Config{
|
||||||
|
loaded: true,
|
||||||
|
|
||||||
GlobalConfig: DefaultGlobalConfig,
|
GlobalConfig: DefaultGlobalConfig,
|
||||||
Runtime: DefaultRuntimeConfig,
|
Runtime: DefaultRuntimeConfig,
|
||||||
RuleFiles: []string{
|
RuleFiles: []string{
|
||||||
|
|
|
@ -38,6 +38,8 @@ import (
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/timestamp"
|
"github.com/prometheus/prometheus/model/timestamp"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
@ -468,10 +470,8 @@ func TestPopulateLabels(t *testing.T) {
|
||||||
func loadConfiguration(t testing.TB, c string) *config.Config {
|
func loadConfiguration(t testing.TB, c string) *config.Config {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
cfg := &config.Config{}
|
cfg, err := config.Load(c, promslog.NewNopLogger())
|
||||||
err := yaml.UnmarshalStrict([]byte(c), cfg)
|
require.NoError(t, err)
|
||||||
require.NoError(t, err, "Unable to load YAML config.")
|
|
||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -724,33 +724,6 @@ scrape_configs:
|
||||||
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
|
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupScrapeManager(t *testing.T, honorTimestamps, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
|
|
||||||
app := &collectResultAppender{}
|
|
||||||
scrapeManager, err := NewManager(
|
|
||||||
&Options{
|
|
||||||
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
|
|
||||||
skipOffsetting: true,
|
|
||||||
},
|
|
||||||
promslog.New(&promslog.Config{}),
|
|
||||||
nil,
|
|
||||||
&collectResultAppendable{app},
|
|
||||||
prometheus.NewRegistry(),
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
|
||||||
GlobalConfig: config.GlobalConfig{
|
|
||||||
// Disable regular scrapes.
|
|
||||||
ScrapeInterval: model.Duration(9999 * time.Minute),
|
|
||||||
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
||||||
ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
|
|
||||||
},
|
|
||||||
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}},
|
|
||||||
}))
|
|
||||||
|
|
||||||
return app, scrapeManager
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server {
|
func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server {
|
||||||
once := sync.Once{}
|
once := sync.Once{}
|
||||||
|
|
||||||
|
@ -789,6 +762,9 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
||||||
t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
|
t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
|
||||||
for _, testCTZeroIngest := range []bool{false, true} {
|
for _, testCTZeroIngest := range []bool{false, true} {
|
||||||
t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
|
t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
sampleTs := time.Now()
|
sampleTs := time.Now()
|
||||||
ctTs := time.Time{}
|
ctTs := time.Time{}
|
||||||
if testWithCT {
|
if testWithCT {
|
||||||
|
@ -797,10 +773,45 @@ func TestManagerCTZeroIngestion(t *testing.T) {
|
||||||
|
|
||||||
// TODO(bwplotka): Add more types than just counter?
|
// TODO(bwplotka): Add more types than just counter?
|
||||||
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
|
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
|
||||||
app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest)
|
|
||||||
|
|
||||||
// Perform the test.
|
app := &collectResultAppender{}
|
||||||
doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded))
|
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||||
|
EnableCreatedTimestampZeroIngestion: testCTZeroIngest,
|
||||||
|
skipOffsetting: true,
|
||||||
|
}, &collectResultAppendable{app})
|
||||||
|
defer scrapeManager.Stop()
|
||||||
|
|
||||||
|
server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
|
||||||
|
serverURL, err := url.Parse(server.URL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testConfig := fmt.Sprintf(`
|
||||||
|
global:
|
||||||
|
# Disable regular scrapes.
|
||||||
|
scrape_interval: 9999m
|
||||||
|
scrape_timeout: 5s
|
||||||
|
|
||||||
|
scrape_configs:
|
||||||
|
- job_name: test
|
||||||
|
honor_timestamps: true
|
||||||
|
static_configs:
|
||||||
|
- targets: ['%s']
|
||||||
|
`, serverURL.Host)
|
||||||
|
applyConfig(t, testConfig, scrapeManager, discoveryManager)
|
||||||
|
|
||||||
|
// Wait for one scrape.
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
||||||
|
app.mtx.Lock()
|
||||||
|
defer app.mtx.Unlock()
|
||||||
|
|
||||||
|
// Check if scrape happened and grab the relevant samples.
|
||||||
|
if len(app.resultFloats) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("expected some float samples, got none")
|
||||||
|
}), "after 1 minute")
|
||||||
|
|
||||||
// Verify results.
|
// Verify results.
|
||||||
// Verify what we got vs expectations around CT injection.
|
// Verify what we got vs expectations around CT injection.
|
||||||
|
@ -871,39 +882,6 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
serverURL, err := url.Parse(server.URL)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Add fake target directly into tsets + reload
|
|
||||||
manager.updateTsets(map[string][]*targetgroup.Group{
|
|
||||||
"test": {{
|
|
||||||
Targets: []model.LabelSet{{
|
|
||||||
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
|
|
||||||
model.AddressLabel: model.LabelValue(serverURL.Host),
|
|
||||||
}},
|
|
||||||
}},
|
|
||||||
})
|
|
||||||
manager.reload()
|
|
||||||
|
|
||||||
// Wait for one scrape.
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
|
||||||
defer cancel()
|
|
||||||
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
|
||||||
appender.mtx.Lock()
|
|
||||||
defer appender.mtx.Unlock()
|
|
||||||
|
|
||||||
// Check if scrape happened and grab the relevant samples.
|
|
||||||
if len(appender.resultFloats) > 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return errors.New("expected some float samples, got none")
|
|
||||||
}), "after 1 minute")
|
|
||||||
manager.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
|
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
|
||||||
for _, f := range floats {
|
for _, f := range floats {
|
||||||
if f.metric.Get(model.MetricNameLabel) == metricName {
|
if f.metric.Get(model.MetricNameLabel) == metricName {
|
||||||
|
@ -978,37 +956,22 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
app := &collectResultAppender{}
|
app := &collectResultAppender{}
|
||||||
scrapeManager, err := NewManager(
|
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||||
&Options{
|
|
||||||
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
|
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
|
||||||
EnableNativeHistogramsIngestion: true,
|
EnableNativeHistogramsIngestion: true,
|
||||||
skipOffsetting: true,
|
skipOffsetting: true,
|
||||||
},
|
}, &collectResultAppendable{app})
|
||||||
promslog.New(&promslog.Config{}),
|
defer scrapeManager.Stop()
|
||||||
nil,
|
|
||||||
&collectResultAppendable{app},
|
|
||||||
prometheus.NewRegistry(),
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
|
|
||||||
GlobalConfig: config.GlobalConfig{
|
|
||||||
// Disable regular scrapes.
|
|
||||||
ScrapeInterval: model.Duration(9999 * time.Minute),
|
|
||||||
ScrapeTimeout: model.Duration(5 * time.Second),
|
|
||||||
// Ensure the proto is chosen. We need proto as it's the only protocol
|
|
||||||
// with the CT parsing support.
|
|
||||||
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
|
|
||||||
},
|
|
||||||
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
|
|
||||||
}))
|
|
||||||
|
|
||||||
once := sync.Once{}
|
once := sync.Once{}
|
||||||
// Start fake HTTP target to that allow one scrape only.
|
// Start fake HTTP target to that allow one scrape only.
|
||||||
server := httptest.NewServer(
|
server := httptest.NewServer(
|
||||||
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
fail := true // TODO(bwplotka): Kill or use?
|
fail := true
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
fail = false
|
fail = false
|
||||||
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
|
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
|
||||||
|
@ -1031,22 +994,23 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
|
||||||
serverURL, err := url.Parse(server.URL)
|
serverURL, err := url.Parse(server.URL)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Add fake target directly into tsets + reload. Normally users would use
|
testConfig := fmt.Sprintf(`
|
||||||
// Manager.Run and wait for minimum 5s refresh interval.
|
global:
|
||||||
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
|
# Disable regular scrapes.
|
||||||
"test": {{
|
scrape_interval: 9999m
|
||||||
Targets: []model.LabelSet{{
|
scrape_timeout: 5s
|
||||||
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
|
|
||||||
model.AddressLabel: model.LabelValue(serverURL.Host),
|
scrape_configs:
|
||||||
}},
|
- job_name: test
|
||||||
}},
|
static_configs:
|
||||||
})
|
- targets: ['%s']
|
||||||
scrapeManager.reload()
|
`, serverURL.Host)
|
||||||
|
applyConfig(t, testConfig, scrapeManager, discoveryManager)
|
||||||
|
|
||||||
var got []histogramSample
|
var got []histogramSample
|
||||||
|
|
||||||
// Wait for one scrape.
|
// Wait for one scrape.
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
|
||||||
app.mtx.Lock()
|
app.mtx.Lock()
|
||||||
|
@ -1064,7 +1028,6 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
|
||||||
}
|
}
|
||||||
return errors.New("expected some histogram samples, got none")
|
return errors.New("expected some histogram samples, got none")
|
||||||
}), "after 1 minute")
|
}), "after 1 minute")
|
||||||
scrapeManager.Stop()
|
|
||||||
|
|
||||||
// Check for zero samples, assuming we only injected always one histogram sample.
|
// Check for zero samples, assuming we only injected always one histogram sample.
|
||||||
// Did it contain CT to inject? If yes, was CT zero enabled?
|
// Did it contain CT to inject? If yes, was CT zero enabled?
|
||||||
|
@ -1118,9 +1081,17 @@ func applyConfig(
|
||||||
require.NoError(t, discoveryManager.ApplyConfig(c))
|
require.NoError(t, discoveryManager.ApplyConfig(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
|
func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
if opts == nil {
|
||||||
|
opts = &Options{}
|
||||||
|
}
|
||||||
|
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
|
||||||
|
if app == nil {
|
||||||
|
app = nopAppendable{}
|
||||||
|
}
|
||||||
|
|
||||||
reg := prometheus.NewRegistry()
|
reg := prometheus.NewRegistry()
|
||||||
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -1132,10 +1103,10 @@ func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manage
|
||||||
discovery.Updatert(100*time.Millisecond),
|
discovery.Updatert(100*time.Millisecond),
|
||||||
)
|
)
|
||||||
scrapeManager, err := NewManager(
|
scrapeManager, err := NewManager(
|
||||||
&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)},
|
opts,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
nopAppendable{},
|
app,
|
||||||
prometheus.NewRegistry(),
|
prometheus.NewRegistry(),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -1213,7 +1184,7 @@ scrape_configs:
|
||||||
- files: ['%s']
|
- files: ['%s']
|
||||||
`
|
`
|
||||||
|
|
||||||
discoveryManager, scrapeManager := runManagers(t, ctx)
|
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||||
defer scrapeManager.Stop()
|
defer scrapeManager.Stop()
|
||||||
|
|
||||||
applyConfig(
|
applyConfig(
|
||||||
|
@ -1312,7 +1283,7 @@ scrape_configs:
|
||||||
file_sd_configs:
|
file_sd_configs:
|
||||||
- files: ['%s', '%s']
|
- files: ['%s', '%s']
|
||||||
`
|
`
|
||||||
discoveryManager, scrapeManager := runManagers(t, ctx)
|
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||||
defer scrapeManager.Stop()
|
defer scrapeManager.Stop()
|
||||||
|
|
||||||
applyConfig(
|
applyConfig(
|
||||||
|
@ -1372,7 +1343,7 @@ scrape_configs:
|
||||||
file_sd_configs:
|
file_sd_configs:
|
||||||
- files: ['%s']
|
- files: ['%s']
|
||||||
`
|
`
|
||||||
discoveryManager, scrapeManager := runManagers(t, ctx)
|
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||||
defer scrapeManager.Stop()
|
defer scrapeManager.Stop()
|
||||||
|
|
||||||
applyConfig(
|
applyConfig(
|
||||||
|
@ -1439,7 +1410,7 @@ scrape_configs:
|
||||||
- targets: ['%s']
|
- targets: ['%s']
|
||||||
`
|
`
|
||||||
|
|
||||||
discoveryManager, scrapeManager := runManagers(t, ctx)
|
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||||
defer scrapeManager.Stop()
|
defer scrapeManager.Stop()
|
||||||
|
|
||||||
// Apply the initial config with an existing file
|
// Apply the initial config with an existing file
|
||||||
|
|
Loading…
Reference in a new issue