Merge branch 'main' into nhcb-scrape-impl

This commit is contained in:
György Krajcsovits 2024-10-21 11:00:41 +02:00
commit 8c1b5a6251
40 changed files with 895 additions and 297 deletions

View file

@ -2,6 +2,7 @@
## unreleased
* [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136
* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941
* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941
@ -51,12 +52,12 @@ As is traditional with a beta release, we do **not** recommend users install 3.0
* [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526
* [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643
* [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875
* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769
* [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029
## 2.55.0-rc.0 / 2024-09-20
* [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727
* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769
* [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817
* [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815
* [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734
@ -69,7 +70,7 @@ As is traditional with a beta release, we do **not** recommend users install 3.0
* [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379
* [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450
* [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477
* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655
* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655, #14985
* [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621
* [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413
* [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816

View file

@ -125,6 +125,7 @@ func TestFailedStartupExitCode(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
fakeInputFile := "fake-input-file"
expectedExitStatus := 2
@ -211,83 +212,125 @@ func TestWALSegmentSizeBounds(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
for size, expectedExitStatus := range map[string]int{"9MB": 1, "257MB": 1, "10": 2, "1GB": 1, "12MB": 0} {
prom := exec.Command(promPath, "-test.main", "--storage.tsdb.wal-segment-size="+size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data"))
for _, tc := range []struct {
size string
exitCode int
}{
{
size: "9MB",
exitCode: 1,
},
{
size: "257MB",
exitCode: 1,
},
{
size: "10",
exitCode: 2,
},
{
size: "1GB",
exitCode: 1,
},
{
size: "12MB",
exitCode: 0,
},
} {
t.Run(tc.size, func(t *testing.T) {
t.Parallel()
prom := exec.Command(promPath, "-test.main", "--storage.tsdb.wal-segment-size="+tc.size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data"))
// Log stderr in case of failure.
stderr, err := prom.StderrPipe()
require.NoError(t, err)
go func() {
slurp, _ := io.ReadAll(stderr)
t.Log(string(slurp))
}()
// Log stderr in case of failure.
stderr, err := prom.StderrPipe()
require.NoError(t, err)
go func() {
slurp, _ := io.ReadAll(stderr)
t.Log(string(slurp))
}()
err = prom.Start()
require.NoError(t, err)
err = prom.Start()
require.NoError(t, err)
if expectedExitStatus == 0 {
done := make(chan error, 1)
go func() { done <- prom.Wait() }()
select {
case err := <-done:
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
if tc.exitCode == 0 {
done := make(chan error, 1)
go func() { done <- prom.Wait() }()
select {
case err := <-done:
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
}
return
}
continue
}
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, tc.exitCode, status.ExitStatus())
})
}
}
func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
for size, expectedExitStatus := range map[string]int{"512KB": 1, "1MB": 0} {
prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data"))
for _, tc := range []struct {
size string
exitCode int
}{
{
size: "512KB",
exitCode: 1,
},
{
size: "1MB",
exitCode: 0,
},
} {
t.Run(tc.size, func(t *testing.T) {
t.Parallel()
prom := exec.Command(promPath, "-test.main", "--storage.tsdb.max-block-chunk-segment-size="+tc.size, "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig, "--storage.tsdb.path="+filepath.Join(t.TempDir(), "data"))
// Log stderr in case of failure.
stderr, err := prom.StderrPipe()
require.NoError(t, err)
go func() {
slurp, _ := io.ReadAll(stderr)
t.Log(string(slurp))
}()
// Log stderr in case of failure.
stderr, err := prom.StderrPipe()
require.NoError(t, err)
go func() {
slurp, _ := io.ReadAll(stderr)
t.Log(string(slurp))
}()
err = prom.Start()
require.NoError(t, err)
err = prom.Start()
require.NoError(t, err)
if expectedExitStatus == 0 {
done := make(chan error, 1)
go func() { done <- prom.Wait() }()
select {
case err := <-done:
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
if tc.exitCode == 0 {
done := make(chan error, 1)
go func() { done <- prom.Wait() }()
select {
case err := <-done:
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
}
return
}
continue
}
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, tc.exitCode, status.ExitStatus())
})
}
}
@ -353,6 +396,8 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames
}
func TestAgentSuccessfulStartup(t *testing.T) {
t.Parallel()
prom := exec.Command(promPath, "-test.main", "--agent", "--web.listen-address=0.0.0.0:0", "--config.file="+agentConfig)
require.NoError(t, prom.Start())
@ -371,6 +416,8 @@ func TestAgentSuccessfulStartup(t *testing.T) {
}
func TestAgentFailedStartupWithServerFlag(t *testing.T) {
t.Parallel()
prom := exec.Command(promPath, "-test.main", "--agent", "--storage.tsdb.path=.", "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig)
output := bytes.Buffer{}
@ -398,6 +445,8 @@ func TestAgentFailedStartupWithServerFlag(t *testing.T) {
}
func TestAgentFailedStartupWithInvalidConfig(t *testing.T) {
t.Parallel()
prom := exec.Command(promPath, "-test.main", "--agent", "--web.listen-address=0.0.0.0:0", "--config.file="+promConfig)
require.NoError(t, prom.Start())
@ -419,6 +468,7 @@ func TestModeSpecificFlags(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
testcases := []struct {
mode string
@ -433,6 +483,7 @@ func TestModeSpecificFlags(t *testing.T) {
for _, tc := range testcases {
t.Run(fmt.Sprintf("%s mode with option %s", tc.mode, tc.arg), func(t *testing.T) {
t.Parallel()
args := []string{"-test.main", tc.arg, t.TempDir(), "--web.listen-address=0.0.0.0:0"}
if tc.mode == "agent" {
@ -484,6 +535,8 @@ func TestDocumentation(t *testing.T) {
if runtime.GOOS == "windows" {
t.SkipNow()
}
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@ -508,6 +561,8 @@ func TestDocumentation(t *testing.T) {
}
func TestRwProtoMsgFlagParser(t *testing.T) {
t.Parallel()
defaultOpts := config.RemoteWriteProtoMsgs{
config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2,
}

View file

@ -34,6 +34,7 @@ func TestStartupInterrupt(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))

View file

@ -456,6 +456,7 @@ func TestQueryLog(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
t.Parallel()
cwd, err := os.Getwd()
require.NoError(t, err)
@ -474,6 +475,7 @@ func TestQueryLog(t *testing.T) {
}
t.Run(p.String(), func(t *testing.T) {
t.Parallel()
p.run(t)
})
}

View file

@ -6,7 +6,7 @@ scrape_configs:
alerting:
alertmanagers:
- scheme: http
api_version: v1
api_version: v2
file_sd_configs:
- files:
- nonexistent_file.yml

View file

@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"log/slog"
"mime"
"net/url"
"os"
"path/filepath"
@ -163,13 +164,13 @@ var (
// DefaultScrapeConfig is the default scrape configuration.
DefaultScrapeConfig = ScrapeConfig{
// ScrapeTimeout, ScrapeInterval and ScrapeProtocols default to the configured globals.
ScrapeClassicHistograms: false,
MetricsPath: "/metrics",
Scheme: "http",
HonorLabels: false,
HonorTimestamps: true,
HTTPClientConfig: config.DefaultHTTPClientConfig,
EnableCompression: true,
AlwaysScrapeClassicHistograms: false,
MetricsPath: "/metrics",
Scheme: "http",
HonorLabels: false,
HonorTimestamps: true,
HTTPClientConfig: config.DefaultHTTPClientConfig,
EnableCompression: true,
}
// DefaultAlertmanagerConfig is the default alertmanager configuration.
@ -473,9 +474,22 @@ func (s ScrapeProtocol) Validate() error {
return nil
}
// HeaderMediaType returns the MIME mediaType for a particular ScrapeProtocol.
func (s ScrapeProtocol) HeaderMediaType() string {
if _, ok := ScrapeProtocolsHeaders[s]; !ok {
return ""
}
mediaType, _, err := mime.ParseMediaType(ScrapeProtocolsHeaders[s])
if err != nil {
return ""
}
return mediaType
}
var (
PrometheusProto ScrapeProtocol = "PrometheusProto"
PrometheusText0_0_4 ScrapeProtocol = "PrometheusText0.0.4"
PrometheusText1_0_0 ScrapeProtocol = "PrometheusText1.0.0"
OpenMetricsText0_0_1 ScrapeProtocol = "OpenMetricsText0.0.1"
OpenMetricsText1_0_0 ScrapeProtocol = "OpenMetricsText1.0.0"
UTF8NamesHeader string = model.EscapingKey + "=" + model.AllowUTF8
@ -483,6 +497,7 @@ var (
ScrapeProtocolsHeaders = map[ScrapeProtocol]string{
PrometheusProto: "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited",
PrometheusText0_0_4: "text/plain;version=0.0.4",
PrometheusText1_0_0: "text/plain;version=1.0.0;escaping=allow-utf-8",
OpenMetricsText0_0_1: "application/openmetrics-text;version=0.0.1",
OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0",
}
@ -492,6 +507,7 @@ var (
DefaultScrapeProtocols = []ScrapeProtocol{
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,
PrometheusText1_0_0,
PrometheusText0_0_4,
}
@ -503,6 +519,7 @@ var (
PrometheusProto,
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,
PrometheusText1_0_0,
PrometheusText0_0_4,
}
)
@ -629,10 +646,15 @@ type ScrapeConfig struct {
// The protocols to negotiate during a scrape. It tells clients what
// protocol are accepted by Prometheus and with what preference (most wanted is first).
// Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
// OpenMetricsText1.0.0, PrometheusText0.0.4.
// OpenMetricsText1.0.0, PrometheusText1.0.0, PrometheusText0.0.4.
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// Whether to scrape a classic histogram that is also exposed as a native histogram.
ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"`
// The fallback protocol to use if the Content-Type provided by the target
// is not provided, blank, or not one of the expected values.
// Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
// OpenMetricsText1.0.0, PrometheusText1.0.0, PrometheusText0.0.4.
ScrapeFallbackProtocol ScrapeProtocol `yaml:"fallback_scrape_protocol,omitempty"`
// Whether to scrape a classic histogram, even if it is also exposed as a native histogram.
AlwaysScrapeClassicHistograms bool `yaml:"always_scrape_classic_histograms,omitempty"`
// Whether to convert all scraped classic histograms into a native histogram with custom buckets.
ConvertClassicHistograms bool `yaml:"convert_classic_histograms,omitempty"`
// File to which scrape failures are logged.
@ -782,6 +804,12 @@ func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error {
return fmt.Errorf("%w for scrape config with job name %q", err, c.JobName)
}
if c.ScrapeFallbackProtocol != "" {
if err := c.ScrapeFallbackProtocol.Validate(); err != nil {
return fmt.Errorf("invalid fallback_scrape_protocol for scrape config with job name %q: %w", c.JobName, err)
}
}
switch globalConfig.MetricNameValidationScheme {
case LegacyValidationConfig:
case "", UTF8ValidationConfig:
@ -957,6 +985,7 @@ func (a AlertmanagerConfigs) ToMap() map[string]*AlertmanagerConfig {
// AlertmanagerAPIVersion represents a version of the
// github.com/prometheus/alertmanager/api, e.g. 'v1' or 'v2'.
// 'v1' is no longer supported.
type AlertmanagerAPIVersion string
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -986,7 +1015,7 @@ const (
)
var SupportedAlertmanagerAPIVersions = []AlertmanagerAPIVersion{
AlertmanagerAPIVersionV1, AlertmanagerAPIVersionV2,
AlertmanagerAPIVersionV2,
}
// AlertmanagerConfig configures how Alertmanagers can be discovered and communicated with.

View file

@ -206,19 +206,20 @@ var expectedConf = &Config{
{
JobName: "prometheus",
HonorLabels: true,
HonorTimestamps: true,
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
EnableCompression: true,
BodySizeLimit: globBodySizeLimit,
SampleLimit: globSampleLimit,
TargetLimit: globTargetLimit,
LabelLimit: globLabelLimit,
LabelNameLengthLimit: globLabelNameLengthLimit,
LabelValueLengthLimit: globLabelValueLengthLimit,
ScrapeProtocols: DefaultGlobalConfig.ScrapeProtocols,
ScrapeFailureLogFile: "testdata/fail_prom.log",
HonorLabels: true,
HonorTimestamps: true,
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
EnableCompression: true,
BodySizeLimit: globBodySizeLimit,
SampleLimit: globSampleLimit,
TargetLimit: globTargetLimit,
LabelLimit: globLabelLimit,
LabelNameLengthLimit: globLabelNameLengthLimit,
LabelValueLengthLimit: globLabelValueLengthLimit,
ScrapeProtocols: DefaultGlobalConfig.ScrapeProtocols,
ScrapeFallbackProtocol: PrometheusText0_0_4,
ScrapeFailureLogFile: "testdata/fail_prom.log",
MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme,
@ -1500,6 +1501,11 @@ var expectedConf = &Config{
},
}
func TestYAMLNotLongerSupportedAMApi(t *testing.T) {
_, err := LoadFile("testdata/config_with_no_longer_supported_am_api_config.yml", false, promslog.NewNopLogger())
require.Error(t, err)
}
func TestYAMLRoundtrip(t *testing.T) {
want, err := LoadFile("testdata/roundtrip.good.yml", false, promslog.NewNopLogger())
require.NoError(t, err)
@ -2081,12 +2087,20 @@ var expectedErrors = []struct {
},
{
filename: "scrape_config_files_scrape_protocols.bad.yml",
errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols.bad.yml: scrape_protocols: unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4] for scrape config with job name "node"`,
errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols.bad.yml: scrape_protocols: unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4 PrometheusText1.0.0] for scrape config with job name "node"`,
},
{
filename: "scrape_config_files_scrape_protocols2.bad.yml",
errMsg: `parsing YAML file testdata/scrape_config_files_scrape_protocols2.bad.yml: duplicated protocol in scrape_protocols, got [OpenMetricsText1.0.0 PrometheusProto OpenMetricsText1.0.0] for scrape config with job name "node"`,
},
{
filename: "scrape_config_files_fallback_scrape_protocol1.bad.yml",
errMsg: `parsing YAML file testdata/scrape_config_files_fallback_scrape_protocol1.bad.yml: invalid fallback_scrape_protocol for scrape config with job name "node": unknown scrape protocol prometheusproto, supported: [OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4 PrometheusText1.0.0]`,
},
{
filename: "scrape_config_files_fallback_scrape_protocol2.bad.yml",
errMsg: `unmarshal errors`,
},
}
func TestBadConfigs(t *testing.T) {
@ -2407,3 +2421,54 @@ func TestScrapeConfigNameValidationSettings(t *testing.T) {
})
}
}
func TestScrapeProtocolHeader(t *testing.T) {
tests := []struct {
name string
proto ScrapeProtocol
expectedValue string
}{
{
name: "blank",
proto: ScrapeProtocol(""),
expectedValue: "",
},
{
name: "invalid",
proto: ScrapeProtocol("invalid"),
expectedValue: "",
},
{
name: "prometheus protobuf",
proto: PrometheusProto,
expectedValue: "application/vnd.google.protobuf",
},
{
name: "prometheus text 0.0.4",
proto: PrometheusText0_0_4,
expectedValue: "text/plain",
},
{
name: "prometheus text 1.0.0",
proto: PrometheusText1_0_0,
expectedValue: "text/plain",
},
{
name: "openmetrics 0.0.1",
proto: OpenMetricsText0_0_1,
expectedValue: "application/openmetrics-text",
},
{
name: "openmetrics 1.0.0",
proto: OpenMetricsText1_0_0,
expectedValue: "application/openmetrics-text",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mediaType := tc.proto.HeaderMediaType()
require.Equal(t, tc.expectedValue, mediaType)
})
}
}

View file

@ -74,6 +74,8 @@ scrape_configs:
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
fallback_scrape_protocol: PrometheusText0.0.4
scrape_failure_log_file: fail_prom.log
file_sd_configs:
- files:

View file

@ -0,0 +1,7 @@
alerting:
alertmanagers:
- scheme: http
api_version: v1
file_sd_configs:
- files:
- nonexistent_file.yml

View file

@ -0,0 +1,5 @@
scrape_configs:
- job_name: node
fallback_scrape_protocol: "prometheusproto"
static_configs:
- targets: ['localhost:8080']

View file

@ -0,0 +1,5 @@
scrape_configs:
- job_name: node
fallback_scrape_protocol: ["OpenMetricsText1.0.0", "PrometheusText0.0.4"]
static_configs:
- targets: ['localhost:8080']

View file

@ -167,8 +167,11 @@ func NewEndpoints(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, node
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
nodeName, err := nodeName(o)
if err != nil {
l.Error("Error getting Node name", "err", err)
}
e.enqueueNode(nodeName)
},
})
if err != nil {

View file

@ -145,8 +145,11 @@ func NewEndpointSlice(l *slog.Logger, eps cache.SharedIndexInformer, svc, pod, n
e.enqueueNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
e.enqueueNode(node.Name)
nodeName, err := nodeName(o)
if err != nil {
l.Error("Error getting Node name", "err", err)
}
e.enqueueNode(nodeName)
},
})
if err != nil {

View file

@ -804,3 +804,13 @@ func addObjectMetaLabels(labelSet model.LabelSet, objectMeta metav1.ObjectMeta,
func namespacedName(namespace, name string) string {
return namespace + "/" + name
}
// nodeName knows how to handle the cache.DeletedFinalStateUnknown tombstone.
// It assumes the MetaNamespaceKeyFunc keyFunc is used, which uses the node name as the tombstone key.
func nodeName(o interface{}) (string, error) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(o)
if err != nil {
return "", err
}
return key, nil
}

View file

@ -23,7 +23,9 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
apiv1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
@ -320,3 +322,18 @@ func TestFailuresCountMetric(t *testing.T) {
})
}
}
func TestNodeName(t *testing.T) {
node := &apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
}
name, err := nodeName(node)
require.NoError(t, err)
require.Equal(t, "foo", name)
name, err = nodeName(cache.DeletedFinalStateUnknown{Key: "bar"})
require.NoError(t, err)
require.Equal(t, "bar", name)
}

View file

@ -82,7 +82,7 @@ func NewNode(l *slog.Logger, inf cache.SharedInformer, eventCount *prometheus.Co
}
func (n *Node) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
key, err := nodeName(obj)
if err != nil {
return
}

View file

@ -95,8 +95,11 @@ func NewPod(l *slog.Logger, pods cache.SharedIndexInformer, nodes cache.SharedIn
p.enqueuePodsForNode(node.Name)
},
DeleteFunc: func(o interface{}) {
node := o.(*apiv1.Node)
p.enqueuePodsForNode(node.Name)
nodeName, err := nodeName(o)
if err != nil {
l.Error("Error getting Node name", "err", err)
}
p.enqueuePodsForNode(nodeName)
},
})
if err != nil {

View file

@ -212,12 +212,18 @@ job_name: <job_name>
# The protocols to negotiate during a scrape with the client.
# Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
# OpenMetricsText1.0.0, PrometheusText0.0.4.
# OpenMetricsText1.0.0, PrometheusText0.0.4, PrometheusText1.0.0.
[ scrape_protocols: [<string>, ...] | default = <global_config.scrape_protocols> ]
# Whether to scrape a classic histogram that is also exposed as a native
# Fallback protocol to use if a scrape returns blank, unparseable, or otherwise
# invalid Content-Type.
# Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
# OpenMetricsText1.0.0, PrometheusText0.0.4, PrometheusText1.0.0.
[ fallback_scrape_protocol: <string> ]
# Whether to scrape a classic histogram, even if it is also exposed as a native
# histogram (has no effect without --enable-feature=native-histograms).
[ scrape_classic_histograms: <boolean> | default = false ]
[ always_scrape_classic_histograms: <boolean> | default = false ]
# The HTTP resource path on which to fetch metrics from targets.
[ metrics_path: <path> | default = /metrics ]

View file

@ -84,59 +84,7 @@ those classic histograms that do not come with a corresponding native
histogram. However, if a native histogram is present, Prometheus will ignore
the corresponding classic histogram, with the notable exception of exemplars,
which are always ingested. To keep the classic histograms as well, enable
`scrape_classic_histograms` in the scrape job.
_Note about the format of `le` and `quantile` label values:_
In certain situations, the protobuf parsing changes the number formatting of
the `le` labels of classic histograms and the `quantile` labels of
summaries. Typically, this happens if the scraped target is instrumented with
[client_golang](https://github.com/prometheus/client_golang) provided that
[promhttp.HandlerOpts.EnableOpenMetrics](https://pkg.go.dev/github.com/prometheus/client_golang/prometheus/promhttp#HandlerOpts)
is set to `false`. In such a case, integer label values are represented in the
text format as such, e.g. `quantile="1"` or `le="2"`. However, the protobuf parsing
changes the representation to float-like (following the OpenMetrics
specification), so the examples above become `quantile="1.0"` and `le="2.0"` after
ingestion into Prometheus, which changes the identity of the metric compared to
what was ingested before via the text format.
The effect of this change is that alerts, recording rules and dashboards that
directly reference label values as whole numbers such as `le="1"` will stop
working.
Aggregation by the `le` and `quantile` labels for vectors that contain the old and
new formatting will lead to unexpected results, and range vectors that span the
transition between the different formatting will contain additional series.
The most common use case for both is the quantile calculation via
`histogram_quantile`, e.g.
`histogram_quantile(0.95, sum by (le) (rate(histogram_bucket[10m])))`.
The `histogram_quantile` function already tries to mitigate the effects to some
extent, but there will be inaccuracies, in particular for shorter ranges that
cover only a few samples.
Ways to deal with this change either globally or on a per metric basis:
- Fix references to integer `le`, `quantile` label values, but otherwise do
nothing and accept that some queries that span the transition time will produce
inaccurate or unexpected results.
_This is the recommended solution, to get consistently normalized label values._
Also Prometheus 3.0 is expected to enforce normalization of these label values.
- Use `metric_relabel_config` to retain the old labels when scraping targets.
This should **only** be applied to metrics that currently produce such labels.
<!-- The following config snippet is unit tested in scrape/scrape_test.go. -->
```yaml
metric_relabel_configs:
- source_labels:
- quantile
target_label: quantile
regex: (\d+)\.0+
- source_labels:
- le
- __name__
target_label: le
regex: (\d+)\.0+;.*_bucket
```
`always_scrape_classic_histograms` in the scrape job.
## Experimental PromQL functions

View file

@ -14,6 +14,8 @@
package textparse
import (
"errors"
"fmt"
"mime"
"github.com/prometheus/common/model"
@ -23,8 +25,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
)
// Parser parses samples from a byte slice of samples in the official
// Prometheus and OpenMetrics text exposition formats.
// Parser parses samples from a byte slice of samples in different exposition formats.
type Parser interface {
// Series returns the bytes of a series with a simple float64 as a
// value, the timestamp if set, and the value of the current sample.
@ -58,6 +59,8 @@ type Parser interface {
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
// The values of the "le" labels of classic histograms and "quantile" labels
// of summaries should follow the OpenMetrics formatting rules.
Metric(l *labels.Labels) string
// Exemplar writes the exemplar of the current sample into the passed
@ -78,28 +81,65 @@ type Parser interface {
Next() (Entry, error)
}
// New returns a new parser of the byte slice.
//
// This function always returns a valid parser, but might additionally
// return an error if the content type cannot be parsed.
func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) {
// extractMediaType returns the mediaType of a required parser. It tries first to
// extract a valid and supported mediaType from contentType. If that fails,
// the provided fallbackType (possibly an empty string) is returned, together with
// an error. fallbackType is used as-is without further validation.
func extractMediaType(contentType, fallbackType string) (string, error) {
if contentType == "" {
return NewPromParser(b, st), nil
if fallbackType == "" {
return "", errors.New("non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target")
}
return fallbackType, fmt.Errorf("non-compliant scrape target sending blank Content-Type, using fallback_scrape_protocol %q", fallbackType)
}
// We have a contentType, parse it.
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return NewPromParser(b, st), err
if fallbackType == "" {
retErr := fmt.Errorf("cannot parse Content-Type %q and no fallback_scrape_protocol for target", contentType)
return "", errors.Join(retErr, err)
}
retErr := fmt.Errorf("could not parse received Content-Type %q, using fallback_scrape_protocol %q", contentType, fallbackType)
return fallbackType, errors.Join(retErr, err)
}
// We have a valid media type, either we recognise it and can use it
// or we have to error.
switch mediaType {
case "application/openmetrics-text", "application/vnd.google.protobuf", "text/plain":
return mediaType, nil
}
// We're here because we have no recognised mediaType.
if fallbackType == "" {
return "", fmt.Errorf("received unsupported Content-Type %q and no fallback_scrape_protocol specified for target", contentType)
}
return fallbackType, fmt.Errorf("received unsupported Content-Type %q, using fallback_scrape_protocol %q", contentType, fallbackType)
}
// New returns a new parser of the byte slice.
//
// This function no longer guarantees to return a valid parser.
//
// It only returns a valid parser if the supplied contentType and fallbackType allow.
// An error may also be returned if fallbackType had to be used or there was some
// other error parsing the supplied Content-Type.
// If the returned parser is nil then the scrape must fail.
func New(b []byte, contentType, fallbackType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) {
mediaType, err := extractMediaType(contentType, fallbackType)
// err may be nil or something we want to warn about.
switch mediaType {
case "application/openmetrics-text":
return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) {
o.SkipCTSeries = skipOMCTSeries
}), nil
}), err
case "application/vnd.google.protobuf":
return NewProtobufParser(b, parseClassicHistograms, st), nil
return NewProtobufParser(b, parseClassicHistograms, st), err
case "text/plain":
return NewPromParser(b, st), err
default:
return NewPromParser(b, st), nil
return nil, err
}
}

View file

@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -31,6 +32,10 @@ import (
func TestNewParser(t *testing.T) {
t.Parallel()
requireNilParser := func(t *testing.T, p Parser) {
require.Nil(t, p)
}
requirePromParser := func(t *testing.T, p Parser) {
require.NotNil(t, p)
_, ok := p.(*PromParser)
@ -43,34 +48,83 @@ func TestNewParser(t *testing.T) {
require.True(t, ok)
}
requireProtobufParser := func(t *testing.T, p Parser) {
require.NotNil(t, p)
_, ok := p.(*ProtobufParser)
require.True(t, ok)
}
for name, tt := range map[string]*struct {
contentType string
validateParser func(*testing.T, Parser)
err string
contentType string
fallbackScrapeProtocol config.ScrapeProtocol
validateParser func(*testing.T, Parser)
err string
}{
"empty-string": {
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "non-compliant scrape target sending blank Content-Type and no fallback_scrape_protocol specified for target",
},
"empty-string-fallback-text-plain": {
validateParser: requirePromParser,
fallbackScrapeProtocol: config.PrometheusText0_0_4,
err: "non-compliant scrape target sending blank Content-Type, using fallback_scrape_protocol \"text/plain\"",
},
"invalid-content-type-1": {
contentType: "invalid/",
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "expected token after slash",
},
"invalid-content-type-1-fallback-text-plain": {
contentType: "invalid/",
validateParser: requirePromParser,
fallbackScrapeProtocol: config.PrometheusText0_0_4,
err: "expected token after slash",
},
"invalid-content-type-1-fallback-openmetrics": {
contentType: "invalid/",
validateParser: requireOpenMetricsParser,
fallbackScrapeProtocol: config.OpenMetricsText0_0_1,
err: "expected token after slash",
},
"invalid-content-type-1-fallback-protobuf": {
contentType: "invalid/",
validateParser: requireProtobufParser,
fallbackScrapeProtocol: config.PrometheusProto,
err: "expected token after slash",
},
"invalid-content-type-2": {
contentType: "invalid/invalid/invalid",
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "unexpected content after media subtype",
},
"invalid-content-type-2-fallback-text-plain": {
contentType: "invalid/invalid/invalid",
validateParser: requirePromParser,
fallbackScrapeProtocol: config.PrometheusText1_0_0,
err: "unexpected content after media subtype",
},
"invalid-content-type-3": {
contentType: "/",
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "no media type",
},
"invalid-content-type-3-fallback-text-plain": {
contentType: "/",
validateParser: requirePromParser,
fallbackScrapeProtocol: config.PrometheusText1_0_0,
err: "no media type",
},
"invalid-content-type-4": {
contentType: "application/openmetrics-text; charset=UTF-8; charset=utf-8",
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "duplicate parameter name",
},
"invalid-content-type-4-fallback-open-metrics": {
contentType: "application/openmetrics-text; charset=UTF-8; charset=utf-8",
validateParser: requireOpenMetricsParser,
fallbackScrapeProtocol: config.OpenMetricsText1_0_0,
err: "duplicate parameter name",
},
"openmetrics": {
contentType: "application/openmetrics-text",
validateParser: requireOpenMetricsParser,
@ -87,20 +141,33 @@ func TestNewParser(t *testing.T) {
contentType: "text/plain",
validateParser: requirePromParser,
},
"protobuf": {
contentType: "application/vnd.google.protobuf",
validateParser: requireProtobufParser,
},
"plain-text-with-version": {
contentType: "text/plain; version=0.0.4",
validateParser: requirePromParser,
},
"some-other-valid-content-type": {
contentType: "text/html",
validateParser: requirePromParser,
validateParser: requireNilParser,
err: "received unsupported Content-Type \"text/html\" and no fallback_scrape_protocol specified for target",
},
"some-other-valid-content-type-fallback-text-plain": {
contentType: "text/html",
validateParser: requirePromParser,
fallbackScrapeProtocol: config.PrometheusText0_0_4,
err: "received unsupported Content-Type \"text/html\", using fallback_scrape_protocol \"text/plain\"",
},
} {
t.Run(name, func(t *testing.T) {
tt := tt // Copy to local variable before going parallel.
t.Parallel()
p, err := New([]byte{}, tt.contentType, false, false, labels.NewSymbolTable())
fallbackProtoMediaType := tt.fallbackScrapeProtocol.HeaderMediaType()
p, err := New([]byte{}, tt.contentType, fallbackProtoMediaType, false, false, labels.NewSymbolTable())
tt.validateParser(t, p)
if tt.err == "" {
require.NoError(t, err)

View file

@ -36,9 +36,9 @@ import (
// single native histogram.
//
// Note:
// - Only series that have the histogram metadata type are considered for
// conversion.
// - The classic series are also returned if keepClassicHistograms is true.
// - Only series that have the histogram metadata type are considered for
// conversion.
// - The classic series are also returned if keepClassicHistograms is true.
type NHCBParser struct {
// The parser we're wrapping.
parser Parser

View file

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math"
"strconv"
"strings"
"unicode/utf8"
@ -210,7 +211,7 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
label := unreplace(s[a:b])
c := p.offsets[i+2] - p.start
d := p.offsets[i+3] - p.start
value := unreplace(s[c:d])
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))
p.builder.Add(label, value)
}
@ -723,3 +724,15 @@ func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error
}
return val, nil
}
// normalizeFloatsInLabelValues ensures that values of the "le" labels of classic histograms and "quantile" labels
// of summaries follow OpenMetrics formatting rules.
func normalizeFloatsInLabelValues(t model.MetricType, l, v string) string {
if (t == model.MetricTypeSummary && l == model.QuantileLabel) || (t == model.MetricTypeHistogram && l == model.BucketLabel) {
f, err := strconv.ParseFloat(v, 64)
if err == nil {
return formatOpenMetricsFloat(f)
}
}
return v
}

View file

@ -74,6 +74,7 @@ foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5
foo_created{a="b"} 1520872607.123
foo_total{le="c"} 21.0
foo_created{le="c"} 1520872621.123
foo_total{le="1"} 10.0
# HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far
# TYPE bar summary
bar_count 17.0
@ -97,6 +98,7 @@ something_count 18
something_sum 324789.4
something_created 1520430001
something_bucket{le="0.0"} 1
something_bucket{le="1"} 2
something_bucket{le="+Inf"} 18
# HELP yum Summary with _created between sum and quantiles
# TYPE yum summary
@ -130,7 +132,7 @@ foobar{quantile="0.99"} 150.1`
}, {
m: `go_gc_duration_seconds{quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.0"),
}, {
m: `go_gc_duration_seconds{quantile="0.25"}`,
v: 7.424100000000001e-05,
@ -302,6 +304,10 @@ foobar{quantile="0.99"} 150.1`
v: 21.0,
lset: labels.FromStrings("__name__", "foo_total", "le", "c"),
ct: int64p(1520872621123),
}, {
m: `foo_total{le="1"}`,
v: 10.0,
lset: labels.FromStrings("__name__", "foo_total", "le", "1"),
}, {
m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
@ -385,6 +391,11 @@ foobar{quantile="0.99"} 150.1`
v: 1,
lset: labels.FromStrings("__name__", "something_bucket", "le", "0.0"),
ct: int64p(1520430001000),
}, {
m: `something_bucket{le="1"}`,
v: 2,
lset: labels.FromStrings("__name__", "something_bucket", "le", "1.0"),
ct: int64p(1520430001000),
}, {
m: `something_bucket{le="+Inf"}`,
v: 18,
@ -492,7 +503,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
}, {
m: `{"go.gc_duration_seconds",quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"),
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.0"),
ct: int64p(1520872607123),
}, {
m: `{"go.gc_duration_seconds",quantile="0.25"}`,

View file

@ -239,7 +239,8 @@ func (p *PromParser) Metric(l *labels.Labels) string {
label := unreplace(s[a:b])
c := p.offsets[i+2] - p.start
d := p.offsets[i+3] - p.start
value := unreplace(s[c:d])
value := normalizeFloatsInLabelValues(p.mtype, label, unreplace(s[c:d]))
p.builder.Add(label, value)
}

View file

@ -31,6 +31,13 @@ go_gc_duration_seconds{quantile="0.25",} 7.424100000000001e-05
go_gc_duration_seconds{quantile="0.5",a="b"} 8.3835e-05
go_gc_duration_seconds{quantile="0.8", a="b"} 8.3835e-05
go_gc_duration_seconds{ quantile="0.9", a="b"} 8.3835e-05
# HELP prometheus_http_request_duration_seconds Histogram of latencies for HTTP requests.
# TYPE prometheus_http_request_duration_seconds histogram
prometheus_http_request_duration_seconds_bucket{handler="/",le="1"} 423
prometheus_http_request_duration_seconds_bucket{handler="/",le="2"} 1423
prometheus_http_request_duration_seconds_bucket{handler="/",le="+Inf"} 1423
prometheus_http_request_duration_seconds_sum{handler="/"} 2000
prometheus_http_request_duration_seconds_count{handler="/"} 1423
# Hrandom comment starting with prefix of HELP
#
wind_speed{A="2",c="3"} 12345
@ -50,7 +57,8 @@ some:aggregate:rate5m{a_b="c"} 1
go_goroutines 33 123123
_metric_starting_with_underscore 1
testmetric{_label_starting_with_underscore="foo"} 1
testmetric{label="\"bar\""} 1`
testmetric{label="\"bar\""} 1
testmetric{le="10"} 1`
input += "\n# HELP metric foo\x00bar"
input += "\nnull_byte_metric{a=\"abc\x00\"} 1"
@ -64,7 +72,7 @@ testmetric{label="\"bar\""} 1`
}, {
m: `go_gc_duration_seconds{quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.0"),
}, {
m: `go_gc_duration_seconds{quantile="0.25",}`,
v: 7.424100000000001e-05,
@ -81,6 +89,32 @@ testmetric{label="\"bar\""} 1`
m: `go_gc_duration_seconds{ quantile="0.9", a="b"}`,
v: 8.3835e-05,
lset: labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.9", "a", "b"),
}, {
m: "prometheus_http_request_duration_seconds",
help: "Histogram of latencies for HTTP requests.",
}, {
m: "prometheus_http_request_duration_seconds",
typ: model.MetricTypeHistogram,
}, {
m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="1"}`,
v: 423,
lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "1.0"),
}, {
m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="2"}`,
v: 1423,
lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "2.0"),
}, {
m: `prometheus_http_request_duration_seconds_bucket{handler="/",le="+Inf"}`,
v: 1423,
lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_bucket", "handler", "/", "le", "+Inf"),
}, {
m: `prometheus_http_request_duration_seconds_sum{handler="/"}`,
v: 2000,
lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_sum", "handler", "/"),
}, {
m: `prometheus_http_request_duration_seconds_count{handler="/"}`,
v: 1423,
lset: labels.FromStrings("__name__", "prometheus_http_request_duration_seconds_count", "handler", "/"),
}, {
comment: "# Hrandom comment starting with prefix of HELP",
}, {
@ -151,6 +185,10 @@ testmetric{label="\"bar\""} 1`
m: "testmetric{label=\"\\\"bar\\\"\"}",
v: 1,
lset: labels.FromStrings("__name__", "testmetric", "label", `"bar"`),
}, {
m: `testmetric{le="10"}`,
v: 1,
lset: labels.FromStrings("__name__", "testmetric", "le", "10"),
}, {
m: "metric",
help: "foo\x00bar",
@ -197,7 +235,7 @@ func TestUTF8PromParse(t *testing.T) {
}, {
m: `{"go.gc_duration_seconds",quantile="0"}`,
v: 4.9351e-05,
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"),
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.0"),
}, {
m: `{"go.gc_duration_seconds",quantile="0.25",}`,
v: 7.424100000000001e-05,

View file

@ -20,7 +20,9 @@ import (
"fmt"
"io"
"math"
"strconv"
"strings"
"sync"
"unicode/utf8"
"github.com/gogo/protobuf/proto"
@ -34,6 +36,15 @@ import (
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
)
// floatFormatBufPool is exclusively used in formatOpenMetricsFloat.
var floatFormatBufPool = sync.Pool{
New: func() interface{} {
// To contain at most 17 digits and additional syntax for a float64.
b := make([]byte, 0, 24)
return &b
},
}
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
// protobuf format and then present it as it if were parsed by a
// Prometheus-2-style text parser. This is only done so that we can easily plug
@ -629,11 +640,15 @@ func formatOpenMetricsFloat(f float64) string {
case math.IsInf(f, -1):
return "-Inf"
}
s := fmt.Sprint(f)
if strings.ContainsAny(s, "e.") {
return s
bp := floatFormatBufPool.Get().(*[]byte)
defer floatFormatBufPool.Put(bp)
*bp = strconv.AppendFloat((*bp)[:0], f, 'g', -1, 64)
if bytes.ContainsAny(*bp, "e.") {
return string(*bp)
}
return s + ".0"
*bp = append(*bp, '.', '0')
return string(*bp)
}
// isNativeHistogram returns false iff the provided histograms has no spans at

View file

@ -409,6 +409,49 @@ metric: <
>
>
`,
`name: "test_histogram3"
help: "Similar histogram as before but now with integer buckets."
type: HISTOGRAM
metric: <
histogram: <
sample_count: 6
sample_sum: 50
bucket: <
cumulative_count: 2
upper_bound: -20
>
bucket: <
cumulative_count: 4
upper_bound: 20
exemplar: <
label: <
name: "dummyID"
value: "59727"
>
value: 15
timestamp: <
seconds: 1625851153
nanos: 146848499
>
>
>
bucket: <
cumulative_count: 6
upper_bound: 30
exemplar: <
label: <
name: "dummyID"
value: "5617"
>
value: 25
>
>
schema: 0
zero_threshold: 0
>
>
`,
`name: "test_histogram_family"
help: "Test histogram metric family with two very simple histograms."
@ -1050,6 +1093,66 @@ func TestProtobufParse(t *testing.T) {
"le", "+Inf",
),
},
{
m: "test_histogram3",
help: "Similar histogram as before but now with integer buckets.",
},
{
m: "test_histogram3",
typ: model.MetricTypeHistogram,
},
{
m: "test_histogram3_count",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_count",
),
},
{
m: "test_histogram3_sum",
v: 50,
lset: labels.FromStrings(
"__name__", "test_histogram3_sum",
),
},
{
m: "test_histogram3_bucket\xffle\xff-20.0",
v: 2,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "-20.0",
),
},
{
m: "test_histogram3_bucket\xffle\xff20.0",
v: 4,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "20.0",
),
es: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: 15, HasTs: true, Ts: 1625851153146},
},
},
{
m: "test_histogram3_bucket\xffle\xff30.0",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "30.0",
),
es: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "5617"), Value: 25, HasTs: false},
},
},
{
m: "test_histogram3_bucket\xffle\xff+Inf",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "+Inf",
),
},
{
m: "test_histogram_family",
help: "Test histogram metric family with two very simple histograms.",
@ -1857,6 +1960,66 @@ func TestProtobufParse(t *testing.T) {
"le", "+Inf",
),
},
{
m: "test_histogram3",
help: "Similar histogram as before but now with integer buckets.",
},
{
m: "test_histogram3",
typ: model.MetricTypeHistogram,
},
{
m: "test_histogram3_count",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_count",
),
},
{
m: "test_histogram3_sum",
v: 50,
lset: labels.FromStrings(
"__name__", "test_histogram3_sum",
),
},
{
m: "test_histogram3_bucket\xffle\xff-20.0",
v: 2,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "-20.0",
),
},
{
m: "test_histogram3_bucket\xffle\xff20.0",
v: 4,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "20.0",
),
es: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: 15, HasTs: true, Ts: 1625851153146},
},
},
{
m: "test_histogram3_bucket\xffle\xff30.0",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "30.0",
),
es: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "5617"), Value: 25, HasTs: false},
},
},
{
m: "test_histogram3_bucket\xffle\xff+Inf",
v: 6,
lset: labels.FromStrings(
"__name__", "test_histogram3_bucket",
"le", "+Inf",
),
},
{
m: "test_histogram_family",
help: "Test histogram metric family with two very simple histograms.",

View file

@ -542,10 +542,10 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
begin := time.Now()
// v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
// v1 or v2. Marshaling happens below. Reference here is for caching between
// cachedPayload represent 'alerts' marshaled for Alertmanager API v2.
// Marshaling happens below. Reference here is for caching between
// for loop iterations.
var v1Payload, v2Payload []byte
var cachedPayload []byte
n.mtx.RLock()
amSets := n.alertmanagers
@ -576,29 +576,16 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
continue
}
// We can't use the cached values from previous iteration.
v1Payload, v2Payload = nil, nil
cachedPayload = nil
}
switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV1:
{
if v1Payload == nil {
v1Payload, err = json.Marshal(amAlerts)
if err != nil {
n.logger.Error("Encoding alerts for Alertmanager API v1 failed", "err", err)
ams.mtx.RUnlock()
return false
}
}
payload = v1Payload
}
case config.AlertmanagerAPIVersionV2:
{
if v2Payload == nil {
if cachedPayload == nil {
openAPIAlerts := alertsToOpenAPIAlerts(amAlerts)
v2Payload, err = json.Marshal(openAPIAlerts)
cachedPayload, err = json.Marshal(openAPIAlerts)
if err != nil {
n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err)
ams.mtx.RUnlock()
@ -606,7 +593,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
}
}
payload = v2Payload
payload = cachedPayload
}
default:
{
@ -621,7 +608,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(ams.cfg.AlertRelabelConfigs) > 0 {
// We can't use the cached values on the next iteration.
v1Payload, v2Payload = nil, nil
cachedPayload = nil
}
for _, am := range ams.ams {

View file

@ -50,27 +50,27 @@ func TestPostPath(t *testing.T) {
}{
{
in: "",
out: "/api/v1/alerts",
out: "/api/v2/alerts",
},
{
in: "/",
out: "/api/v1/alerts",
out: "/api/v2/alerts",
},
{
in: "/prefix",
out: "/prefix/api/v1/alerts",
out: "/prefix/api/v2/alerts",
},
{
in: "/prefix//",
out: "/prefix/api/v1/alerts",
out: "/prefix/api/v2/alerts",
},
{
in: "prefix//",
out: "/prefix/api/v1/alerts",
out: "/prefix/api/v2/alerts",
},
}
for _, c := range cases {
require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV1))
require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV2))
}
}

View file

@ -1230,38 +1230,17 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
ev.currentSamples = tempNumSamples
// Gather input vectors for this timestamp.
for i := range exprs {
vectors[i] = vectors[i][:0]
var bh []EvalSeriesHelper
var sh []EvalSeriesHelper
if prepSeries != nil {
bufHelpers[i] = bufHelpers[i][:0]
}
for si, series := range matrixes[i] {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts, DropName: series.DropName})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts, DropName: series.DropName})
matrixes[i][si].Histograms = series.Histograms[1:]
default:
continue
}
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
// Don't add histogram size here because we only
// copy the pointer above, not the whole
// histogram.
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
bh = bufHelpers[i][:0]
sh = seriesHelpers[i]
}
vectors[i], bh = ev.gatherVector(ts, matrixes[i], vectors[i], bh, sh)
args[i] = vectors[i]
ev.samplesStats.UpdatePeak(ev.currentSamples)
if prepSeries != nil {
bufHelpers[i] = bh
}
}
// Make the function call.
@ -3724,3 +3703,41 @@ func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries {
func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
return NewHistogramStatsIterator(s.Series.Iterator(it))
}
// gatherVector gathers a Vector for ts from the series in input.
// output is used as a buffer.
// If bufHelpers and seriesHelpers are provided, seriesHelpers[i] is appended to bufHelpers for every input index i.
// The gathered Vector and bufHelper are returned.
func (ev *evaluator) gatherVector(ts int64, input Matrix, output Vector, bufHelpers, seriesHelpers []EvalSeriesHelper) (Vector, []EvalSeriesHelper) {
output = output[:0]
for i, series := range input {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
s := series.Floats[0]
output = append(output, Sample{Metric: series.Metric, F: s.F, T: ts, DropName: series.DropName})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
input[i].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
s := series.Histograms[0]
output = append(output, Sample{Metric: series.Metric, H: s.H, T: ts, DropName: series.DropName})
input[i].Histograms = series.Histograms[1:]
default:
continue
}
if len(seriesHelpers) > 0 {
bufHelpers = append(bufHelpers, seriesHelpers[i])
}
// Don't add histogram size here because we only
// copy the pointer above, not the whole
// histogram.
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return output, bufHelpers
}

View file

@ -415,22 +415,12 @@ func funcSortDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
// === sort_by_label(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) ===
func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
// First, sort by the full label set. This ensures a consistent ordering in case sorting by the
// labels provided as arguments is not conclusive.
lbls := stringSliceFromArgs(args[1:])
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
return labels.Compare(a.Metric, b.Metric)
})
labels := stringSliceFromArgs(args[1:])
// Next, sort by the labels provided as arguments.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label.
for _, label := range labels {
for _, label := range lbls {
lv1 := a.Metric.Get(label)
lv2 := b.Metric.Get(label)
// If we encounter multiple samples with the same label values, the sorting which was
// performed in the first step will act as a "tie breaker".
if lv1 == lv2 {
continue
}
@ -442,7 +432,8 @@ func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNode
return +1
}
return 0
// If all labels provided as arguments were equal, sort by the full label set. This ensures a consistent ordering.
return labels.Compare(a.Metric, b.Metric)
})
return vals[0].(Vector), nil
@ -450,22 +441,12 @@ func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNode
// === sort_by_label_desc(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) ===
func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
// First, sort by the full label set. This ensures a consistent ordering in case sorting by the
// labels provided as arguments is not conclusive.
lbls := stringSliceFromArgs(args[1:])
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
return labels.Compare(b.Metric, a.Metric)
})
labels := stringSliceFromArgs(args[1:])
// Next, sort by the labels provided as arguments.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label.
for _, label := range labels {
for _, label := range lbls {
lv1 := a.Metric.Get(label)
lv2 := b.Metric.Get(label)
// If we encounter multiple samples with the same label values, the sorting which was
// performed in the first step will act as a "tie breaker".
if lv1 == lv2 {
continue
}
@ -477,7 +458,8 @@ func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *Eval
return -1
}
return 0
// If all labels provided as arguments were equal, sort by the full label set. This ensures a consistent ordering.
return -labels.Compare(a.Metric, b.Metric)
})
return vals[0].(Vector), nil

View file

@ -61,8 +61,8 @@ const (
var symbolTable = labels.NewSymbolTable()
func fuzzParseMetricWithContentType(in []byte, contentType string) int {
p, warning := textparse.New(in, contentType, false, false, symbolTable)
if warning != nil {
p, warning := textparse.New(in, contentType, "", false, false, symbolTable)
if p == nil || warning != nil {
// An invalid content type is being passed, which should not happen
// in this context.
panic(warning)
@ -91,7 +91,7 @@ func fuzzParseMetricWithContentType(in []byte, contentType string) int {
// Note that this is not the parser for the text-based exposition-format; that
// lives in github.com/prometheus/client_golang/text.
func FuzzParseMetric(in []byte) int {
return fuzzParseMetricWithContentType(in, "")
return fuzzParseMetricWithContentType(in, "text/plain")
}
func FuzzParseOpenMetric(in []byte) int {

View file

@ -112,9 +112,10 @@ type scrapeLoopOptions struct {
trackTimestampsStaleness bool
interval time.Duration
timeout time.Duration
scrapeClassicHistograms bool
alwaysScrapeClassicHist bool
convertClassicHistograms bool
validationScheme model.ValidationScheme
fallbackScrapeProtocol string
mrc []*relabel.Config
cache *scrapeCache
@ -180,7 +181,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.labelLimits,
opts.interval,
opts.timeout,
opts.scrapeClassicHistograms,
opts.alwaysScrapeClassicHist,
opts.convertClassicHistograms,
options.EnableNativeHistogramsIngestion,
options.EnableCreatedTimestampZeroIngestion,
@ -191,6 +192,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
metrics,
options.skipOffsetting,
opts.validationScheme,
opts.fallbackScrapeProtocol,
)
}
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@ -327,6 +329,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
enableCompression = sp.config.EnableCompression
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
mrc = sp.config.MetricRelabelConfigs
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
)
validationScheme := model.UTF8Validation
@ -373,6 +376,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
interval: interval,
timeout: timeout,
validationScheme: validationScheme,
fallbackScrapeProtocol: fallbackScrapeProtocol,
})
)
if err != nil {
@ -482,7 +486,8 @@ func (sp *scrapePool) sync(targets []*Target) {
enableCompression = sp.config.EnableCompression
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
mrc = sp.config.MetricRelabelConfigs
scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistograms
convertClassicHistograms = sp.config.ConvertClassicHistograms
)
@ -524,9 +529,10 @@ func (sp *scrapePool) sync(targets []*Target) {
mrc: mrc,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
convertClassicHistograms: convertClassicHistograms,
validationScheme: validationScheme,
fallbackScrapeProtocol: fallbackScrapeProtocol,
})
if err != nil {
l.setForcedError(err)
@ -887,9 +893,10 @@ type scrapeLoop struct {
labelLimits *labelLimits
interval time.Duration
timeout time.Duration
scrapeClassicHistograms bool
alwaysScrapeClassicHist bool
convertClassicHistograms bool
validationScheme model.ValidationScheme
fallbackScrapeProtocol string
// Feature flagged options.
enableNativeHistogramIngestion bool
@ -1188,7 +1195,7 @@ func newScrapeLoop(ctx context.Context,
labelLimits *labelLimits,
interval time.Duration,
timeout time.Duration,
scrapeClassicHistograms bool,
alwaysScrapeClassicHist bool,
convertClassicHistograms bool,
enableNativeHistogramIngestion bool,
enableCTZeroIngestion bool,
@ -1199,6 +1206,7 @@ func newScrapeLoop(ctx context.Context,
metrics *scrapeMetrics,
skipOffsetting bool,
validationScheme model.ValidationScheme,
fallbackScrapeProtocol string,
) *scrapeLoop {
if l == nil {
l = promslog.NewNopLogger()
@ -1243,7 +1251,7 @@ func newScrapeLoop(ctx context.Context,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
convertClassicHistograms: convertClassicHistograms,
enableNativeHistogramIngestion: enableNativeHistogramIngestion,
enableCTZeroIngestion: enableCTZeroIngestion,
@ -1252,6 +1260,7 @@ func newScrapeLoop(ctx context.Context,
metrics: metrics,
skipOffsetting: skipOffsetting,
validationScheme: validationScheme,
fallbackScrapeProtocol: fallbackScrapeProtocol,
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -1544,14 +1553,24 @@ type appendErrors struct {
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.enableCTZeroIngestion, sl.symbolTable)
p, err := textparse.New(b, contentType, sl.fallbackScrapeProtocol, sl.alwaysScrapeClassicHist, sl.enableCTZeroIngestion, sl.symbolTable)
if p == nil {
sl.l.Error(
"Failed to determine correct type of scrape target.",
"content_type", contentType,
"fallback_media_type", sl.fallbackScrapeProtocol,
"err", err,
)
return
}
if sl.convertClassicHistograms {
p = textparse.NewNHCBParser(p, sl.symbolTable, sl.scrapeClassicHistograms)
p = textparse.NewNHCBParser(p, sl.symbolTable, sl.alwaysScrapeClassicHist)
}
if err != nil {
sl.l.Debug(
"Invalid content type on scrape, using prometheus parser as fallback.",
"Invalid content type on scrape, using fallback setting.",
"content_type", contentType,
"fallback_media_type", sl.fallbackScrapeProtocol,
"err", err,
)
}

View file

@ -692,6 +692,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
newTestScrapeMetrics(t),
false,
model.LegacyValidation,
"text/plain",
)
}
@ -836,6 +837,7 @@ func TestScrapeLoopRun(t *testing.T) {
scrapeMetrics,
false,
model.LegacyValidation,
"text/plain",
)
// The loop must terminate during the initial offset if the context
@ -982,6 +984,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
scrapeMetrics,
false,
model.LegacyValidation,
"text/plain",
)
defer cancel()
@ -1530,7 +1533,8 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
fakeRef := storage.SeriesRef(1)
expValue := float64(1)
metric := []byte(`metric{n="1"} 1`)
p, warning := textparse.New(metric, "", false, false, labels.NewSymbolTable())
p, warning := textparse.New(metric, "text/plain", "", false, false, labels.NewSymbolTable())
require.NotNil(t, p)
require.NoError(t, warning)
var lset labels.Labels
@ -1850,7 +1854,7 @@ func TestScrapeLoopAppendStalenessIfTrackTimestampStaleness(t *testing.T) {
func TestScrapeLoopAppendExemplar(t *testing.T) {
tests := []struct {
title string
scrapeClassicHistograms bool
alwaysScrapeClassicHist bool
enableNativeHistogramsIngestion bool
scrapeText string
contentType string
@ -2119,7 +2123,7 @@ metric: <
>
`,
scrapeClassicHistograms: true,
alwaysScrapeClassicHist: true,
contentType: "application/vnd.google.protobuf",
floats: []floatSample{
{metric: labels.FromStrings("__name__", "test_histogram_count"), t: 1234568, f: 175},
@ -2181,7 +2185,7 @@ metric: <
sl.reportSampleMutator = func(l labels.Labels) labels.Labels {
return mutateReportSampleLabels(l, discoveryLabels)
}
sl.scrapeClassicHistograms = test.scrapeClassicHistograms
sl.alwaysScrapeClassicHist = test.alwaysScrapeClassicHist
now := time.Now()

View file

@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.Len(t, seriesSet, 1)
gotSamples := seriesSet[series1.String()]
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
// Verify chunks querier.
chunkQuerier, err := db.ChunkQuerier(minT, maxT)
@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
gotChunkSamples = append(gotChunkSamples, smpls...)
require.NoError(t, it.Err())
}
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets)
}
var expSamples []chunks.Sample
@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T,
gotSamples := seriesSet[series1.String()]
require.NotNil(t, gotSamples)
require.Len(t, seriesSet, 1)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
requireEqualOOOSamples(t, oooSamples, db)
})
}
}
func TestChunkQuerierOOOQuery(t *testing.T) {
nBucketHistogram := func(n int64) *histogram.Histogram {
h := &histogram.Histogram{
Count: uint64(n),
Sum: float64(n),
}
if n == 0 {
h.PositiveSpans = []histogram.Span{}
h.PositiveBuckets = []int64{}
return h
}
h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}}
h.PositiveBuckets = make([]int64, n)
h.PositiveBuckets[0] = 1
return h
}
scenarios := map[string]struct {
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
sampleFunc func(ts int64) chunks.Sample
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
sampleFunc func(ts int64) chunks.Sample
checkInUseBucket bool
}{
"float": {
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
},
},
"integer histogram with recode": {
// Histograms have increasing number of buckets so their chunks are recoded.
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
n := ts / time.Minute.Milliseconds()
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil)
},
sampleFunc: func(ts int64) chunks.Sample {
n := ts / time.Minute.Milliseconds()
return sample{t: ts, h: nBucketHistogram(n)}
},
// Only check in-use buckets for this scenario.
// Recoding adds empty buckets.
checkInUseBucket: true,
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc)
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket)
})
}
}
@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
func testChunkQuerierOOOQuery(t *testing.T,
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error),
sampleFunc func(ts int64) chunks.Sample,
checkInUseBuckets bool,
) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T,
it := chunk.Chunk.Iterator(nil)
smpls, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
// Verify that no sample is outside the chunk's time range.
for i, s := range smpls {
switch i {
case 0:
require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime)
case len(smpls) - 1:
require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime)
default:
require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime)
require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime)
}
}
gotSamples = append(gotSamples, smpls...)
require.NoError(t, it.Err())
}
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
if checkInUseBuckets {
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare)
} else {
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
}

View file

@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
// Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers
// from being factored in to the sample comparison
// TODO(fionaliao): understand counter reset behaviour, might want to modify this later
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true)
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets)
require.NoError(t, h.Close())
}

View file

@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
}
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
it := iterable.Iterator(nil)
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}

View file

@ -1022,9 +1022,9 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
if newChunk != nil {
if !recoded {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
cmint = t
}
currentChunk = newChunk
cmint = t
}
cmaxt = t

View file

@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl
for name, expectedItem := range expected {
actualItem, ok := actual[name]
require.True(t, ok, "Expected series %s not found", name)
requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets)
}
for name := range actual {
_, ok := expected[name]
@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
"number of ooo appended samples mismatch")
}
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
type requireEqualSamplesOption int
const (
requireEqualSamplesNoOption requireEqualSamplesOption = iota
requireEqualSamplesIgnoreCounterResets
requireEqualSamplesInUseBucketCompare
)
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) {
var (
ignoreCounterResets bool
inUseBucketCompare bool
)
for _, option := range options {
switch option {
case requireEqualSamplesIgnoreCounterResets:
ignoreCounterResets = true
case requireEqualSamplesInUseBucketCompare:
inUseBucketCompare = true
}
}
require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name)
for i, s := range expected {
expectedSample := s
@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
if inUseBucketCompare {
expectedSample.H().Compact(0)
actualSample.H().Compact(0)
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
case s.FH() != nil:
@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
if inUseBucketCompare {
expectedSample.FH().Compact(0)
actualSample.FH().Compact(0)
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
default: