diff --git a/CHANGELOG.md b/CHANGELOG.md index cc3a68d8b0..f1321829e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## unreleased +* [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136 * [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941 * [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941 @@ -51,12 +52,12 @@ As is traditional with a beta release, we do **not** recommend users install 3.0 * [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526 * [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643 * [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875 +* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769 * [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029 ## 2.55.0-rc.0 / 2024-09-20 * [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727 -* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769 * [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817 * [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815 * [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734 @@ -69,7 +70,7 @@ As is traditional with a beta release, we do **not** recommend users install 3.0 * [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379 * [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450 * [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477 -* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655 +* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655, #14985 * [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621 * [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413 * [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816 diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index d0c2846bec..4bd1c71b2d 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -125,6 +125,7 @@ func TestFailedStartupExitCode(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() fakeInputFile := "fake-input-file" expectedExitStatus := 2 @@ -211,83 +212,125 @@ func TestWALSegmentSizeBounds(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() - for size, expectedExitStatus := range map[string]int{"9MB": 1, "257MB": 1, "10": 2, "1GB": 1, "12MB": 0} { - prom := exec.Command(promPath, "-test.main", "--storage.tsdb.wal-segment-size="+size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data")) + for _, tc := range []struct { + size string + exitCode int + }{ + { + size: "9MB", + exitCode: 1, + }, + { + size: "257MB", + exitCode: 1, + }, + { + size: "10", + exitCode: 2, + }, + { + size: "1GB", + exitCode: 1, + }, + { + size: "12MB", + exitCode: 0, + }, + } { + t.Run(tc.size, func(t *testing.T) { + t.Parallel() + prom := exec.Command(promPath, "-test.main", "--storage.tsdb.wal-segment-size="+tc.size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data")) - // Log stderr in case of failure. - stderr, err := prom.StderrPipe() - require.NoError(t, err) - go func() { - slurp, _ := io.ReadAll(stderr) - t.Log(string(slurp)) - }() + // Log stderr in case of failure. + stderr, err := prom.StderrPipe() + require.NoError(t, err) + go func() { + slurp, _ := io.ReadAll(stderr) + t.Log(string(slurp)) + }() - err = prom.Start() - require.NoError(t, err) + err = prom.Start() + require.NoError(t, err) - if expectedExitStatus == 0 { - done := make(chan error, 1) - go func() { done <- prom.Wait() }() - select { - case err := <-done: - require.Fail(t, "prometheus should be still running: %v", err) - case <-time.After(startupTime): - prom.Process.Kill() - <-done + if tc.exitCode == 0 { + done := make(chan error, 1) + go func() { done <- prom.Wait() }() + select { + case err := <-done: + require.Fail(t, "prometheus should be still running: %v", err) + case <-time.After(startupTime): + prom.Process.Kill() + <-done + } + return } - continue - } - err = prom.Wait() - require.Error(t, err) - var exitError *exec.ExitError - require.ErrorAs(t, err, &exitError) - status := exitError.Sys().(syscall.WaitStatus) - require.Equal(t, expectedExitStatus, status.ExitStatus()) + err = prom.Wait() + require.Error(t, err) + var exitError *exec.ExitError + require.ErrorAs(t, err, &exitError) + status := exitError.Sys().(syscall.WaitStatus) + require.Equal(t, tc.exitCode, status.ExitStatus()) + }) } } func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) { - t.Parallel() - if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() - for size, expectedExitStatus := range map[string]int{"512KB": 1, "1MB": 0} { - prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data")) + for _, tc := range []struct { + size string + exitCode int + }{ + { + size: "512KB", + exitCode: 1, + }, + { + size: "1MB", + exitCode: 0, + }, + } { + t.Run(tc.size, func(t *testing.T) { + t.Parallel() + prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+tc.size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data")) - // Log stderr in case of failure. - stderr, err := prom.StderrPipe() - require.NoError(t, err) - go func() { - slurp, _ := io.ReadAll(stderr) - t.Log(string(slurp)) - }() + // Log stderr in case of failure. + stderr, err := prom.StderrPipe() + require.NoError(t, err) + go func() { + slurp, _ := io.ReadAll(stderr) + t.Log(string(slurp)) + }() - err = prom.Start() - require.NoError(t, err) + err = prom.Start() + require.NoError(t, err) - if expectedExitStatus == 0 { - done := make(chan error, 1) - go func() { done <- prom.Wait() }() - select { - case err := <-done: - require.Fail(t, "prometheus should be still running: %v", err) - case <-time.After(startupTime): - prom.Process.Kill() - <-done + if tc.exitCode == 0 { + done := make(chan error, 1) + go func() { done <- prom.Wait() }() + select { + case err := <-done: + require.Fail(t, "prometheus should be still running: %v", err) + case <-time.After(startupTime): + prom.Process.Kill() + <-done + } + return } - continue - } - err = prom.Wait() - require.Error(t, err) - var exitError *exec.ExitError - require.ErrorAs(t, err, &exitError) - status := exitError.Sys().(syscall.WaitStatus) - require.Equal(t, expectedExitStatus, status.ExitStatus()) + err = prom.Wait() + require.Error(t, err) + var exitError *exec.ExitError + require.ErrorAs(t, err, &exitError) + status := exitError.Sys().(syscall.WaitStatus) + require.Equal(t, tc.exitCode, status.ExitStatus()) + }) } } @@ -353,6 +396,8 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames } func TestAgentSuccessfulStartup(t *testing.T) { + t.Parallel() + prom := exec.Command(promPath, "-test.main", "--agent", "--web.listen-address=0.0.0.0:0", "--config.file="+agentConfig) require.NoError(t, prom.Start()) @@ -371,6 +416,8 @@ func TestAgentSuccessfulStartup(t *testing.T) { } func TestAgentFailedStartupWithServerFlag(t *testing.T) { + t.Parallel() + prom := exec.Command(promPath, "-test.main", "--agent", "--storage.tsdb.path=.", "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig) output := bytes.Buffer{} @@ -398,6 +445,8 @@ func TestAgentFailedStartupWithServerFlag(t *testing.T) { } func TestAgentFailedStartupWithInvalidConfig(t *testing.T) { + t.Parallel() + prom := exec.Command(promPath, "-test.main", "--agent", "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig) require.NoError(t, prom.Start()) @@ -419,6 +468,7 @@ func TestModeSpecificFlags(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() testcases := []struct { mode string @@ -433,6 +483,7 @@ func TestModeSpecificFlags(t *testing.T) { for _, tc := range testcases { t.Run(fmt.Sprintf("%s mode with option %s", tc.mode, tc.arg), func(t *testing.T) { + t.Parallel() args := []string{"-test.main", tc.arg, t.TempDir(), "--web.listen-address=0.0.0.0:0"} if tc.mode == "agent" { @@ -484,6 +535,8 @@ func TestDocumentation(t *testing.T) { if runtime.GOOS == "windows" { t.SkipNow() } + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -508,6 +561,8 @@ func TestDocumentation(t *testing.T) { } func TestRwProtoMsgFlagParser(t *testing.T) { + t.Parallel() + defaultOpts := config.RemoteWriteProtoMsgs{ config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2, } diff --git a/cmd/prometheus/main_unix_test.go b/cmd/prometheus/main_unix_test.go index 2011fb123f..94eec27e79 100644 --- a/cmd/prometheus/main_unix_test.go +++ b/cmd/prometheus/main_unix_test.go @@ -34,6 +34,7 @@ func TestStartupInterrupt(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) diff --git a/cmd/prometheus/query_log_test.go b/cmd/prometheus/query_log_test.go index f05ad9df2a..25abf5e965 100644 --- a/cmd/prometheus/query_log_test.go +++ b/cmd/prometheus/query_log_test.go @@ -456,6 +456,7 @@ func TestQueryLog(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + t.Parallel() cwd, err := os.Getwd() require.NoError(t, err) @@ -474,6 +475,7 @@ func TestQueryLog(t *testing.T) { } t.Run(p.String(), func(t *testing.T) { + t.Parallel() p.run(t) }) } diff --git a/cmd/promtool/testdata/config_with_service_discovery_files.yml b/cmd/promtool/testdata/config_with_service_discovery_files.yml index 13b6d7faff..6a550a8403 100644 --- a/cmd/promtool/testdata/config_with_service_discovery_files.yml +++ b/cmd/promtool/testdata/config_with_service_discovery_files.yml @@ -6,7 +6,7 @@ scrape_configs: alerting: alertmanagers: - scheme: http - api_version: v1 + api_version: v2 file_sd_configs: - files: - nonexistent_file.yml diff --git a/config/config.go b/config/config.go index 2702579913..962a0f4a73 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "log/slog" + "mime" "net/url" "os" "path/filepath" @@ -163,13 +164,13 @@ var ( // DefaultScrapeConfig is the default scrape configuration. DefaultScrapeConfig = ScrapeConfig{ // ScrapeTimeout, ScrapeInterval and ScrapeProtocols default to the configured globals. - ScrapeClassicHistograms: false, - MetricsPath: "/metrics", - Scheme: "http", - HonorLabels: false, - HonorTimestamps: true, - HTTPClientConfig: config.DefaultHTTPClientConfig, - EnableCompression: true, + AlwaysScrapeClassicHistograms: false, + MetricsPath: "/metrics", + Scheme: "http", + HonorLabels: false, + HonorTimestamps: true, + HTTPClientConfig: config.DefaultHTTPClientConfig, + EnableCompression: true, } // DefaultAlertmanagerConfig is the default alertmanager configuration. @@ -473,9 +474,22 @@ func (s ScrapeProtocol) Validate() error { return nil } +// HeaderMediaType returns the MIME mediaType for a particular ScrapeProtocol. +func (s ScrapeProtocol) HeaderMediaType() string { + if _, ok := ScrapeProtocolsHeaders[s]; !ok { + return "" + } + mediaType, _, err := mime.ParseMediaType(ScrapeProtocolsHeaders[s]) + if err != nil { + return "" + } + return mediaType +} + var ( PrometheusProto ScrapeProtocol = "PrometheusProto" PrometheusText0_0_4 ScrapeProtocol = "PrometheusText0.0.4" + PrometheusText1_0_0 ScrapeProtocol = "PrometheusText1.0.0" OpenMetricsText0_0_1 ScrapeProtocol = "OpenMetricsText0.0.1" OpenMetricsText1_0_0 ScrapeProtocol = "OpenMetricsText1.0.0" UTF8NamesHeader string = model.EscapingKey + "=" + model.AllowUTF8 @@ -483,6 +497,7 @@ var ( ScrapeProtocolsHeaders = map[ScrapeProtocol]string{ PrometheusProto: "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited", PrometheusText0_0_4: "text/plain;version=0.0.4", + PrometheusText1_0_0: "text/plain;version=1.0.0;escaping=allow-utf-8", OpenMetricsText0_0_1: "application/openmetrics-text;version=0.0.1", OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0", } @@ -492,6 +507,7 @@ var ( DefaultScrapeProtocols = []ScrapeProtocol{ OpenMetricsText1_0_0, OpenMetricsText0_0_1, + PrometheusText1_0_0, PrometheusText0_0_4, } @@ -503,6 +519,7 @@ var ( PrometheusProto, OpenMetricsText1_0_0, OpenMetricsText0_0_1, + PrometheusText1_0_0, PrometheusText0_0_4, } ) @@ -629,10 +646,15 @@ type ScrapeConfig struct { // The protocols to negotiate during a scrape. It tells clients what // protocol are accepted by Prometheus and with what preference (most wanted is first). // Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1, - // OpenMetricsText1.0.0, PrometheusText0.0.4. + // OpenMetricsText1.0.0, PrometheusText1.0.0, PrometheusText0.0.4. ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"` - // Whether to scrape a classic histogram that is also exposed as a native histogram. - ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"` + // The fallback protocol to use if the Content-Type provided by the target + // is not provided, blank, or not one of the expected values. + // Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1, + // OpenMetricsText1.0.0, PrometheusText1.0.0, PrometheusText0.0.4. + ScrapeFallbackProtocol ScrapeProtocol `yaml:"fallback_scrape_protocol,omitempty"` + // Whether to scrape a classic histogram, even if it is also exposed as a native histogram. + AlwaysScrapeClassicHistograms bool `yaml:"always_scrape_classic_histograms,omitempty"` // Whether to convert all scraped classic histograms into a native histogram with custom buckets. ConvertClassicHistograms bool `yaml:"convert_classic_histograms,omitempty"` // File to which scrape failures are logged. @@ -782,6 +804,12 @@ func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error { return fmt.Errorf("%w for scrape config with job name %q", err, c.JobName) } + if c.ScrapeFallbackProtocol != "" { + if err := c.ScrapeFallbackProtocol.Validate(); err != nil { + return fmt.Errorf("invalid fallback_scrape_protocol for scrape config with job name %q: %w", c.JobName, err) + } + } + switch globalConfig.MetricNameValidationScheme { case LegacyValidationConfig: case "", UTF8ValidationConfig: @@ -957,6 +985,7 @@ func (a AlertmanagerConfigs) ToMap() map[string]*AlertmanagerConfig { // AlertmanagerAPIVersion represents a version of the // github.com/prometheus/alertmanager/api, e.g. 'v1' or 'v2'. +// 'v1' is no longer supported. type AlertmanagerAPIVersion string // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -986,7 +1015,7 @@ const ( ) var SupportedAlertmanagerAPIVersions = []AlertmanagerAPIVersion{ - AlertmanagerAPIVersionV1, AlertmanagerAPIVersionV2, + AlertmanagerAPIVersionV2, } // AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with. diff --git a/config/config_test.go b/config/config_test.go index 547070dacc..8bf664c1f0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -206,19 +206,20 @@ var expectedConf = &Config{ { JobName: "prometheus", - HonorLabels: true, - HonorTimestamps: true, - ScrapeInterval: model.Duration(15 * time.Second), - ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, - EnableCompression: true, - BodySizeLimit: globBodySizeLimit, - SampleLimit: globSampleLimit, - TargetLimit: globTargetLimit, - LabelLimit: globLabelLimit, - LabelNameLengthLimit: globLabelNameLengthLimit, - LabelValueLengthLimit: globLabelValueLengthLimit, - ScrapeProtocols: DefaultGlobalConfig.ScrapeProtocols, - ScrapeFailureLogFile: "testdata/fail_prom.log", + HonorLabels: true, + HonorTimestamps: true, + ScrapeInterval: model.Duration(15 * time.Second), + ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, + EnableCompression: true, + BodySizeLimit: globBodySizeLimit, + SampleLimit: globSampleLimit, + TargetLimit: globTargetLimit, + LabelLimit: globLabelLimit, + LabelNameLengthLimit: globLabelNameLengthLimit, + LabelValueLengthLimit: globLabelValueLengthLimit, + ScrapeProtocols: DefaultGlobalConfig.ScrapeProtocols, + ScrapeFallbackProtocol: PrometheusText0_0_4, + ScrapeFailureLogFile: "testdata/fail_prom.log", MetricsPath: DefaultScrapeConfig.MetricsPath, Scheme: DefaultScrapeConfig.Scheme, @@ -1500,6 +1501,11 @@ var expectedConf = &Config{ }, } +func TestYAMLNotLongerSupportedAMApi(t *testing.T) { + _, err := LoadFile("testdata/config_with_no_longer_supported_am_api_config.yml", false, promslog.NewNopLogger()) + require.Error(t, err) +} + func TestYAMLRoundtrip(t *testing.T) { want, err := LoadFile("testdata/roundtrip.good.yml", false, promslog.NewNopLogger()) require.NoError(t, err) @@ -2081,12 +2087,20 @@ var expectedErrors = []struct { }, { filename: "scrape_config_files_scrape_protocols.bad.yml", - errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols.bad.yml: scrape_protocols: unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4] for scrape config with job name "node"`, + errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols.bad.yml: scrape_protocols: unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4 PrometheusText1.0.0] for scrape config with job name "node"`, }, { filename: "scrape_config_files_scrape_protocols2.bad.yml", errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols2.bad.yml: duplicated protocol in scrape_protocols, got [OpenMetricsText1.0.0 PrometheusProto OpenMetricsText1.0.0] for scrape config with job name "node"`, }, + { + filename: "scrape_config_files_fallback_scrape_protocol1.bad.yml", + errMsg: `parsing YAML file testdata/scrape_config_files_fallback_scrape_protocol1.bad.yml: invalid fallback_scrape_protocol for scrape config with job name "node": unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4 PrometheusText1.0.0]`, + }, + { + filename: "scrape_config_files_fallback_scrape_protocol2.bad.yml", + errMsg: `unmarshal errors`, + }, } func TestBadConfigs(t *testing.T) { @@ -2407,3 +2421,54 @@ func TestScrapeConfigNameValidationSettings(t *testing.T) { }) } } + +func TestScrapeProtocolHeader(t *testing.T) { + tests := []struct { + name string + proto ScrapeProtocol + expectedValue string + }{ + { + name: "blank", + proto: ScrapeProtocol(""), + expectedValue: "", + }, + { + name: "invalid", + proto: ScrapeProtocol("invalid"), + expectedValue: "", + }, + { + name: "prometheus protobuf", + proto: PrometheusProto, + expectedValue: "application/vnd.google.protobuf", + }, + { + name: "prometheus text 0.0.4", + proto: PrometheusText0_0_4, + expectedValue: "text/plain", + }, + { + name: "prometheus text 1.0.0", + proto: PrometheusText1_0_0, + expectedValue: "text/plain", + }, + { + name: "openmetrics 0.0.1", + proto: OpenMetricsText0_0_1, + expectedValue: "application/openmetrics-text", + }, + { + name: "openmetrics 1.0.0", + proto: OpenMetricsText1_0_0, + expectedValue: "application/openmetrics-text", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mediaType := tc.proto.HeaderMediaType() + + require.Equal(t, tc.expectedValue, mediaType) + }) + } +} diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 9eb7995432..2501652d5b 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -74,6 +74,8 @@ scrape_configs: # metrics_path defaults to '/metrics' # scheme defaults to 'http'. + fallback_scrape_protocol: PrometheusText0.0.4 + scrape_failure_log_file: fail_prom.log file_sd_configs: - files: diff --git a/config/testdata/config_with_deprecated_am_api_config.yml b/config/testdata/config_with_deprecated_am_api_config.yml new file mode 100644 index 0000000000..ac89537ff1 --- /dev/null +++ b/config/testdata/config_with_deprecated_am_api_config.yml @@ -0,0 +1,7 @@ +alerting: + alertmanagers: + - scheme: http + api_version: v1 + file_sd_configs: + - files: + - nonexistent_file.yml diff --git a/config/testdata/scrape_config_files_fallback_scrape_protocol1.bad.yml b/config/testdata/scrape_config_files_fallback_scrape_protocol1.bad.yml new file mode 100644 index 0000000000..07cfe47594 --- /dev/null +++ b/config/testdata/scrape_config_files_fallback_scrape_protocol1.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: node + fallback_scrape_protocol: "prometheusproto" + static_configs: + - targets: ['localhost:8080'] diff --git a/config/testdata/scrape_config_files_fallback_scrape_protocol2.bad.yml b/config/testdata/scrape_config_files_fallback_scrape_protocol2.bad.yml new file mode 100644 index 0000000000..c5d133f9c4 --- /dev/null +++ b/config/testdata/scrape_config_files_fallback_scrape_protocol2.bad.yml @@ -0,0 +1,5 @@ +scrape_configs: + - job_name: node + fallback_scrape_protocol: ["OpenMetricsText1.0.0", "PrometheusText0.0.4"] + static_configs: + - targets: ['localhost:8080'] diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index 75da67f1c6..5ba9df6276 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -167,8 +167,11 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node e.enqueueNode(node.Name) }, DeleteFunc: func(o interface{}) { - node := o.(*apiv1.Node) - e.enqueueNode(node.Name) + nodeName, err := nodeName(o) + if err != nil { + l.Error("Error getting Node name", "err", err) + } + e.enqueueNode(nodeName) }, }) if err != nil { diff --git a/discovery/kubernetes/endpointslice.go b/discovery/kubernetes/endpointslice.go index efd1c72167..8f58ba3535 100644 --- a/discovery/kubernetes/endpointslice.go +++ b/discovery/kubernetes/endpointslice.go @@ -145,8 +145,11 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n e.enqueueNode(node.Name) }, DeleteFunc: func(o interface{}) { - node := o.(*apiv1.Node) - e.enqueueNode(node.Name) + nodeName, err := nodeName(o) + if err != nil { + l.Error("Error getting Node name", "err", err) + } + e.enqueueNode(nodeName) }, }) if err != nil { diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index be1c77c205..64e8886cfd 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -804,3 +804,13 @@ func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta, func namespacedName(namespace, name string) string { return namespace + "/" + name } + +// nodeName knows how to handle the cache.DeletedFinalStateUnknown tombstone. +// It assumes the MetaNamespaceKeyFunc keyFunc is used, which uses the node name as the tombstone key. +func nodeName(o interface{}) (string, error) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(o) + if err != nil { + return "", err + } + return key, nil +} diff --git a/discovery/kubernetes/kubernetes_test.go b/discovery/kubernetes/kubernetes_test.go index fbbd77c3c3..a14f2b3d1b 100644 --- a/discovery/kubernetes/kubernetes_test.go +++ b/discovery/kubernetes/kubernetes_test.go @@ -23,7 +23,9 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" + apiv1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/watch" @@ -320,3 +322,18 @@ func TestFailuresCountMetric(t *testing.T) { }) } } + +func TestNodeName(t *testing.T) { + node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + name, err := nodeName(node) + require.NoError(t, err) + require.Equal(t, "foo", name) + + name, err = nodeName(cache.DeletedFinalStateUnknown{Key: "bar"}) + require.NoError(t, err) + require.Equal(t, "bar", name) +} diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index eecb52ab50..0e0c5745f2 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -82,7 +82,7 @@ func NewNode(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.Co } func (n *Node) enqueue(obj interface{}) { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + key, err := nodeName(obj) if err != nil { return } diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 73568e51c8..8704a66239 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -95,8 +95,11 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn p.enqueuePodsForNode(node.Name) }, DeleteFunc: func(o interface{}) { - node := o.(*apiv1.Node) - p.enqueuePodsForNode(node.Name) + nodeName, err := nodeName(o) + if err != nil { + l.Error("Error getting Node name", "err", err) + } + p.enqueuePodsForNode(nodeName) }, }) if err != nil { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 6e0cf431cb..31ceac734f 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -212,12 +212,18 @@ job_name: # The protocols to negotiate during a scrape with the client. # Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1, -# OpenMetricsText1.0.0, PrometheusText0.0.4. +# OpenMetricsText1.0.0, PrometheusText0.0.4, PrometheusText1.0.0. [ scrape_protocols: [, ...] | default = ] -# Whether to scrape a classic histogram that is also exposed as a native +# Fallback protocol to use if a scrape returns blank, unparseable, or otherwise +# invalid Content-Type. +# Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1, +# OpenMetricsText1.0.0, PrometheusText0.0.4, PrometheusText1.0.0. +[ fallback_scrape_protocol: ] + +# Whether to scrape a classic histogram, even if it is also exposed as a native # histogram (has no effect without --enable-feature=native-histograms). -[ scrape_classic_histograms: | default = false ] +[ always_scrape_classic_histograms: | default = false ] # The HTTP resource path on which to fetch metrics from targets. [ metrics_path: | default = /metrics ] diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 65eb60eaf1..c2de68dec2 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -84,59 +84,7 @@ those classic histograms that do not come with a corresponding native histogram. However, if a native histogram is present, Prometheus will ignore the corresponding classic histogram, with the notable exception of exemplars, which are always ingested. To keep the classic histograms as well, enable -`scrape_classic_histograms` in the scrape job. - -_Note about the format of `le` and `quantile` label values:_ - -In certain situations, the protobuf parsing changes the number formatting of -the `le` labels of classic histograms and the `quantile` labels of -summaries. Typically, this happens if the scraped target is instrumented with -[client_golang](https://github.com/prometheus/client_golang) provided that -[promhttp.HandlerOpts.EnableOpenMetrics](https://pkg.go.dev/github.com/prometheus/client_golang/prometheus/promhttp#HandlerOpts) -is set to `false`. In such a case, integer label values are represented in the -text format as such, e.g. `quantile="1"` or `le="2"`. However, the protobuf parsing -changes the representation to float-like (following the OpenMetrics -specification), so the examples above become `quantile="1.0"` and `le="2.0"` after -ingestion into Prometheus, which changes the identity of the metric compared to -what was ingested before via the text format. - -The effect of this change is that alerts, recording rules and dashboards that -directly reference label values as whole numbers such as `le="1"` will stop -working. - -Aggregation by the `le` and `quantile` labels for vectors that contain the old and -new formatting will lead to unexpected results, and range vectors that span the -transition between the different formatting will contain additional series. -The most common use case for both is the quantile calculation via -`histogram_quantile`, e.g. -`histogram_quantile(0.95, sum by (le) (rate(histogram_bucket[10m])))`. -The `histogram_quantile` function already tries to mitigate the effects to some -extent, but there will be inaccuracies, in particular for shorter ranges that -cover only a few samples. - -Ways to deal with this change either globally or on a per metric basis: - -- Fix references to integer `le`, `quantile` label values, but otherwise do -nothing and accept that some queries that span the transition time will produce -inaccurate or unexpected results. -_This is the recommended solution, to get consistently normalized label values._ -Also Prometheus 3.0 is expected to enforce normalization of these label values. -- Use `metric_relabel_config` to retain the old labels when scraping targets. -This should **only** be applied to metrics that currently produce such labels. - - -```yaml - metric_relabel_configs: - - source_labels: - - quantile - target_label: quantile - regex: (\d+)\.0+ - - source_labels: - - le - - __name__ - target_label: le - regex: (\d+)\.0+;.*_bucket -``` +`always_scrape_classic_histograms` in the scrape job. ## Experimental PromQL functions diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 3b0e9a96e1..2682855281 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -14,6 +14,8 @@ package textparse import ( + "errors" + "fmt" "mime" "github.com/prometheus/common/model" @@ -23,8 +25,7 @@ import ( "github.com/prometheus/prometheus/model/labels" ) -// Parser parses samples from a byte slice of samples in the official -// Prometheus and OpenMetrics text exposition formats. +// Parser parses samples from a byte slice of samples in different exposition formats. type Parser interface { // Series returns the bytes of a series with a simple float64 as a // value, the timestamp if set, and the value of the current sample. @@ -58,6 +59,8 @@ type Parser interface { // Metric writes the labels of the current sample into the passed labels. // It returns the string from which the metric was parsed. + // The values of the "le" labels of classic histograms and "quantile" labels + // of summaries should follow the OpenMetrics formatting rules. Metric(l *labels.Labels) string // Exemplar writes the exemplar of the current sample into the passed @@ -78,28 +81,65 @@ type Parser interface { Next() (Entry, error) } -// New returns a new parser of the byte slice. -// -// This function always returns a valid parser, but might additionally -// return an error if the content type cannot be parsed. -func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) { +// extractMediaType returns the mediaType of a required parser. It tries first to +// extract a valid and supported mediaType from contentType. If that fails, +// the provided fallbackType (possibly an empty string) is returned, together with +// an error. fallbackType is used as-is without further validation. +func extractMediaType(contentType, fallbackType string) (string, error) { if contentType == "" { - return NewPromParser(b, st), nil + if fallbackType == "" { + return "", errors.New("non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target") + } + return fallbackType, fmt.Errorf("non-compliant scrape target sending blank Content-Type, using fallback_scrape_protocol %q", fallbackType) } + // We have a contentType, parse it. mediaType, _, err := mime.ParseMediaType(contentType) if err != nil { - return NewPromParser(b, st), err + if fallbackType == "" { + retErr := fmt.Errorf("cannot parse Content-Type %q and no fallback_scrape_protocol for target", contentType) + return "", errors.Join(retErr, err) + } + retErr := fmt.Errorf("could not parse received Content-Type %q, using fallback_scrape_protocol %q", contentType, fallbackType) + return fallbackType, errors.Join(retErr, err) } + + // We have a valid media type, either we recognise it and can use it + // or we have to error. + switch mediaType { + case "application/openmetrics-text", "application/vnd.google.protobuf", "text/plain": + return mediaType, nil + } + // We're here because we have no recognised mediaType. + if fallbackType == "" { + return "", fmt.Errorf("received unsupported Content-Type %q and no fallback_scrape_protocol specified for target", contentType) + } + return fallbackType, fmt.Errorf("received unsupported Content-Type %q, using fallback_scrape_protocol %q", contentType, fallbackType) +} + +// New returns a new parser of the byte slice. +// +// This function no longer guarantees to return a valid parser. +// +// It only returns a valid parser if the supplied contentType and fallbackType allow. +// An error may also be returned if fallbackType had to be used or there was some +// other error parsing the supplied Content-Type. +// If the returned parser is nil then the scrape must fail. +func New(b []byte, contentType, fallbackType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) { + mediaType, err := extractMediaType(contentType, fallbackType) + // err may be nil or something we want to warn about. + switch mediaType { case "application/openmetrics-text": return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) { o.SkipCTSeries = skipOMCTSeries - }), nil + }), err case "application/vnd.google.protobuf": - return NewProtobufParser(b, parseClassicHistograms, st), nil + return NewProtobufParser(b, parseClassicHistograms, st), err + case "text/plain": + return NewPromParser(b, st), err default: - return NewPromParser(b, st), nil + return nil, err } } diff --git a/model/textparse/interface_test.go b/model/textparse/interface_test.go index 3f2f758d7e..6136fbc915 100644 --- a/model/textparse/interface_test.go +++ b/model/textparse/interface_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -31,6 +32,10 @@ import ( func TestNewParser(t *testing.T) { t.Parallel() + requireNilParser := func(t *testing.T, p Parser) { + require.Nil(t, p) + } + requirePromParser := func(t *testing.T, p Parser) { require.NotNil(t, p) _, ok := p.(*PromParser) @@ -43,34 +48,83 @@ func TestNewParser(t *testing.T) { require.True(t, ok) } + requireProtobufParser := func(t *testing.T, p Parser) { + require.NotNil(t, p) + _, ok := p.(*ProtobufParser) + require.True(t, ok) + } + for name, tt := range map[string]*struct { - contentType string - validateParser func(*testing.T, Parser) - err string + contentType string + fallbackScrapeProtocol config.ScrapeProtocol + validateParser func(*testing.T, Parser) + err string }{ "empty-string": { - validateParser: requirePromParser, + validateParser: requireNilParser, + err: "non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target", + }, + "empty-string-fallback-text-plain": { + validateParser: requirePromParser, + fallbackScrapeProtocol: config.PrometheusText0_0_4, + err: "non-compliant scrape target sending blank Content-Type, using fallback_scrape_protocol \"text/plain\"", }, "invalid-content-type-1": { contentType: "invalid/", - validateParser: requirePromParser, + validateParser: requireNilParser, err: "expected token after slash", }, + "invalid-content-type-1-fallback-text-plain": { + contentType: "invalid/", + validateParser: requirePromParser, + fallbackScrapeProtocol: config.PrometheusText0_0_4, + err: "expected token after slash", + }, + "invalid-content-type-1-fallback-openmetrics": { + contentType: "invalid/", + validateParser: requireOpenMetricsParser, + fallbackScrapeProtocol: config.OpenMetricsText0_0_1, + err: "expected token after slash", + }, + "invalid-content-type-1-fallback-protobuf": { + contentType: "invalid/", + validateParser: requireProtobufParser, + fallbackScrapeProtocol: config.PrometheusProto, + err: "expected token after slash", + }, "invalid-content-type-2": { contentType: "invalid/invalid/invalid", - validateParser: requirePromParser, + validateParser: requireNilParser, err: "unexpected content after media subtype", }, + "invalid-content-type-2-fallback-text-plain": { + contentType: "invalid/invalid/invalid", + validateParser: requirePromParser, + fallbackScrapeProtocol: config.PrometheusText1_0_0, + err: "unexpected content after media subtype", + }, "invalid-content-type-3": { contentType: "/", - validateParser: requirePromParser, + validateParser: requireNilParser, err: "no media type", }, + "invalid-content-type-3-fallback-text-plain": { + contentType: "/", + validateParser: requirePromParser, + fallbackScrapeProtocol: config.PrometheusText1_0_0, + err: "no media type", + }, "invalid-content-type-4": { contentType: "application/openmetrics-text; charset=UTF-8; charset=utf-8", - validateParser: requirePromParser, + validateParser: requireNilParser, err: "duplicate parameter name", }, + "invalid-content-type-4-fallback-open-metrics": { + contentType: "application/openmetrics-text; charset=UTF-8; charset=utf-8", + validateParser: requireOpenMetricsParser, + fallbackScrapeProtocol: config.OpenMetricsText1_0_0, + err: "duplicate parameter name", + }, "openmetrics": { contentType: "application/openmetrics-text", validateParser: requireOpenMetricsParser, @@ -87,20 +141,33 @@ func TestNewParser(t *testing.T) { contentType: "text/plain", validateParser: requirePromParser, }, + "protobuf": { + contentType: "application/vnd.google.protobuf", + validateParser: requireProtobufParser, + }, "plain-text-with-version": { contentType: "text/plain; version=0.0.4", validateParser: requirePromParser, }, "some-other-valid-content-type": { contentType: "text/html", - validateParser: requirePromParser, + validateParser: requireNilParser, + err: "received unsupported Content-Type \"text/html\" and no fallback_scrape_protocol specified for target", + }, + "some-other-valid-content-type-fallback-text-plain": { + contentType: "text/html", + validateParser: requirePromParser, + fallbackScrapeProtocol: config.PrometheusText0_0_4, + err: "received unsupported Content-Type \"text/html\", using fallback_scrape_protocol \"text/plain\"", }, } { t.Run(name, func(t *testing.T) { tt := tt // Copy to local variable before going parallel. t.Parallel() - p, err := New([]byte{}, tt.contentType, false, false, labels.NewSymbolTable()) + fallbackProtoMediaType := tt.fallbackScrapeProtocol.HeaderMediaType() + + p, err := New([]byte{}, tt.contentType, fallbackProtoMediaType, false, false, labels.NewSymbolTable()) tt.validateParser(t, p) if tt.err == "" { require.NoError(t, err) diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index a63d0d93e2..7c2db69906 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -36,9 +36,9 @@ import ( // single native histogram. // // Note: -// - Only series that have the histogram metadata type are considered for -// conversion. -// - The classic series are also returned if keepClassicHistograms is true. +// - Only series that have the histogram metadata type are considered for +// conversion. +// - The classic series are also returned if keepClassicHistograms is true. type NHCBParser struct { // The parser we're wrapping. parser Parser diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index 638e9619c9..70c24d9ec6 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "math" + "strconv" "strings" "unicode/utf8" @@ -210,7 +211,7 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string { label := unreplace(s[a:b]) c := p.offsets[i+2] - p.start d := p.offsets[i+3] - p.start - value := unreplace(s[c:d]) + value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d])) p.builder.Add(label, value) } @@ -723,3 +724,15 @@ func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error } return val, nil } + +// normalizeFloatsInLabelValues ensures that values of the "le" labels of classic histograms and "quantile" labels +// of summaries follow OpenMetrics formatting rules. +func normalizeFloatsInLabelValues(t model.MetricType, l, v string) string { + if (t == model.MetricTypeSummary && l == model.QuantileLabel) || (t == model.MetricTypeHistogram && l == model.BucketLabel) { + f, err := strconv.ParseFloat(v, 64) + if err == nil { + return formatOpenMetricsFloat(f) + } + } + return v +} diff --git a/model/textparse/openmetricsparse_test.go b/model/textparse/openmetricsparse_test.go index 467a237718..9c3c679ab5 100644 --- a/model/textparse/openmetricsparse_test.go +++ b/model/textparse/openmetricsparse_test.go @@ -74,6 +74,7 @@ foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5 foo_created{a="b"} 1520872607.123 foo_total{le="c"} 21.0 foo_created{le="c"} 1520872621.123 +foo_total{le="1"} 10.0 # HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far # TYPE bar summary bar_count 17.0 @@ -97,6 +98,7 @@ something_count 18 something_sum 324789.4 something_created 1520430001 something_bucket{le="0.0"} 1 +something_bucket{le="1"} 2 something_bucket{le="+Inf"} 18 # HELP yum Summary with _created between sum and quantiles # TYPE yum summary @@ -130,7 +132,7 @@ foobar{quantile="0.99"} 150.1` }, { m: `go_gc_duration_seconds{quantile="0"}`, v: 4.9351e-05, - lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"), + lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.0"), }, { m: `go_gc_duration_seconds{quantile="0.25"}`, v: 7.424100000000001e-05, @@ -302,6 +304,10 @@ foobar{quantile="0.99"} 150.1` v: 21.0, lset: labels.FromStrings("__name__", "foo_total", "le", "c"), ct: int64p(1520872621123), + }, { + m: `foo_total{le="1"}`, + v: 10.0, + lset: labels.FromStrings("__name__", "foo_total", "le", "1"), }, { m: "bar", help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far", @@ -385,6 +391,11 @@ foobar{quantile="0.99"} 150.1` v: 1, lset: labels.FromStrings("__name__", "something_bucket", "le", "0.0"), ct: int64p(1520430001000), + }, { + m: `something_bucket{le="1"}`, + v: 2, + lset: labels.FromStrings("__name__", "something_bucket", "le", "1.0"), + ct: int64p(1520430001000), }, { m: `something_bucket{le="+Inf"}`, v: 18, @@ -492,7 +503,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) { }, { m: `{"go.gc_duration_seconds",quantile="0"}`, v: 4.9351e-05, - lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"), + lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.0"), ct: int64p(1520872607123), }, { m: `{"go.gc_duration_seconds",quantile="0.25"}`, diff --git a/model/textparse/promparse.go b/model/textparse/promparse.go index 5759769279..0ab932c665 100644 --- a/model/textparse/promparse.go +++ b/model/textparse/promparse.go @@ -239,7 +239,8 @@ func (p *PromParser) Metric(l *labels.Labels) string { label := unreplace(s[a:b]) c := p.offsets[i+2] - p.start d := p.offsets[i+3] - p.start - value := unreplace(s[c:d]) + value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d])) + p.builder.Add(label, value) } diff --git a/model/textparse/promparse_test.go b/model/textparse/promparse_test.go index b726d8847a..e8cf66f539 100644 --- a/model/textparse/promparse_test.go +++ b/model/textparse/promparse_test.go @@ -31,6 +31,13 @@ go_gc_duration_seconds{quantile="0.25",} 7.424100000000001e-05 go_gc_duration_seconds{quantile="0.5",a="b"} 8.3835e-05 go_gc_duration_seconds{quantile="0.8", a="b"} 8.3835e-05 go_gc_duration_seconds{ quantile="0.9", a="b"} 8.3835e-05 +# HELP prometheus_http_request_duration_seconds Histogram of latencies for HTTP requests. +# TYPE prometheus_http_request_duration_seconds histogram +prometheus_http_request_duration_seconds_bucket{handler="/",le="1"} 423 +prometheus_http_request_duration_seconds_bucket{handler="/",le="2"} 1423 +prometheus_http_request_duration_seconds_bucket{handler="/",le="+Inf"} 1423 +prometheus_http_request_duration_seconds_sum{handler="/"} 2000 +prometheus_http_request_duration_seconds_count{handler="/"} 1423 # Hrandom comment starting with prefix of HELP # wind_speed{A="2",c="3"} 12345 @@ -50,7 +57,8 @@ some:aggregate:rate5m{a_b="c"} 1 go_goroutines 33 123123 _metric_starting_with_underscore 1 testmetric{_label_starting_with_underscore="foo"} 1 -testmetric{label="\"bar\""} 1` +testmetric{label="\"bar\""} 1 +testmetric{le="10"} 1` input += "\n# HELP metric foo\x00bar" input += "\nnull_byte_metric{a=\"abc\x00\"} 1" @@ -64,7 +72,7 @@ testmetric{label="\"bar\""} 1` }, { m: `go_gc_duration_seconds{quantile="0"}`, v: 4.9351e-05, - lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"), + lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.0"), }, { m: `go_gc_duration_seconds{quantile="0.25",}`, v: 7.424100000000001e-05, @@ -81,6 +89,32 @@ testmetric{label="\"bar\""} 1` m: `go_gc_duration_seconds{ quantile="0.9", a="b"}`, v: 8.3835e-05, lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.9", "a", "b"), + }, { + m: "prometheus_http_request_duration_seconds", + help: "Histogram of latencies for HTTP requests.", + }, { + m: "prometheus_http_request_duration_seconds", + typ: model.MetricTypeHistogram, + }, { + m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="1"}`, + v: 423, + lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "1.0"), + }, { + m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="2"}`, + v: 1423, + lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "2.0"), + }, { + m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="+Inf"}`, + v: 1423, + lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "+Inf"), + }, { + m: `prometheus_http_request_duration_seconds_sum{handler="/"}`, + v: 2000, + lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_sum", "handler", "/"), + }, { + m: `prometheus_http_request_duration_seconds_count{handler="/"}`, + v: 1423, + lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_count", "handler", "/"), }, { comment: "# Hrandom comment starting with prefix of HELP", }, { @@ -151,6 +185,10 @@ testmetric{label="\"bar\""} 1` m: "testmetric{label=\"\\\"bar\\\"\"}", v: 1, lset: labels.FromStrings("__name__", "testmetric", "label", `"bar"`), + }, { + m: `testmetric{le="10"}`, + v: 1, + lset: labels.FromStrings("__name__", "testmetric", "le", "10"), }, { m: "metric", help: "foo\x00bar", @@ -197,7 +235,7 @@ func TestUTF8PromParse(t *testing.T) { }, { m: `{"go.gc_duration_seconds",quantile="0"}`, v: 4.9351e-05, - lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"), + lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.0"), }, { m: `{"go.gc_duration_seconds",quantile="0.25",}`, v: 7.424100000000001e-05, diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index b3dfdfca1c..a77e1d728f 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -20,7 +20,9 @@ import ( "fmt" "io" "math" + "strconv" "strings" + "sync" "unicode/utf8" "github.com/gogo/protobuf/proto" @@ -34,6 +36,15 @@ import ( dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" ) +// floatFormatBufPool is exclusively used in formatOpenMetricsFloat. +var floatFormatBufPool = sync.Pool{ + New: func() interface{} { + // To contain at most 17 digits and additional syntax for a float64. + b := make([]byte, 0, 24) + return &b + }, +} + // ProtobufParser is a very inefficient way of unmarshaling the old Prometheus // protobuf format and then present it as it if were parsed by a // Prometheus-2-style text parser. This is only done so that we can easily plug @@ -629,11 +640,15 @@ func formatOpenMetricsFloat(f float64) string { case math.IsInf(f, -1): return "-Inf" } - s := fmt.Sprint(f) - if strings.ContainsAny(s, "e.") { - return s + bp := floatFormatBufPool.Get().(*[]byte) + defer floatFormatBufPool.Put(bp) + + *bp = strconv.AppendFloat((*bp)[:0], f, 'g', -1, 64) + if bytes.ContainsAny(*bp, "e.") { + return string(*bp) } - return s + ".0" + *bp = append(*bp, '.', '0') + return string(*bp) } // isNativeHistogram returns false iff the provided histograms has no spans at diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index 0c09279fed..065459a69a 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -409,6 +409,49 @@ metric: < > > +`, + `name: "test_histogram3" +help: "Similar histogram as before but now with integer buckets." +type: HISTOGRAM +metric: < + histogram: < + sample_count: 6 + sample_sum: 50 + bucket: < + cumulative_count: 2 + upper_bound: -20 + > + bucket: < + cumulative_count: 4 + upper_bound: 20 + exemplar: < + label: < + name: "dummyID" + value: "59727" + > + value: 15 + timestamp: < + seconds: 1625851153 + nanos: 146848499 + > + > + > + bucket: < + cumulative_count: 6 + upper_bound: 30 + exemplar: < + label: < + name: "dummyID" + value: "5617" + > + value: 25 + > + > + schema: 0 + zero_threshold: 0 + > +> + `, `name: "test_histogram_family" help: "Test histogram metric family with two very simple histograms." @@ -1050,6 +1093,66 @@ func TestProtobufParse(t *testing.T) { "le", "+Inf", ), }, + { + m: "test_histogram3", + help: "Similar histogram as before but now with integer buckets.", + }, + { + m: "test_histogram3", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram3_count", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_count", + ), + }, + { + m: "test_histogram3_sum", + v: 50, + lset: labels.FromStrings( + "__name__", "test_histogram3_sum", + ), + }, + { + m: "test_histogram3_bucket\xffle\xff-20.0", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "-20.0", + ), + }, + { + m: "test_histogram3_bucket\xffle\xff20.0", + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "20.0", + ), + es: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: 15, HasTs: true, Ts: 1625851153146}, + }, + }, + { + m: "test_histogram3_bucket\xffle\xff30.0", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "30.0", + ), + es: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: 25, HasTs: false}, + }, + }, + { + m: "test_histogram3_bucket\xffle\xff+Inf", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "+Inf", + ), + }, { m: "test_histogram_family", help: "Test histogram metric family with two very simple histograms.", @@ -1857,6 +1960,66 @@ func TestProtobufParse(t *testing.T) { "le", "+Inf", ), }, + { + m: "test_histogram3", + help: "Similar histogram as before but now with integer buckets.", + }, + { + m: "test_histogram3", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram3_count", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_count", + ), + }, + { + m: "test_histogram3_sum", + v: 50, + lset: labels.FromStrings( + "__name__", "test_histogram3_sum", + ), + }, + { + m: "test_histogram3_bucket\xffle\xff-20.0", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "-20.0", + ), + }, + { + m: "test_histogram3_bucket\xffle\xff20.0", + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "20.0", + ), + es: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: 15, HasTs: true, Ts: 1625851153146}, + }, + }, + { + m: "test_histogram3_bucket\xffle\xff30.0", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "30.0", + ), + es: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: 25, HasTs: false}, + }, + }, + { + m: "test_histogram3_bucket\xffle\xff+Inf", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram3_bucket", + "le", "+Inf", + ), + }, { m: "test_histogram_family", help: "Test histogram metric family with two very simple histograms.", diff --git a/notifier/notifier.go b/notifier/notifier.go index 482d2fdaab..e970b67e6d 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -542,10 +542,10 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { begin := time.Now() - // v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API - // v1 or v2. Marshaling happens below. Reference here is for caching between + // cachedPayload represent 'alerts' marshaled for Alertmanager API v2. + // Marshaling happens below. Reference here is for caching between // for loop iterations. - var v1Payload, v2Payload []byte + var cachedPayload []byte n.mtx.RLock() amSets := n.alertmanagers @@ -576,29 +576,16 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { continue } // We can't use the cached values from previous iteration. - v1Payload, v2Payload = nil, nil + cachedPayload = nil } switch ams.cfg.APIVersion { - case config.AlertmanagerAPIVersionV1: - { - if v1Payload == nil { - v1Payload, err = json.Marshal(amAlerts) - if err != nil { - n.logger.Error("Encoding alerts for Alertmanager API v1 failed", "err", err) - ams.mtx.RUnlock() - return false - } - } - - payload = v1Payload - } case config.AlertmanagerAPIVersionV2: { - if v2Payload == nil { + if cachedPayload == nil { openAPIAlerts := alertsToOpenAPIAlerts(amAlerts) - v2Payload, err = json.Marshal(openAPIAlerts) + cachedPayload, err = json.Marshal(openAPIAlerts) if err != nil { n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err) ams.mtx.RUnlock() @@ -606,7 +593,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { } } - payload = v2Payload + payload = cachedPayload } default: { @@ -621,7 +608,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { if len(ams.cfg.AlertRelabelConfigs) > 0 { // We can't use the cached values on the next iteration. - v1Payload, v2Payload = nil, nil + cachedPayload = nil } for _, am := range ams.ams { diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 83eaf8168b..97b0274f29 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -50,27 +50,27 @@ func TestPostPath(t *testing.T) { }{ { in: "", - out: "/api/v1/alerts", + out: "/api/v2/alerts", }, { in: "/", - out: "/api/v1/alerts", + out: "/api/v2/alerts", }, { in: "/prefix", - out: "/prefix/api/v1/alerts", + out: "/prefix/api/v2/alerts", }, { in: "/prefix//", - out: "/prefix/api/v1/alerts", + out: "/prefix/api/v2/alerts", }, { in: "prefix//", - out: "/prefix/api/v1/alerts", + out: "/prefix/api/v2/alerts", }, } for _, c := range cases { - require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV1)) + require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV2)) } } diff --git a/promql/engine.go b/promql/engine.go index 5435db0fc4..0e5ab62355 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1230,38 +1230,17 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label ev.currentSamples = tempNumSamples // Gather input vectors for this timestamp. for i := range exprs { - vectors[i] = vectors[i][:0] - + var bh []EvalSeriesHelper + var sh []EvalSeriesHelper if prepSeries != nil { - bufHelpers[i] = bufHelpers[i][:0] - } - - for si, series := range matrixes[i] { - switch { - case len(series.Floats) > 0 && series.Floats[0].T == ts: - vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts, DropName: series.DropName}) - // Move input vectors forward so we don't have to re-scan the same - // past points at the next step. - matrixes[i][si].Floats = series.Floats[1:] - case len(series.Histograms) > 0 && series.Histograms[0].T == ts: - vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts, DropName: series.DropName}) - matrixes[i][si].Histograms = series.Histograms[1:] - default: - continue - } - if prepSeries != nil { - bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) - } - // Don't add histogram size here because we only - // copy the pointer above, not the whole - // histogram. - ev.currentSamples++ - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } + bh = bufHelpers[i][:0] + sh = seriesHelpers[i] } + vectors[i], bh = ev.gatherVector(ts, matrixes[i], vectors[i], bh, sh) args[i] = vectors[i] - ev.samplesStats.UpdatePeak(ev.currentSamples) + if prepSeries != nil { + bufHelpers[i] = bh + } } // Make the function call. @@ -3724,3 +3703,41 @@ func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return NewHistogramStatsIterator(s.Series.Iterator(it)) } + +// gatherVector gathers a Vector for ts from the series in input. +// output is used as a buffer. +// If bufHelpers and seriesHelpers are provided, seriesHelpers[i] is appended to bufHelpers for every input index i. +// The gathered Vector and bufHelper are returned. +func (ev *evaluator) gatherVector(ts int64, input Matrix, output Vector, bufHelpers, seriesHelpers []EvalSeriesHelper) (Vector, []EvalSeriesHelper) { + output = output[:0] + for i, series := range input { + switch { + case len(series.Floats) > 0 && series.Floats[0].T == ts: + s := series.Floats[0] + output = append(output, Sample{Metric: series.Metric, F: s.F, T: ts, DropName: series.DropName}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + input[i].Floats = series.Floats[1:] + case len(series.Histograms) > 0 && series.Histograms[0].T == ts: + s := series.Histograms[0] + output = append(output, Sample{Metric: series.Metric, H: s.H, T: ts, DropName: series.DropName}) + input[i].Histograms = series.Histograms[1:] + default: + continue + } + if len(seriesHelpers) > 0 { + bufHelpers = append(bufHelpers, seriesHelpers[i]) + } + + // Don't add histogram size here because we only + // copy the pointer above, not the whole + // histogram. + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + + return output, bufHelpers +} diff --git a/promql/functions.go b/promql/functions.go index 4333cb5ce0..0543655122 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -415,22 +415,12 @@ func funcSortDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel // === sort_by_label(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) === func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - // First, sort by the full label set. This ensures a consistent ordering in case sorting by the - // labels provided as arguments is not conclusive. + lbls := stringSliceFromArgs(args[1:]) slices.SortFunc(vals[0].(Vector), func(a, b Sample) int { - return labels.Compare(a.Metric, b.Metric) - }) - - labels := stringSliceFromArgs(args[1:]) - // Next, sort by the labels provided as arguments. - slices.SortFunc(vals[0].(Vector), func(a, b Sample) int { - // Iterate over each given label. - for _, label := range labels { + for _, label := range lbls { lv1 := a.Metric.Get(label) lv2 := b.Metric.Get(label) - // If we encounter multiple samples with the same label values, the sorting which was - // performed in the first step will act as a "tie breaker". if lv1 == lv2 { continue } @@ -442,7 +432,8 @@ func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNode return +1 } - return 0 + // If all labels provided as arguments were equal, sort by the full label set. This ensures a consistent ordering. + return labels.Compare(a.Metric, b.Metric) }) return vals[0].(Vector), nil @@ -450,22 +441,12 @@ func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNode // === sort_by_label_desc(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) === func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - // First, sort by the full label set. This ensures a consistent ordering in case sorting by the - // labels provided as arguments is not conclusive. + lbls := stringSliceFromArgs(args[1:]) slices.SortFunc(vals[0].(Vector), func(a, b Sample) int { - return labels.Compare(b.Metric, a.Metric) - }) - - labels := stringSliceFromArgs(args[1:]) - // Next, sort by the labels provided as arguments. - slices.SortFunc(vals[0].(Vector), func(a, b Sample) int { - // Iterate over each given label. - for _, label := range labels { + for _, label := range lbls { lv1 := a.Metric.Get(label) lv2 := b.Metric.Get(label) - // If we encounter multiple samples with the same label values, the sorting which was - // performed in the first step will act as a "tie breaker". if lv1 == lv2 { continue } @@ -477,7 +458,8 @@ func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *Eval return -1 } - return 0 + // If all labels provided as arguments were equal, sort by the full label set. This ensures a consistent ordering. + return -labels.Compare(a.Metric, b.Metric) }) return vals[0].(Vector), nil diff --git a/promql/fuzz.go b/promql/fuzz.go index 57fd1166ac..759055fb0d 100644 --- a/promql/fuzz.go +++ b/promql/fuzz.go @@ -61,8 +61,8 @@ const ( var symbolTable = labels.NewSymbolTable() func fuzzParseMetricWithContentType(in []byte, contentType string) int { - p, warning := textparse.New(in, contentType, false, false, symbolTable) - if warning != nil { + p, warning := textparse.New(in, contentType, "", false, false, symbolTable) + if p == nil || warning != nil { // An invalid content type is being passed, which should not happen // in this context. panic(warning) @@ -91,7 +91,7 @@ func fuzzParseMetricWithContentType(in []byte, contentType string) int { // Note that this is not the parser for the text-based exposition-format; that // lives in github.com/prometheus/client_golang/text. func FuzzParseMetric(in []byte) int { - return fuzzParseMetricWithContentType(in, "") + return fuzzParseMetricWithContentType(in, "text/plain") } func FuzzParseOpenMetric(in []byte) int { diff --git a/scrape/scrape.go b/scrape/scrape.go index 9a0a7b0ec3..290855b3a3 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -112,9 +112,10 @@ type scrapeLoopOptions struct { trackTimestampsStaleness bool interval time.Duration timeout time.Duration - scrapeClassicHistograms bool + alwaysScrapeClassicHist bool convertClassicHistograms bool validationScheme model.ValidationScheme + fallbackScrapeProtocol string mrc []*relabel.Config cache *scrapeCache @@ -180,7 +181,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.labelLimits, opts.interval, opts.timeout, - opts.scrapeClassicHistograms, + opts.alwaysScrapeClassicHist, opts.convertClassicHistograms, options.EnableNativeHistogramsIngestion, options.EnableCreatedTimestampZeroIngestion, @@ -191,6 +192,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed metrics, options.skipOffsetting, opts.validationScheme, + opts.fallbackScrapeProtocol, ) } sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit)) @@ -327,6 +329,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) { enableCompression = sp.config.EnableCompression trackTimestampsStaleness = sp.config.TrackTimestampsStaleness mrc = sp.config.MetricRelabelConfigs + fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType() ) validationScheme := model.UTF8Validation @@ -373,6 +376,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) { interval: interval, timeout: timeout, validationScheme: validationScheme, + fallbackScrapeProtocol: fallbackScrapeProtocol, }) ) if err != nil { @@ -482,7 +486,8 @@ func (sp *scrapePool) sync(targets []*Target) { enableCompression = sp.config.EnableCompression trackTimestampsStaleness = sp.config.TrackTimestampsStaleness mrc = sp.config.MetricRelabelConfigs - scrapeClassicHistograms = sp.config.ScrapeClassicHistograms + fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType() + alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistograms convertClassicHistograms = sp.config.ConvertClassicHistograms ) @@ -524,9 +529,10 @@ func (sp *scrapePool) sync(targets []*Target) { mrc: mrc, interval: interval, timeout: timeout, - scrapeClassicHistograms: scrapeClassicHistograms, + alwaysScrapeClassicHist: alwaysScrapeClassicHist, convertClassicHistograms: convertClassicHistograms, validationScheme: validationScheme, + fallbackScrapeProtocol: fallbackScrapeProtocol, }) if err != nil { l.setForcedError(err) @@ -887,9 +893,10 @@ type scrapeLoop struct { labelLimits *labelLimits interval time.Duration timeout time.Duration - scrapeClassicHistograms bool + alwaysScrapeClassicHist bool convertClassicHistograms bool validationScheme model.ValidationScheme + fallbackScrapeProtocol string // Feature flagged options. enableNativeHistogramIngestion bool @@ -1188,7 +1195,7 @@ func newScrapeLoop(ctx context.Context, labelLimits *labelLimits, interval time.Duration, timeout time.Duration, - scrapeClassicHistograms bool, + alwaysScrapeClassicHist bool, convertClassicHistograms bool, enableNativeHistogramIngestion bool, enableCTZeroIngestion bool, @@ -1199,6 +1206,7 @@ func newScrapeLoop(ctx context.Context, metrics *scrapeMetrics, skipOffsetting bool, validationScheme model.ValidationScheme, + fallbackScrapeProtocol string, ) *scrapeLoop { if l == nil { l = promslog.NewNopLogger() @@ -1243,7 +1251,7 @@ func newScrapeLoop(ctx context.Context, labelLimits: labelLimits, interval: interval, timeout: timeout, - scrapeClassicHistograms: scrapeClassicHistograms, + alwaysScrapeClassicHist: alwaysScrapeClassicHist, convertClassicHistograms: convertClassicHistograms, enableNativeHistogramIngestion: enableNativeHistogramIngestion, enableCTZeroIngestion: enableCTZeroIngestion, @@ -1252,6 +1260,7 @@ func newScrapeLoop(ctx context.Context, metrics: metrics, skipOffsetting: skipOffsetting, validationScheme: validationScheme, + fallbackScrapeProtocol: fallbackScrapeProtocol, } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -1544,14 +1553,24 @@ type appendErrors struct { } func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { - p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.enableCTZeroIngestion, sl.symbolTable) + p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable) + if p == nil { + sl.l.Error( + "Failed to determine correct type of scrape target.", + "content_type", contentType, + "fallback_media_type", sl.fallbackScrapeProtocol, + "err", err, + ) + return + } if sl.convertClassicHistograms { - p = textparse.NewNHCBParser(p, sl.symbolTable, sl.scrapeClassicHistograms) + p = textparse.NewNHCBParser(p, sl.symbolTable, sl.alwaysScrapeClassicHist) } if err != nil { sl.l.Debug( - "Invalid content type on scrape, using prometheus parser as fallback.", + "Invalid content type on scrape, using fallback setting.", "content_type", contentType, + "fallback_media_type", sl.fallbackScrapeProtocol, "err", err, ) } diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index be53d72a26..fef4d0b7fa 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -692,6 +692,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app newTestScrapeMetrics(t), false, model.LegacyValidation, + "text/plain", ) } @@ -836,6 +837,7 @@ func TestScrapeLoopRun(t *testing.T) { scrapeMetrics, false, model.LegacyValidation, + "text/plain", ) // The loop must terminate during the initial offset if the context @@ -982,6 +984,7 @@ func TestScrapeLoopMetadata(t *testing.T) { scrapeMetrics, false, model.LegacyValidation, + "text/plain", ) defer cancel() @@ -1530,7 +1533,8 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { fakeRef := storage.SeriesRef(1) expValue := float64(1) metric := []byte(`metric{n="1"} 1`) - p, warning := textparse.New(metric, "", false, false, labels.NewSymbolTable()) + p, warning := textparse.New(metric, "text/plain", "", false, false, labels.NewSymbolTable()) + require.NotNil(t, p) require.NoError(t, warning) var lset labels.Labels @@ -1850,7 +1854,7 @@ func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) { func TestScrapeLoopAppendExemplar(t *testing.T) { tests := []struct { title string - scrapeClassicHistograms bool + alwaysScrapeClassicHist bool enableNativeHistogramsIngestion bool scrapeText string contentType string @@ -2119,7 +2123,7 @@ metric: < > `, - scrapeClassicHistograms: true, + alwaysScrapeClassicHist: true, contentType: "application/vnd.google.protobuf", floats: []floatSample{ {metric: labels.FromStrings("__name__", "test_histogram_count"), t: 1234568, f: 175}, @@ -2181,7 +2185,7 @@ metric: < sl.reportSampleMutator = func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, discoveryLabels) } - sl.scrapeClassicHistograms = test.scrapeClassicHistograms + sl.alwaysScrapeClassicHist = test.alwaysScrapeClassicHist now := time.Now() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 08417e889e..3f0fc0c841 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) require.Len(t, seriesSet, 1) gotSamples := seriesSet[series1.String()] - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) // Verify chunks querier. chunkQuerier, err := db.ChunkQuerier(minT, maxT) @@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { gotChunkSamples = append(gotChunkSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets) } var expSamples []chunks.Sample @@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T, gotSamples := seriesSet[series1.String()] require.NotNil(t, gotSamples) require.Len(t, seriesSet, 1) - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) requireEqualOOOSamples(t, oooSamples, db) }) } } func TestChunkQuerierOOOQuery(t *testing.T) { + nBucketHistogram := func(n int64) *histogram.Histogram { + h := &histogram.Histogram{ + Count: uint64(n), + Sum: float64(n), + } + if n == 0 { + h.PositiveSpans = []histogram.Span{} + h.PositiveBuckets = []int64{} + return h + } + h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}} + h.PositiveBuckets = make([]int64, n) + h.PositiveBuckets[0] = 1 + return h + } + scenarios := map[string]struct { - appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) - sampleFunc func(ts int64) chunks.Sample + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) + sampleFunc func(ts int64) chunks.Sample + checkInUseBucket bool }{ "float": { appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { @@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) { return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} }, }, + "integer histogram with recode": { + // Histograms have increasing number of buckets so their chunks are recoded. + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + n := ts / time.Minute.Milliseconds() + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + n := ts / time.Minute.Milliseconds() + return sample{t: ts, h: nBucketHistogram(n)} + }, + // Only check in-use buckets for this scenario. + // Recoding adds empty buckets. + checkInUseBucket: true, + }, } for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { - testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc) + testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket) }) } } @@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) { func testChunkQuerierOOOQuery(t *testing.T, appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error), sampleFunc func(ts int64) chunks.Sample, + checkInUseBuckets bool, ) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T, it := chunk.Chunk.Iterator(nil) smpls, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) + + // Verify that no sample is outside the chunk's time range. + for i, s := range smpls { + switch i { + case 0: + require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime) + case len(smpls) - 1: + require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime) + default: + require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime) + require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime) + } + } + gotSamples = append(gotSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + if checkInUseBuckets { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare) + } else { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) + } }) } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 671e85cd78..cc9daa97fe 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { // Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers // from being factored in to the sample comparison // TODO(fionaliao): understand counter reset behaviour, might want to modify this later - requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true) + requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets) require.NoError(t, h.Close()) } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8d1527e05c..17f551dd7d 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { } resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } @@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( it := iterable.Iterator(nil) resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } diff --git a/tsdb/querier.go b/tsdb/querier.go index 1083cbba0e..b80faf881e 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -1022,9 +1022,9 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { if newChunk != nil { if !recoded { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) + cmint = t } currentChunk = newChunk - cmint = t } cmaxt = t diff --git a/tsdb/testutil.go b/tsdb/testutil.go index ab6aab79f4..03587f4e2c 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl for name, expectedItem := range expected { actualItem, ok := actual[name] require.True(t, ok, "Expected series %s not found", name) - requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets) + requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets) } for name := range actual { _, ok := expected[name] @@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { "number of ooo appended samples mismatch") } -func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) { +type requireEqualSamplesOption int + +const ( + requireEqualSamplesNoOption requireEqualSamplesOption = iota + requireEqualSamplesIgnoreCounterResets + requireEqualSamplesInUseBucketCompare +) + +func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) { + var ( + ignoreCounterResets bool + inUseBucketCompare bool + ) + for _, option := range options { + switch option { + case requireEqualSamplesIgnoreCounterResets: + ignoreCounterResets = true + case requireEqualSamplesInUseBucketCompare: + inUseBucketCompare = true + } + } + require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name) for i, s := range expected { expectedSample := s @@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.H().Compact(0) + actualSample.H().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } case s.FH() != nil: @@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.FH().Compact(0) + actualSample.FH().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } default: