diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index f8a4ecd8c9..cd0775bbdd 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -593,12 +593,14 @@ func main() {
 		logger.Error(fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "file", absPath, "err", err)
 		os.Exit(2)
 	}
+	// Get scrape configs to validate dynamically loaded scrape_config_files.
+	// They can change over time, but do the extra validation on startup for a better user experience.
 	if _, err := cfgFile.GetScrapeConfigs(); err != nil {
 		absPath, pathErr := filepath.Abs(cfg.configFile)
 		if pathErr != nil {
 			absPath = cfg.configFile
 		}
-		logger.Error(fmt.Sprintf("Error loading scrape config files from config (--config.file=%q)", cfg.configFile), "file", absPath, "err", err)
+		logger.Error(fmt.Sprintf("Error loading dynamic scrape config files from config (--config.file=%q)", cfg.configFile), "file", absPath, "err", err)
 		os.Exit(2)
 	}
 	if cfg.tsdb.EnableExemplarStorage {
diff --git a/config/config.go b/config/config.go
index 86d8563536..551bbd5127 100644
--- a/config/config.go
+++ b/config/config.go
@@ -117,11 +117,12 @@ func Load(s string, logger *slog.Logger) (*Config, error) {
 	default:
 		return nil, fmt.Errorf("unsupported OTLP translation strategy %q", cfg.OTLPConfig.TranslationStrategy)
 	}
-
+	cfg.loaded = true
 	return cfg, nil
 }
 
-// LoadFile parses the given YAML file into a Config.
+// LoadFile parses and validates the given YAML file into a read-only Config.
+// Callers should never write to or shallow copy the returned Config.
 func LoadFile(filename string, agentMode bool, logger *slog.Logger) (*Config, error) {
 	content, err := os.ReadFile(filename)
 	if err != nil {
@@ -270,9 +271,12 @@ type Config struct {
 	RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
 	RemoteReadConfigs  []*RemoteReadConfig  `yaml:"remote_read,omitempty"`
 	OTLPConfig         OTLPConfig           `yaml:"otlp,omitempty"`
+
+	loaded bool // Certain methods require the configuration to be validated by Load.
 }
 
 // SetDirectory joins any relative file paths with dir.
+// This method writes to the config, and it's not concurrency-safe.
 func (c *Config) SetDirectory(dir string) {
 	c.GlobalConfig.SetDirectory(dir)
 	c.AlertingConfig.SetDirectory(dir)
@@ -302,24 +306,24 @@ func (c Config) String() string {
 	return string(b)
 }
 
-// GetScrapeConfigs returns the scrape configurations.
+// GetScrapeConfigs returns the read-only, validated scrape configurations, including
+// the ones from the scrape_config_files.
+// This method does not write to the config, and it's concurrency-safe (the pointer receiver is for efficiency).
+// This method also assumes the Config was created by Load or LoadFile; for details on the
+// races this prevents, see https://github.com/prometheus/prometheus/issues/15538.
 func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
-	scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))
+	if !c.loaded {
+		return nil, errors.New("main config scrape configs were not validated and loaded; GetScrapeConfigs can only be used on a configuration created by config.Load or config.LoadFile")
+	}
 
+	scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))
 	jobNames := map[string]string{}
 	for i, scfg := range c.ScrapeConfigs {
-		// We do these checks for library users that would not call validate in
-		// Unmarshal.
-		if err := scfg.Validate(c.GlobalConfig); err != nil {
-			return nil, err
-		}
-
-		if _, ok := jobNames[scfg.JobName]; ok {
-			return nil, fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
-		}
 		jobNames[scfg.JobName] = "main config file"
 		scfgs[i] = scfg
 	}
+
+	// Re-read and validate the dynamic scrape config files.
 	for _, pat := range c.ScrapeConfigFiles {
 		fs, err := filepath.Glob(pat)
 		if err != nil {
@@ -355,6 +359,7 @@ func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
 }
 
 // UnmarshalYAML implements the yaml.Unmarshaler interface.
+// NOTE: This method should not be used outside of this package. Use Load or LoadFile instead.
 func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
 	*c = DefaultConfig
 	// We want to set c to the defaults and then overwrite it with the input.
@@ -391,18 +396,18 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
 		}
 	}
 
-	// Do global overrides and validate unique names.
+	// Do global overrides and validation.
 	jobNames := map[string]struct{}{}
 	for _, scfg := range c.ScrapeConfigs {
 		if err := scfg.Validate(c.GlobalConfig); err != nil {
 			return err
 		}
-
 		if _, ok := jobNames[scfg.JobName]; ok {
 			return fmt.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
 		}
 		jobNames[scfg.JobName] = struct{}{}
 	}
+
 	rwNames := map[string]struct{}{}
 	for _, rwcfg := range c.RemoteWriteConfigs {
 		if rwcfg == nil {
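
Note for library users of the config package: with the new loaded flag, GetScrapeConfigs refuses a Config that was hand-built or unmarshalled directly, so a Config must always come from Load or LoadFile. A minimal sketch of the resulting usage, assuming a local prometheus.yml (the path and error handling are illustrative, not part of this change):

package main

import (
	"fmt"
	"log"

	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// LoadFile parses, validates and marks the Config as loaded; unmarshalling
	// YAML straight into a &config.Config{} no longer satisfies GetScrapeConfigs.
	cfg, err := config.LoadFile("prometheus.yml", false, promslog.NewNopLogger())
	if err != nil {
		log.Fatal(err)
	}

	// Returns the validated scrape configs, including any files matched by
	// scrape_config_files globs; safe for concurrent use.
	scfgs, err := cfg.GetScrapeConfigs()
	if err != nil {
		log.Fatal(err)
	}
	for _, sc := range scfgs {
		fmt.Println(sc.JobName)
	}
}
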
diff --git a/config/config_default_test.go b/config/config_default_test.go
index 31133f1e04..2faaf98cf9 100644
--- a/config/config_default_test.go
+++ b/config/config_default_test.go
@@ -18,6 +18,8 @@ package config
 const ruleFilesConfigFile = "testdata/rules_abs_path.good.yml"
 
 var ruleFilesExpectedConf = &Config{
+	loaded: true,
+
 	GlobalConfig: DefaultGlobalConfig,
 	Runtime:      DefaultRuntimeConfig,
 	RuleFiles: []string{
diff --git a/config/config_test.go b/config/config_test.go
index 4b5b11a9fd..0b1feea8b1 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -87,6 +87,7 @@ const (
 )
 
 var expectedConf = &Config{
+	loaded: true,
 	GlobalConfig: GlobalConfig{
 		ScrapeInterval: model.Duration(15 * time.Second),
 		ScrapeTimeout:  DefaultGlobalConfig.ScrapeTimeout,
@@ -1512,10 +1513,10 @@ func TestYAMLRoundtrip(t *testing.T) {
 	require.NoError(t, err)
 
 	out, err := yaml.Marshal(want)
 	require.NoError(t, err)
-	got := &Config{}
 
-	require.NoError(t, yaml.UnmarshalStrict(out, got))
+	got, err := Load(string(out), promslog.NewNopLogger())
+	require.NoError(t, err)
 
 	require.Equal(t, want, got)
 }
@@ -1525,10 +1526,10 @@ func TestRemoteWriteRetryOnRateLimit(t *testing.T) {
 	require.NoError(t, err)
 
 	out, err := yaml.Marshal(want)
 	require.NoError(t, err)
-	got := &Config{}
 
-	require.NoError(t, yaml.UnmarshalStrict(out, got))
+	got, err := Load(string(out), promslog.NewNopLogger())
+	require.NoError(t, err)
 
 	require.True(t, got.RemoteWriteConfigs[0].QueueConfig.RetryOnRateLimit)
 	require.False(t, got.RemoteWriteConfigs[1].QueueConfig.RetryOnRateLimit)
@@ -2219,6 +2220,7 @@ func TestEmptyConfig(t *testing.T) {
 	c, err := Load("", promslog.NewNopLogger())
 	require.NoError(t, err)
 	exp := DefaultConfig
+	exp.loaded = true
 	require.Equal(t, exp, *c)
 }
 
@@ -2268,6 +2270,7 @@ func TestEmptyGlobalBlock(t *testing.T) {
 	require.NoError(t, err)
 	exp := DefaultConfig
 	exp.Runtime = DefaultRuntimeConfig
+	exp.loaded = true
 	require.Equal(t, exp, *c)
 }
 
@@ -2548,3 +2551,18 @@ func TestScrapeProtocolHeader(t *testing.T) {
 		})
 	}
 }
+
+// Regression test against https://github.com/prometheus/prometheus/issues/15538.
+func TestGetScrapeConfigs_Loaded(t *testing.T) {
+	t.Run("without load", func(t *testing.T) {
+		c := &Config{}
+		_, err := c.GetScrapeConfigs()
+		require.EqualError(t, err, "main config scrape configs were not validated and loaded; GetScrapeConfigs can only be used on a configuration created by config.Load or config.LoadFile")
+	})
+	t.Run("with load", func(t *testing.T) {
+		c, err := Load("", promslog.NewNopLogger())
+		require.NoError(t, err)
+		_, err = c.GetScrapeConfigs()
+		require.NoError(t, err)
+	})
+}
diff --git a/config/config_windows_test.go b/config/config_windows_test.go
index db4d46ef13..9d338b99e7 100644
--- a/config/config_windows_test.go
+++ b/config/config_windows_test.go
@@ -16,6 +16,8 @@ package config
 const ruleFilesConfigFile = "testdata/rules_abs_path_windows.good.yml"
 
 var ruleFilesExpectedConf = &Config{
+	loaded: true,
+
 	GlobalConfig: DefaultGlobalConfig,
 	Runtime:      DefaultRuntimeConfig,
 	RuleFiles: []string{
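
The concurrency contract documented on GetScrapeConfigs is the heart of the fix for issue 15538: validation no longer mutates the shared *ScrapeConfig values on every call, so multiple readers can share one loaded Config. A sketch of the now-safe pattern, reusing the empty config from TestEmptyConfig (the goroutine count is arbitrary):

package main

import (
	"log"
	"sync"

	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/config"
)

func main() {
	cfg, err := config.Load("", promslog.NewNopLogger())
	if err != nil {
		log.Fatal(err)
	}

	// Previously every call re-ran Validate against the global config, which
	// could write defaults into the shared scrape configs and race with other
	// callers; after this change concurrent reads are fine.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := cfg.GetScrapeConfigs(); err != nil {
				log.Println(err)
			}
		}()
	}
	wg.Wait()
}
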
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index f446c99789..6887ca1c43 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -38,6 +38,8 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 	"gopkg.in/yaml.v2"
 
+	"github.com/prometheus/prometheus/storage"
+
 	"github.com/prometheus/prometheus/model/timestamp"
 
 	"github.com/prometheus/prometheus/config"
@@ -468,10 +470,8 @@ func TestPopulateLabels(t *testing.T) {
 
 func loadConfiguration(t testing.TB, c string) *config.Config {
 	t.Helper()
 
-	cfg := &config.Config{}
-	err := yaml.UnmarshalStrict([]byte(c), cfg)
-	require.NoError(t, err, "Unable to load YAML config.")
-
+	cfg, err := config.Load(c, promslog.NewNopLogger())
+	require.NoError(t, err)
 	return cfg
 }
@@ -724,33 +724,6 @@ scrape_configs:
 	require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
 }
 
-func setupScrapeManager(t *testing.T, honorTimestamps, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) {
-	app := &collectResultAppender{}
-	scrapeManager, err := NewManager(
-		&Options{
-			EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
-			skipOffsetting:                      true,
-		},
-		promslog.New(&promslog.Config{}),
-		nil,
-		&collectResultAppendable{app},
-		prometheus.NewRegistry(),
-	)
-	require.NoError(t, err)
-
-	require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
-		GlobalConfig: config.GlobalConfig{
-			// Disable regular scrapes.
-			ScrapeInterval:  model.Duration(9999 * time.Minute),
-			ScrapeTimeout:   model.Duration(5 * time.Second),
-			ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto},
-		},
-		ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}},
-	}))
-
-	return app, scrapeManager
-}
-
 func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server {
 	once := sync.Once{}
 
@@ -789,6 +762,9 @@ func TestManagerCTZeroIngestion(t *testing.T) {
 		t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) {
 			for _, testCTZeroIngest := range []bool{false, true} {
 				t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) {
+					ctx, cancel := context.WithCancel(context.Background())
+					defer cancel()
+
 					sampleTs := time.Now()
 					ctTs := time.Time{}
 					if testWithCT {
@@ -797,10 +773,45 @@ func TestManagerCTZeroIngestion(t *testing.T) {
 					// TODO(bwplotka): Add more types than just counter?
 					encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs)
 
-					app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest)
-
-					// Perform the test.
-					doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded))
+					app := &collectResultAppender{}
+					discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
+						EnableCreatedTimestampZeroIngestion: testCTZeroIngest,
+						skipOffsetting:                      true,
+					}, &collectResultAppendable{app})
+					defer scrapeManager.Stop()
+
+					server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
+					serverURL, err := url.Parse(server.URL)
+					require.NoError(t, err)
+
+					testConfig := fmt.Sprintf(`
+global:
+  # Disable regular scrapes.
+  scrape_interval: 9999m
+  scrape_timeout: 5s
+
+scrape_configs:
+- job_name: test
+  honor_timestamps: true
+  static_configs:
+  - targets: ['%s']
+`, serverURL.Host)
+					applyConfig(t, testConfig, scrapeManager, discoveryManager)
+
+					// Wait for one scrape.
+					ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
+					defer cancel()
+					require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
+						app.mtx.Lock()
+						defer app.mtx.Unlock()
+
+						// Check if scrape happened and grab the relevant samples.
+						if len(app.resultFloats) > 0 {
+							return nil
+						}
+						return errors.New("expected some float samples, got none")
+					}), "after 1 minute")
 
 					// Verify results.
 					// Verify what we got vs expectations around CT injection.
@@ -871,39 +882,6 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
 	}
 }
 
-func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) {
-	t.Helper()
-
-	serverURL, err := url.Parse(server.URL)
-	require.NoError(t, err)
-
-	// Add fake target directly into tsets + reload
-	manager.updateTsets(map[string][]*targetgroup.Group{
-		"test": {{
-			Targets: []model.LabelSet{{
-				model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
-				model.AddressLabel: model.LabelValue(serverURL.Host),
-			}},
-		}},
-	})
-	manager.reload()
-
-	// Wait for one scrape.
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
-	defer cancel()
-	require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
-		appender.mtx.Lock()
-		defer appender.mtx.Unlock()
-
-		// Check if scrape happened and grab the relevant samples.
-		if len(appender.resultFloats) > 0 {
-			return nil
-		}
-		return errors.New("expected some float samples, got none")
-	}), "after 1 minute")
-	manager.Stop()
-}
-
 func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
 	for _, f := range floats {
 		if f.metric.Get(model.MetricNameLabel) == metricName {
@@ -978,37 +956,22 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			app := &collectResultAppender{}
-			scrapeManager, err := NewManager(
-				&Options{
-					EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
-					EnableNativeHistogramsIngestion:     true,
-					skipOffsetting:                      true,
-				},
-				promslog.New(&promslog.Config{}),
-				nil,
-				&collectResultAppendable{app},
-				prometheus.NewRegistry(),
-			)
-			require.NoError(t, err)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 
-			require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
-				GlobalConfig: config.GlobalConfig{
-					// Disable regular scrapes.
-					ScrapeInterval: model.Duration(9999 * time.Minute),
-					ScrapeTimeout:  model.Duration(5 * time.Second),
-					// Ensure the proto is chosen. We need proto as it's the only protocol
-					// with the CT parsing support.
-					ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
-				},
-				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
-			}))
+			app := &collectResultAppender{}
+			discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
+				EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
+				EnableNativeHistogramsIngestion:     true,
+				skipOffsetting:                      true,
+			}, &collectResultAppendable{app})
+			defer scrapeManager.Stop()
 
 			once := sync.Once{}
 			// Start fake HTTP target to that allow one scrape only.
 			server := httptest.NewServer(
 				http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-					fail := true // TODO(bwplotka): Kill or use?
+					fail := true
 					once.Do(func() {
 						fail = false
 						w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
@@ -1031,22 +994,23 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
 			serverURL, err := url.Parse(server.URL)
 			require.NoError(t, err)
 
-			// Add fake target directly into tsets + reload. Normally users would use
-			// Manager.Run and wait for minimum 5s refresh interval.
-			scrapeManager.updateTsets(map[string][]*targetgroup.Group{
-				"test": {{
-					Targets: []model.LabelSet{{
-						model.SchemeLabel:  model.LabelValue(serverURL.Scheme),
-						model.AddressLabel: model.LabelValue(serverURL.Host),
-					}},
-				}},
-			})
-			scrapeManager.reload()
+			testConfig := fmt.Sprintf(`
+global:
+  # Disable regular scrapes.
+  scrape_interval: 9999m
+  scrape_timeout: 5s
+
+scrape_configs:
+- job_name: test
+  static_configs:
+  - targets: ['%s']
+`, serverURL.Host)
+			applyConfig(t, testConfig, scrapeManager, discoveryManager)
 
 			var got []histogramSample
 
 			// Wait for one scrape.
-			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+			ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
 			defer cancel()
 			require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
 				app.mtx.Lock()
@@ -1064,7 +1028,6 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
 				}
 				return errors.New("expected some histogram samples, got none")
 			}), "after 1 minute")
-			scrapeManager.Stop()
 
 			// Check for zero samples, assuming we only injected always one histogram sample.
 			// Did it contain CT to inject? If yes, was CT zero enabled?
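
Both CT-zero tests now drive configuration through the same path production uses: an inline YAML config routed via applyConfig and a real discovery manager, instead of poking the unexported updateTsets and reload. A condensed sketch of the pattern as it would look in another test in this file (the test name and target address are placeholders):

func TestScrapeOnceSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	app := &collectResultAppender{}
	discoveryManager, scrapeManager := runManagers(t, ctx, nil, &collectResultAppendable{app})
	defer scrapeManager.Stop()

	testConfig := `
global:
  scrape_interval: 9999m # Effectively disable periodic scrapes.
  scrape_timeout: 5s

scrape_configs:
  - job_name: test
    static_configs:
      - targets: ['127.0.0.1:9100']
`
	// applyConfig feeds the parsed config to both the scrape manager and the
	// discovery manager, so targets flow in through real discovery.
	applyConfig(t, testConfig, scrapeManager, discoveryManager)
}
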
@@ -1118,9 +1081,17 @@ func applyConfig(
 	require.NoError(t, discoveryManager.ApplyConfig(c))
 }
 
-func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
+func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) {
 	t.Helper()
 
+	if opts == nil {
+		opts = &Options{}
+	}
+	opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
+	if app == nil {
+		app = nopAppendable{}
+	}
+
 	reg := prometheus.NewRegistry()
 	sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
 	require.NoError(t, err)
@@ -1132,10 +1103,10 @@ func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manager) {
 		discovery.Updatert(100*time.Millisecond),
 	)
 	scrapeManager, err := NewManager(
-		&Options{DiscoveryReloadInterval: model.Duration(100 * time.Millisecond)},
+		opts,
 		nil,
 		nil,
-		nopAppendable{},
+		app,
 		prometheus.NewRegistry(),
 	)
 	require.NoError(t, err)
@@ -1213,7 +1184,7 @@ scrape_configs:
   - files: ['%s']
 `
 
-	discoveryManager, scrapeManager := runManagers(t, ctx)
+	discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
 	defer scrapeManager.Stop()
 
 	applyConfig(
@@ -1312,7 +1283,7 @@ scrape_configs:
   file_sd_configs:
   - files: ['%s', '%s']
 `
-	discoveryManager, scrapeManager := runManagers(t, ctx)
+	discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
 	defer scrapeManager.Stop()
 
 	applyConfig(
@@ -1372,7 +1343,7 @@ scrape_configs:
   file_sd_configs:
   - files: ['%s']
 `
-	discoveryManager, scrapeManager := runManagers(t, ctx)
+	discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
 	defer scrapeManager.Stop()
 
 	applyConfig(
@@ -1439,7 +1410,7 @@ scrape_configs:
   - targets: ['%s']
 `
 
-	discoveryManager, scrapeManager := runManagers(t, ctx)
+	discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
 	defer scrapeManager.Stop()
 
 	// Apply the initial config with an existing file
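
The nil-default handling in runManagers keeps the pre-existing call sites nearly unchanged while letting the CT tests inject scrape Options and an appender. A sketch of the two calling conventions of the extended helper (the test body is illustrative):

func TestRunManagersConventionsSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// nil opts and app fall back to &Options{} and nopAppendable{} inside the
	// helper, so the file-SD tests above only gained two nil arguments.
	_, scrapeManager := runManagers(t, ctx, nil, nil)
	scrapeManager.Stop()

	// Tests asserting on appended samples pass their own Options and a
	// collecting appender instead.
	app := &collectResultAppender{}
	_, scrapeManager = runManagers(t, ctx, &Options{
		EnableCreatedTimestampZeroIngestion: true,
		skipOffsetting:                      true,
	}, &collectResultAppendable{app})
	defer scrapeManager.Stop()
}
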