Merge branch 'main' into krajo/update-pr-14546-from-main
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

This commit is contained in:
György Krajcsovits 2024-08-21 16:09:17 +02:00
commit 336603dfe5
43 changed files with 459 additions and 242 deletions

View file

@ -186,7 +186,7 @@ jobs:
with:
args: --verbose
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
version: v1.59.1
version: v1.60.1
fuzzing:
uses: ./.github/workflows/fuzzing.yml
if: github.event_name == 'pull_request'

View file

@ -3,6 +3,7 @@
## unreleased
* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
* [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042
## 2.54.0-rc.1 / 2024-08-05

View file

@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
SKIP_GOLANGCI_LINT :=
GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v1.59.1
GOLANGCI_LINT_VERSION ?= v1.60.1
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64.
# windows isn't included here because of the path separator being different.
ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin))

View file

@ -152,6 +152,7 @@ type flagConfig struct {
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration
nameEscapingScheme string
featureList []string
memlimitRatio float64
@ -240,6 +241,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "delayed-compaction":
c.tsdb.EnableDelayedCompaction = true
level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.")
case "utf8-names":
model.NameValidationScheme = model.UTF8Validation
level.Info(logger).Log("msg", "Experimental UTF-8 support enabled")
case "":
continue
case "promql-at-modifier", "promql-negative-offset":
@ -484,7 +488,9 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("scrape.name-escaping-scheme", `Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots".`).Default(scrape.DefaultNameEscapingScheme.String()).StringVar(&cfg.nameEscapingScheme)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@ -512,6 +518,15 @@ func main() {
os.Exit(1)
}
if cfg.nameEscapingScheme != "" {
scheme, err := model.ToEscapingScheme(cfg.nameEscapingScheme)
if err != nil {
fmt.Fprintf(os.Stderr, `Invalid name escaping scheme: %q; Needs to be one of "values", "underscores", or "dots"`, cfg.nameEscapingScheme)
os.Exit(1)
}
model.NameEscapingScheme = scheme
}
if agentMode && len(serverOnlyFlags) > 0 {
fmt.Fprintf(os.Stderr, "The following flag(s) can not be used in agent mode: %q", serverOnlyFlags)
os.Exit(3)

View file

@ -866,16 +866,16 @@ func displayHistogram(dataType string, datas []int, total int) {
fmt.Println()
}
func generateBucket(min, max int) (start, end, step int) {
s := (max - min) / 10
func generateBucket(minVal, maxVal int) (start, end, step int) {
s := (maxVal - minVal) / 10
step = 10
for step < s && step <= 10000 {
step *= 10
}
start = min - min%step
end = max - max%step + step
start = minVal - minVal%step
end = maxVal - maxVal%step + step
return
}

View file

@ -67,6 +67,11 @@ var (
}
)
const (
LegacyValidationConfig = "legacy"
UTF8ValidationConfig = "utf8"
)
// Load parses the YAML input s into a Config.
func Load(s string, expandExternalLabels bool, logger log.Logger) (*Config, error) {
cfg := &Config{}
@ -446,6 +451,8 @@ type GlobalConfig struct {
// Keep no more than this many dropped targets per job.
// 0 means no limit.
KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
// Allow UTF8 Metric and Label Names.
MetricNameValidationScheme string `yaml:"metric_name_validation_scheme,omitempty"`
}
// ScrapeProtocol represents supported protocol for scraping metrics.
@ -471,6 +478,7 @@ var (
PrometheusText0_0_4 ScrapeProtocol = "PrometheusText0.0.4"
OpenMetricsText0_0_1 ScrapeProtocol = "OpenMetricsText0.0.1"
OpenMetricsText1_0_0 ScrapeProtocol = "OpenMetricsText1.0.0"
UTF8NamesHeader string = model.EscapingKey + "=" + model.AllowUTF8
ScrapeProtocolsHeaders = map[ScrapeProtocol]string{
PrometheusProto: "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited",
@ -656,6 +664,8 @@ type ScrapeConfig struct {
// Keep no more than this many dropped targets per job.
// 0 means no limit.
KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
// Allow UTF8 Metric and Label Names.
MetricNameValidationScheme string `yaml:"metric_name_validation_scheme,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
@ -762,6 +772,17 @@ func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error {
return fmt.Errorf("%w for scrape config with job name %q", err, c.JobName)
}
switch globalConfig.MetricNameValidationScheme {
case "", LegacyValidationConfig:
case UTF8ValidationConfig:
if model.NameValidationScheme != model.UTF8Validation {
return fmt.Errorf("utf8 name validation requested but feature not enabled via --enable-feature=utf8-names")
}
default:
return fmt.Errorf("unknown name validation method specified, must be either 'legacy' or 'utf8', got %s", globalConfig.MetricNameValidationScheme)
}
c.MetricNameValidationScheme = globalConfig.MetricNameValidationScheme
return nil
}

View file

@ -97,7 +97,6 @@ func fetchApps(ctx context.Context, server string, client *http.Client) (*Applic
resp.Body.Close()
}()
//nolint:usestdlibvars
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("non 2xx status '%d' response during eureka service discovery", resp.StatusCode)
}

View file

@ -87,7 +87,6 @@ func (d *robotDiscovery) refresh(context.Context) ([]*targetgroup.Group, error)
resp.Body.Close()
}()
//nolint:usestdlibvars
if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("non 2xx status '%d' response during hetzner service discovery with role robot", resp.StatusCode)
}

View file

@ -154,7 +154,7 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
// readResultWithTimeout reads all targetgroups from channel with timeout.
// It merges targetgroups by source and sends the result to result channel.
func readResultWithTimeout(t *testing.T, ctx context.Context, ch <-chan []*targetgroup.Group, max int, stopAfter time.Duration, resChan chan<- map[string]*targetgroup.Group) {
func readResultWithTimeout(t *testing.T, ctx context.Context, ch <-chan []*targetgroup.Group, maxGroups int, stopAfter time.Duration, resChan chan<- map[string]*targetgroup.Group) {
res := make(map[string]*targetgroup.Group)
timeout := time.After(stopAfter)
Loop:
@ -167,7 +167,7 @@ Loop:
}
res[tg.Source] = tg
}
if len(res) == max {
if len(res) == maxGroups {
// Reached max target groups we may get, break fast.
break Loop
}
@ -175,10 +175,10 @@ Loop:
// Because we use queue, an object that is created then
// deleted or updated may be processed only once.
// So possibly we may skip events, timed out here.
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max)
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), maxGroups)
break Loop
case <-ctx.Done():
t.Logf("stopped, got %d (max: %d) items", len(res), max)
t.Logf("stopped, got %d (max: %d) items", len(res), maxGroups)
break Loop
}
}

View file

@ -56,7 +56,8 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--scrape.name-escaping-scheme</code> | Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots". | `values` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

View file

@ -121,6 +121,11 @@ global:
# that will be kept in memory. 0 means no limit.
[ keep_dropped_targets: <int> | default = 0 ]
# Specifies the validation scheme for metric and label names. Either blank or
# "legacy" for letters, numbers, colons, and underscores; or "utf8" for full
# UTF-8 support.
[ metric_name_validation_scheme <string> | default "legacy" ]
runtime:
# Configure the Go garbage collector GOGC parameter
# See: https://tip.golang.org/doc/gc-guide#GOGC
@ -461,6 +466,11 @@ metric_relabel_configs:
# that will be kept in memory. 0 means no limit.
[ keep_dropped_targets: <int> | default = 0 ]
# Specifies the validation scheme for metric and label names. Either blank or
# "legacy" for letters, numbers, colons, and underscores; or "utf8" for full
# UTF-8 support.
[ metric_name_validation_scheme <string> | default "legacy" ]
# Limit on total number of positive and negative buckets allowed in a single
# native histogram. The resolution of a histogram with more buckets will be
# reduced until the number of buckets is within the limit. If the limit cannot

View file

@ -249,3 +249,11 @@ In the event of multiple consecutive Head compactions being possible, only the f
Note that during this delay, the Head continues its usual operations, which include serving and appending series.
Despite the delay in compaction, the blocks produced are time-aligned in the same manner as they would be if the delay was not in place.
## UTF-8 Name Support
`--enable-feature=utf8-names`
When enabled, changes the metric and label name validation scheme inside Prometheus to allow the full UTF-8 character set.
By itself, this flag does not enable the request of UTF-8 names via content negotiation.
Users will also have to set `metric_name_validation_scheme` in scrape configs to enable the feature either on the global config or on a per-scrape config basis.

View file

@ -619,7 +619,7 @@ Like `sort`, `sort_desc` only affects the results of instant queries, as range q
**This function has to be enabled via the [feature flag](../feature_flags.md#experimental-promql-functions) `--enable-feature=promql-experimental-functions`.**
`sort_by_label(v instant-vector, label string, ...)` returns vector elements sorted by their label values and sample value in case of label values being equal, in ascending order.
`sort_by_label(v instant-vector, label string, ...)` returns vector elements sorted by the values of the given labels in ascending order. In case these label values are equal, elements are sorted by their full label sets.
Please note that the sort by label functions only affect the results of instant queries, as range query results always have a fixed output ordering.

View file

@ -20,7 +20,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/aws/aws-sdk-go v1.53.16 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@ -62,7 +62,7 @@ require (
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

View file

@ -26,8 +26,8 @@ github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8V
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.53.16 h1:8oZjKQO/ml1WLUZw5hvF7pvYjPf8o9f57Wldoy/q9Qc=
github.com/aws/aws-sdk-go v1.53.16/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -37,8 +37,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@ -402,8 +402,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View file

@ -674,7 +674,6 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b
}()
// Any HTTP status 2xx is OK.
//nolint:usestdlibvars
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %s", resp.Status)
}

View file

@ -711,7 +711,7 @@ func TestHangingNotifier(t *testing.T) {
)
var (
sendTimeout = 10 * time.Millisecond
sendTimeout = 100 * time.Millisecond
sdUpdatert = sendTimeout / 2
done = make(chan struct{})

View file

@ -406,17 +406,22 @@ func funcSortDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
// === sort_by_label(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) ===
func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
// In case the labels are the same, NaN should sort to the bottom, so take
// ascending sort with NaN first and reverse it.
var anno annotations.Annotations
vals[0], anno = funcSort(vals, args, enh)
labels := stringSliceFromArgs(args[1:])
// First, sort by the full label set. This ensures a consistent ordering in case sorting by the
// labels provided as arguments is not conclusive.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label
return labels.Compare(a.Metric, b.Metric)
})
labels := stringSliceFromArgs(args[1:])
// Next, sort by the labels provided as arguments.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label.
for _, label := range labels {
lv1 := a.Metric.Get(label)
lv2 := b.Metric.Get(label)
// If we encounter multiple samples with the same label values, the sorting which was
// performed in the first step will act as a "tie breaker".
if lv1 == lv2 {
continue
}
@ -431,22 +436,27 @@ func funcSortByLabel(vals []parser.Value, args parser.Expressions, enh *EvalNode
return 0
})
return vals[0].(Vector), anno
return vals[0].(Vector), nil
}
// === sort_by_label_desc(vector parser.ValueTypeVector, label parser.ValueTypeString...) (Vector, Annotations) ===
func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
// In case the labels are the same, NaN should sort to the bottom, so take
// ascending sort with NaN first and reverse it.
var anno annotations.Annotations
vals[0], anno = funcSortDesc(vals, args, enh)
labels := stringSliceFromArgs(args[1:])
// First, sort by the full label set. This ensures a consistent ordering in case sorting by the
// labels provided as arguments is not conclusive.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label
return labels.Compare(b.Metric, a.Metric)
})
labels := stringSliceFromArgs(args[1:])
// Next, sort by the labels provided as arguments.
slices.SortFunc(vals[0].(Vector), func(a, b Sample) int {
// Iterate over each given label.
for _, label := range labels {
lv1 := a.Metric.Get(label)
lv2 := b.Metric.Get(label)
// If we encounter multiple samples with the same label values, the sorting which was
// performed in the first step will act as a "tie breaker".
if lv1 == lv2 {
continue
}
@ -461,21 +471,21 @@ func funcSortByLabelDesc(vals []parser.Value, args parser.Expressions, enh *Eval
return 0
})
return vals[0].(Vector), anno
return vals[0].(Vector), nil
}
// === clamp(Vector parser.ValueTypeVector, min, max Scalar) (Vector, Annotations) ===
func funcClamp(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec := vals[0].(Vector)
min := vals[1].(Vector)[0].F
max := vals[2].(Vector)[0].F
if max < min {
minVal := vals[1].(Vector)[0].F
maxVal := vals[2].(Vector)[0].F
if maxVal < minVal {
return enh.Out, nil
}
for _, el := range vec {
enh.Out = append(enh.Out, Sample{
Metric: el.Metric.DropMetricName(),
F: math.Max(min, math.Min(max, el.F)),
F: math.Max(minVal, math.Min(maxVal, el.F)),
})
}
return enh.Out, nil
@ -484,11 +494,11 @@ func funcClamp(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper
// === clamp_max(Vector parser.ValueTypeVector, max Scalar) (Vector, Annotations) ===
func funcClampMax(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec := vals[0].(Vector)
max := vals[1].(Vector)[0].F
maxVal := vals[1].(Vector)[0].F
for _, el := range vec {
enh.Out = append(enh.Out, Sample{
Metric: el.Metric.DropMetricName(),
F: math.Min(max, el.F),
F: math.Min(maxVal, el.F),
})
}
return enh.Out, nil
@ -497,11 +507,11 @@ func funcClampMax(vals []parser.Value, args parser.Expressions, enh *EvalNodeHel
// === clamp_min(Vector parser.ValueTypeVector, min Scalar) (Vector, Annotations) ===
func funcClampMin(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec := vals[0].(Vector)
min := vals[1].(Vector)[0].F
minVal := vals[1].(Vector)[0].F
for _, el := range vec {
enh.Out = append(enh.Out, Sample{
Metric: el.Metric.DropMetricName(),
F: math.Max(min, el.F),
F: math.Max(minVal, el.F),
})
}
return enh.Out, nil
@ -700,13 +710,13 @@ func funcMaxOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
return enh.Out, nil
}
return aggrOverTime(vals, enh, func(s Series) float64 {
max := s.Floats[0].F
maxVal := s.Floats[0].F
for _, f := range s.Floats {
if f.F > max || math.IsNaN(max) {
max = f.F
if f.F > maxVal || math.IsNaN(maxVal) {
maxVal = f.F
}
}
return max
return maxVal
}), nil
}
@ -720,13 +730,13 @@ func funcMinOverTime(vals []parser.Value, args parser.Expressions, enh *EvalNode
return enh.Out, nil
}
return aggrOverTime(vals, enh, func(s Series) float64 {
min := s.Floats[0].F
minVal := s.Floats[0].F
for _, f := range s.Floats {
if f.F < min || math.IsNaN(min) {
min = f.F
if f.F < minVal || math.IsNaN(minVal) {
minVal = f.F
}
}
return min
return minVal
}), nil
}

View file

@ -727,23 +727,23 @@ func lexValueSequence(l *Lexer) stateFn {
// was only modified to integrate with our lexer.
func lexEscape(l *Lexer) stateFn {
var n int
var base, max uint32
var base, maxVal uint32
ch := l.next()
switch ch {
case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', l.stringOpen:
return lexString
case '0', '1', '2', '3', '4', '5', '6', '7':
n, base, max = 3, 8, 255
n, base, maxVal = 3, 8, 255
case 'x':
ch = l.next()
n, base, max = 2, 16, 255
n, base, maxVal = 2, 16, 255
case 'u':
ch = l.next()
n, base, max = 4, 16, unicode.MaxRune
n, base, maxVal = 4, 16, unicode.MaxRune
case 'U':
ch = l.next()
n, base, max = 8, 16, unicode.MaxRune
n, base, maxVal = 8, 16, unicode.MaxRune
case eof:
l.errorf("escape sequence not terminated")
return lexString
@ -772,7 +772,7 @@ func lexEscape(l *Lexer) stateFn {
}
}
if x > max || 0xD800 <= x && x < 0xE000 {
if x > maxVal || 0xD800 <= x && x < 0xE000 {
l.errorf("escape sequence is an invalid Unicode code point")
}
return lexString

View file

@ -523,16 +523,16 @@ load 5m
node_uname_info{job="node_exporter", instance="4m1000", release="1.111.3"} 0+10x10
eval_ordered instant at 50m sort_by_label(http_requests, "instance")
http_requests{group="production", instance="0", job="api-server"} 100
http_requests{group="canary", instance="0", job="api-server"} 300
http_requests{group="production", instance="0", job="app-server"} 500
http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="production", instance="1", job="api-server"} 200
http_requests{group="production", instance="0", job="api-server"} 100
http_requests{group="production", instance="0", job="app-server"} 500
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="production", instance="1", job="app-server"} 600
http_requests{group="canary", instance="1", job="app-server"} 800
http_requests{group="production", instance="2", job="api-server"} 100
http_requests{group="production", instance="1", job="api-server"} 200
http_requests{group="production", instance="1", job="app-server"} 600
http_requests{group="canary", instance="2", job="api-server"} NaN
http_requests{group="production", instance="2", job="api-server"} 100
eval_ordered instant at 50m sort_by_label(http_requests, "instance", "group")
http_requests{group="canary", instance="0", job="api-server"} 300
@ -585,14 +585,14 @@ eval_ordered instant at 50m sort_by_label(http_requests, "job", "instance", "gro
eval_ordered instant at 50m sort_by_label_desc(http_requests, "instance")
http_requests{group="production", instance="2", job="api-server"} 100
http_requests{group="canary", instance="2", job="api-server"} NaN
http_requests{group="canary", instance="1", job="app-server"} 800
http_requests{group="production", instance="1", job="app-server"} 600
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="production", instance="1", job="api-server"} 200
http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="canary", instance="1", job="app-server"} 800
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="production", instance="0", job="app-server"} 500
http_requests{group="canary", instance="0", job="api-server"} 300
http_requests{group="production", instance="0", job="api-server"} 100
http_requests{group="canary", instance="0", job="app-server"} 700
http_requests{group="canary", instance="0", job="api-server"} 300
eval_ordered instant at 50m sort_by_label_desc(http_requests, "instance", "group")
http_requests{group="production", instance="2", job="api-server"} 100

View file

@ -93,6 +93,8 @@ type Options struct {
skipOffsetting bool
}
const DefaultNameEscapingScheme = model.ValueEncodingEscaping
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {

View file

@ -303,6 +303,11 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
mrc = sp.config.MetricRelabelConfigs
)
validationScheme := model.LegacyValidation
if sp.config.MetricNameValidationScheme == config.UTF8ValidationConfig {
validationScheme = model.UTF8Validation
}
sp.targetMtx.Lock()
forcedErr := sp.refreshTargetLimitErr()
@ -323,7 +328,7 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
client: sp.client,
timeout: timeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(sp.config.ScrapeProtocols),
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, validationScheme),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
}
newLoop = sp.newLoop(scrapeLoopOptions{
@ -452,6 +457,11 @@ func (sp *scrapePool) sync(targets []*Target) {
scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
)
validationScheme := model.LegacyValidation
if sp.config.MetricNameValidationScheme == config.UTF8ValidationConfig {
validationScheme = model.UTF8Validation
}
sp.targetMtx.Lock()
for _, t := range targets {
hash := t.hash()
@ -467,7 +477,7 @@ func (sp *scrapePool) sync(targets []*Target) {
client: sp.client,
timeout: timeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(sp.config.ScrapeProtocols),
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, validationScheme),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
metrics: sp.metrics,
}
@ -714,11 +724,16 @@ var errBodySizeLimit = errors.New("body size limit exceeded")
// acceptHeader transforms preference from the options into specific header values as
// https://www.rfc-editor.org/rfc/rfc9110.html#name-accept defines.
// No validation is here, we expect scrape protocols to be validated already.
func acceptHeader(sps []config.ScrapeProtocol) string {
func acceptHeader(sps []config.ScrapeProtocol, scheme model.ValidationScheme) string {
var vals []string
weight := len(config.ScrapeProtocolsHeaders) + 1
for _, sp := range sps {
vals = append(vals, fmt.Sprintf("%s;q=0.%d", config.ScrapeProtocolsHeaders[sp], weight))
val := config.ScrapeProtocolsHeaders[sp]
if scheme == model.UTF8Validation {
val += ";" + config.UTF8NamesHeader
}
val += fmt.Sprintf(";q=0.%d", weight)
vals = append(vals, val)
weight--
}
// Default match anything.

View file

@ -2339,11 +2339,15 @@ func TestTargetScraperScrapeOK(t *testing.T) {
)
var protobufParsing bool
var allowUTF8 bool
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
accept := r.Header.Get("Accept")
if allowUTF8 {
require.Truef(t, strings.Contains(accept, "escaping=allow-utf-8"), "Expected Accept header to allow utf8, got %q", accept)
}
if protobufParsing {
accept := r.Header.Get("Accept")
require.True(t, strings.HasPrefix(accept, "application/vnd.google.protobuf;"),
"Expected Accept header to prefer application/vnd.google.protobuf.")
}
@ -2351,7 +2355,11 @@ func TestTargetScraperScrapeOK(t *testing.T) {
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")
require.Equal(t, expectedTimeout, timeout, "Expected scrape timeout header.")
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
if allowUTF8 {
w.Header().Set("Content-Type", `text/plain; version=1.0.0; escaping=allow-utf-8`)
} else {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
}
w.Write([]byte("metric_a 1\nmetric_b 2\n"))
}),
)
@ -2380,13 +2388,22 @@ func TestTargetScraperScrapeOK(t *testing.T) {
require.NoError(t, err)
contentType, err := ts.readResponse(context.Background(), resp, &buf)
require.NoError(t, err)
require.Equal(t, "text/plain; version=0.0.4", contentType)
if allowUTF8 {
require.Equal(t, "text/plain; version=1.0.0; escaping=allow-utf-8", contentType)
} else {
require.Equal(t, "text/plain; version=0.0.4", contentType)
}
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
}
runTest(acceptHeader(config.DefaultScrapeProtocols))
runTest(acceptHeader(config.DefaultScrapeProtocols, model.LegacyValidation))
protobufParsing = true
runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols))
runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols, model.LegacyValidation))
protobufParsing = false
allowUTF8 = true
runTest(acceptHeader(config.DefaultScrapeProtocols, model.UTF8Validation))
protobufParsing = true
runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols, model.UTF8Validation))
}
func TestTargetScrapeScrapeCancel(t *testing.T) {
@ -2412,7 +2429,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
),
},
client: http.DefaultClient,
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols, model.LegacyValidation),
}
ctx, cancel := context.WithCancel(context.Background())
@ -2467,7 +2484,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
),
},
client: http.DefaultClient,
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols, model.LegacyValidation),
}
resp, err := ts.scrape(context.Background())
@ -2511,7 +2528,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
},
client: http.DefaultClient,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols, model.LegacyValidation),
metrics: newTestScrapeMetrics(t),
}
var buf bytes.Buffer

View file

@ -1,8 +1,66 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number:
93:6c:9e:29:8d:37:7b:66
Signature Algorithm: sha256WithRSAEncryption
Issuer: C = XX, L = Default City, O = Default Company Ltd, CN = Prometheus Test CA
Validity
Not Before: Aug 20 11:51:23 2024 GMT
Not After : Dec 5 11:51:23 2044 GMT
Subject: C = XX, L = Default City, O = Default Company Ltd, CN = Prometheus Test CA
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (2048 bit)
Modulus:
00:e9:52:05:4d:f2:5a:95:04:2d:b8:73:8b:3c:e7:
47:48:67:00:be:dd:6c:41:f3:7c:4b:44:73:a3:77:
3e:84:af:30:d7:2a:ad:45:6a:b7:89:23:05:15:b3:
aa:46:79:b8:95:64:cc:13:c4:44:a1:01:a0:e2:3d:
a5:67:2b:aa:d3:13:06:43:33:1c:96:36:12:9e:c6:
1d:36:9b:d7:47:bd:28:2d:88:15:04:fa:14:a3:ff:
8c:26:22:c5:a2:15:c7:76:b3:11:f6:a3:44:9a:28:
0f:ca:fb:f4:51:a8:6a:05:94:7c:77:47:c8:21:56:
25:bf:e2:2e:df:33:f3:e4:bd:d6:47:a5:49:13:12:
c8:1f:4c:d7:2a:56:a2:6c:c1:cf:55:05:5d:9a:75:
a2:23:4e:e6:8e:ff:76:05:d6:e0:c8:0b:51:f0:b6:
a1:b2:7d:8f:9c:6a:a5:ce:86:92:15:8c:5b:86:45:
c0:4a:ff:54:b8:ee:cf:11:bd:07:cb:4b:7d:0b:a1:
9d:72:86:9f:55:bc:f9:6c:d9:55:60:96:30:3f:ec:
2d:f6:5f:9a:32:9a:5a:5c:1c:5f:32:f9:d6:0f:04:
f8:81:08:04:9a:95:c3:9d:5a:30:8e:a5:0e:47:2f:
00:ce:e0:2e:ad:5a:b8:b6:4c:55:7c:8a:59:22:b0:
ed:73
Exponent: 65537 (0x10001)
X509v3 extensions:
X509v3 Subject Key Identifier:
CC:F5:05:99:E5:AB:12:69:D8:78:89:4A:31:CA:F0:8B:0B:AD:66:1B
X509v3 Authority Key Identifier:
CC:F5:05:99:E5:AB:12:69:D8:78:89:4A:31:CA:F0:8B:0B:AD:66:1B
X509v3 Basic Constraints:
CA:TRUE
Signature Algorithm: sha256WithRSAEncryption
Signature Value:
4a:a1:b0:bc:c8:87:4f:7c:96:62:e5:09:29:ae:3a:2e:68:d0:
d2:c5:68:ed:ea:83:36:b1:86:f3:b9:e9:19:2b:b6:73:10:6f:
df:7f:bb:f1:76:81:03:c1:a1:5a:ee:6c:44:b8:7c:10:d1:5a:
d7:c1:92:64:59:35:a6:e0:aa:08:41:37:6e:e7:c8:b6:bd:0c:
4b:47:78:ec:c4:b4:15:a3:62:76:4a:39:8e:6e:19:ff:f0:c0:
8a:7e:1c:cd:87:e5:00:6c:f1:ce:27:26:ff:b8:e9:eb:f7:2f:
bd:c2:4b:9c:d6:57:de:74:74:b3:4f:03:98:9a:b5:08:2d:16:
ca:7f:b6:c8:76:62:86:1b:7c:f2:3e:6c:78:cc:2c:95:9a:bb:
77:25:e8:80:ff:9b:e8:f8:9a:85:3b:85:b7:17:4e:77:a1:cf:
4d:b9:d0:25:e8:5d:8c:e6:7c:f1:d9:52:30:3d:ec:2b:37:91:
bc:e2:e8:39:31:6f:3d:e9:98:70:80:7c:41:dd:19:13:05:21:
94:7b:16:cf:d8:ee:4e:38:34:5e:6a:ff:cd:85:ac:8f:94:9a:
dd:4e:77:05:13:a6:b4:80:52:b2:97:64:76:88:f4:dd:42:0a:
50:1c:80:fd:4b:6e:a9:62:10:aa:ef:2e:c1:2f:be:0e:c2:2e:
b5:28:5f:83
-----BEGIN CERTIFICATE-----
MIIDkTCCAnmgAwIBAgIJAJNsnimNN3tmMA0GCSqGSIb3DQEBCwUAMF8xCzAJBgNV
BAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg
Q29tcGFueSBMdGQxGzAZBgNVBAMMElByb21ldGhldXMgVGVzdCBDQTAeFw0xNTA4
MDQxNDA5MjFaFw0yNTA4MDExNDA5MjFaMF8xCzAJBgNVBAYTAlhYMRUwEwYDVQQH
Q29tcGFueSBMdGQxGzAZBgNVBAMMElByb21ldGhldXMgVGVzdCBDQTAeFw0yNDA4
MjAxMTUxMjNaFw00NDEyMDUxMTUxMjNaMF8xCzAJBgNVBAYTAlhYMRUwEwYDVQQH
DAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxGzAZ
BgNVBAMMElByb21ldGhldXMgVGVzdCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAOlSBU3yWpUELbhzizznR0hnAL7dbEHzfEtEc6N3PoSvMNcqrUVq
@ -12,11 +70,11 @@ yB9M1ypWomzBz1UFXZp1oiNO5o7/dgXW4MgLUfC2obJ9j5xqpc6GkhWMW4ZFwEr/
VLjuzxG9B8tLfQuhnXKGn1W8+WzZVWCWMD/sLfZfmjKaWlwcXzL51g8E+IEIBJqV
w51aMI6lDkcvAM7gLq1auLZMVXyKWSKw7XMCAwEAAaNQME4wHQYDVR0OBBYEFMz1
BZnlqxJp2HiJSjHK8IsLrWYbMB8GA1UdIwQYMBaAFMz1BZnlqxJp2HiJSjHK8IsL
rWYbMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAI2iA3w3TK5J15Pu
e4fPFB4jxQqsbUwuyXbCCv/jKLeFNCD4BjM181WZEYjPMumeTBVzU3aF45LWQIG1
0DJcrCL4mjMz9qgAoGqA7aDDXiJGbukMgYYsn7vrnVmrZH8T3E8ySlltr7+W578k
pJ5FxnbCroQwn0zLyVB3sFbS8E3vpBr3L8oy8PwPHhIScexcNVc3V6/m4vTZsXTH
U+vUm1XhDgpDcFMTg2QQiJbfpOYUkwIgnRDAT7t282t2KQWtnlqc3zwPQ1F/6Cpx
j19JeNsaF1DArkD7YlyKj/GhZLtHwFHG5cxznH0mLDJTW7bQvqqh2iQTeXmBk1lU
mM5lH/s=
rWYbMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAEqhsLzIh098lmLl
CSmuOi5o0NLFaO3qgzaxhvO56RkrtnMQb99/u/F2gQPBoVrubES4fBDRWtfBkmRZ
NabgqghBN27nyLa9DEtHeOzEtBWjYnZKOY5uGf/wwIp+HM2H5QBs8c4nJv+46ev3
L73CS5zWV950dLNPA5iatQgtFsp/tsh2YoYbfPI+bHjMLJWau3cl6ID/m+j4moU7
hbcXTnehz0250CXoXYzmfPHZUjA97Cs3kbzi6Dkxbz3pmHCAfEHdGRMFIZR7Fs/Y
7k44NF5q/82FrI+Umt1OdwUTprSAUrKXZHaI9N1CClAcgP1LbqliEKrvLsEvvg7C
LrUoX4M=
-----END CERTIFICATE-----

View file

@ -36,4 +36,4 @@ jobs:
uses: golangci/golangci-lint-action@aaa42aa0628b4ae2578232a66b541047968fac86 # v6.1.0
with:
args: --verbose
version: v1.59.1
version: v1.60.1

View file

@ -96,10 +96,10 @@ func TestSampleRingMixed(t *testing.T) {
// With ValNone as the preferred type, nothing should be initialized.
r := newSampleRing(10, 2, chunkenc.ValNone)
require.Zero(t, len(r.fBuf))
require.Zero(t, len(r.hBuf))
require.Zero(t, len(r.fhBuf))
require.Zero(t, len(r.iBuf))
require.Empty(t, r.fBuf)
require.Empty(t, r.hBuf)
require.Empty(t, r.fhBuf)
require.Empty(t, r.iBuf)
// But then mixed adds should work as expected.
r.addF(fSample{t: 1, f: 3.14})
@ -146,10 +146,10 @@ func TestSampleRingAtFloatHistogram(t *testing.T) {
// With ValNone as the preferred type, nothing should be initialized.
r := newSampleRing(10, 2, chunkenc.ValNone)
require.Zero(t, len(r.fBuf))
require.Zero(t, len(r.hBuf))
require.Zero(t, len(r.fhBuf))
require.Zero(t, len(r.iBuf))
require.Empty(t, r.fBuf)
require.Empty(t, r.hBuf)
require.Empty(t, r.fhBuf)
require.Empty(t, r.iBuf)
var (
h *histogram.Histogram

View file

@ -287,7 +287,6 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo
// we can continue handling.
rs, _ := ParseWriteResponseStats(httpResp)
//nolint:usestdlibvars
if httpResp.StatusCode/100 == 2 {
return rs, nil
}
@ -297,7 +296,6 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo
body, _ := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen))
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, body)
//nolint:usestdlibvars
if httpResp.StatusCode/100 == 5 ||
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
return rs, RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
@ -382,7 +380,6 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
}
//nolint:usestdlibvars
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.urlString, httpResp.Status, strings.TrimSpace(string(compressed)))
}

View file

@ -1522,7 +1522,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
maxCount = s.qm.cfg.MaxSamplesPerSend
pBuf = proto.NewBuffer(nil)
pBufRaw []byte
@ -1530,19 +1530,19 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
)
// TODO(@tpaschalis) Should we also raise the max if we have WAL metadata?
if s.qm.sendExemplars {
max += int(float64(max) * 0.1)
maxCount += int(float64(maxCount) * 0.1)
}
// TODO: Dry all of this, we should make an interface/generic for the timeseries type.
batchQueue := queue.Chan()
pendingData := make([]prompb.TimeSeries, max)
pendingData := make([]prompb.TimeSeries, maxCount)
for i := range pendingData {
pendingData[i].Samples = []prompb.Sample{{}}
if s.qm.sendExemplars {
pendingData[i].Exemplars = []prompb.Exemplar{{}}
}
}
pendingDataV2 := make([]writev2.TimeSeries, max)
pendingDataV2 := make([]writev2.TimeSeries, maxCount)
for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}}
}

View file

@ -453,10 +453,10 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenHistogramsHeader))
expectHeaderValue(t, 0, resp.Header.Get(rw20WrittenExemplarsHeader))
require.Empty(t, len(appendable.samples))
require.Empty(t, len(appendable.histograms))
require.Empty(t, len(appendable.exemplars))
require.Empty(t, len(appendable.metadata))
require.Empty(t, appendable.samples)
require.Empty(t, appendable.histograms)
require.Empty(t, appendable.exemplars)
require.Empty(t, appendable.metadata)
return
}

View file

@ -166,7 +166,7 @@ func NewTemplateExpander(
return html_template.HTML(text)
},
"match": regexp.MatchString,
"title": strings.Title, //nolint:staticcheck
"title": strings.Title,
"toUpper": strings.ToUpper,
"toLower": strings.ToLower,
"graphLink": strutil.GraphLinkForExpression,

View file

@ -1,8 +1,66 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number:
93:6c:9e:29:8d:37:7b:66
Signature Algorithm: sha256WithRSAEncryption
Issuer: C = XX, L = Default City, O = Default Company Ltd, CN = Prometheus Test CA
Validity
Not Before: Aug 20 11:51:23 2024 GMT
Not After : Dec 5 11:51:23 2044 GMT
Subject: C = XX, L = Default City, O = Default Company Ltd, CN = Prometheus Test CA
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (2048 bit)
Modulus:
00:e9:52:05:4d:f2:5a:95:04:2d:b8:73:8b:3c:e7:
47:48:67:00:be:dd:6c:41:f3:7c:4b:44:73:a3:77:
3e:84:af:30:d7:2a:ad:45:6a:b7:89:23:05:15:b3:
aa:46:79:b8:95:64:cc:13:c4:44:a1:01:a0:e2:3d:
a5:67:2b:aa:d3:13:06:43:33:1c:96:36:12:9e:c6:
1d:36:9b:d7:47:bd:28:2d:88:15:04:fa:14:a3:ff:
8c:26:22:c5:a2:15:c7:76:b3:11:f6:a3:44:9a:28:
0f:ca:fb:f4:51:a8:6a:05:94:7c:77:47:c8:21:56:
25:bf:e2:2e:df:33:f3:e4:bd:d6:47:a5:49:13:12:
c8:1f:4c:d7:2a:56:a2:6c:c1:cf:55:05:5d:9a:75:
a2:23:4e:e6:8e:ff:76:05:d6:e0:c8:0b:51:f0:b6:
a1:b2:7d:8f:9c:6a:a5:ce:86:92:15:8c:5b:86:45:
c0:4a:ff:54:b8:ee:cf:11:bd:07:cb:4b:7d:0b:a1:
9d:72:86:9f:55:bc:f9:6c:d9:55:60:96:30:3f:ec:
2d:f6:5f:9a:32:9a:5a:5c:1c:5f:32:f9:d6:0f:04:
f8:81:08:04:9a:95:c3:9d:5a:30:8e:a5:0e:47:2f:
00:ce:e0:2e:ad:5a:b8:b6:4c:55:7c:8a:59:22:b0:
ed:73
Exponent: 65537 (0x10001)
X509v3 extensions:
X509v3 Subject Key Identifier:
CC:F5:05:99:E5:AB:12:69:D8:78:89:4A:31:CA:F0:8B:0B:AD:66:1B
X509v3 Authority Key Identifier:
CC:F5:05:99:E5:AB:12:69:D8:78:89:4A:31:CA:F0:8B:0B:AD:66:1B
X509v3 Basic Constraints:
CA:TRUE
Signature Algorithm: sha256WithRSAEncryption
Signature Value:
4a:a1:b0:bc:c8:87:4f:7c:96:62:e5:09:29:ae:3a:2e:68:d0:
d2:c5:68:ed:ea:83:36:b1:86:f3:b9:e9:19:2b:b6:73:10:6f:
df:7f:bb:f1:76:81:03:c1:a1:5a:ee:6c:44:b8:7c:10:d1:5a:
d7:c1:92:64:59:35:a6:e0:aa:08:41:37:6e:e7:c8:b6:bd:0c:
4b:47:78:ec:c4:b4:15:a3:62:76:4a:39:8e:6e:19:ff:f0:c0:
8a:7e:1c:cd:87:e5:00:6c:f1:ce:27:26:ff:b8:e9:eb:f7:2f:
bd:c2:4b:9c:d6:57:de:74:74:b3:4f:03:98:9a:b5:08:2d:16:
ca:7f:b6:c8:76:62:86:1b:7c:f2:3e:6c:78:cc:2c:95:9a:bb:
77:25:e8:80:ff:9b:e8:f8:9a:85:3b:85:b7:17:4e:77:a1:cf:
4d:b9:d0:25:e8:5d:8c:e6:7c:f1:d9:52:30:3d:ec:2b:37:91:
bc:e2:e8:39:31:6f:3d:e9:98:70:80:7c:41:dd:19:13:05:21:
94:7b:16:cf:d8:ee:4e:38:34:5e:6a:ff:cd:85:ac:8f:94:9a:
dd:4e:77:05:13:a6:b4:80:52:b2:97:64:76:88:f4:dd:42:0a:
50:1c:80:fd:4b:6e:a9:62:10:aa:ef:2e:c1:2f:be:0e:c2:2e:
b5:28:5f:83
-----BEGIN CERTIFICATE-----
MIIDkTCCAnmgAwIBAgIJAJNsnimNN3tmMA0GCSqGSIb3DQEBCwUAMF8xCzAJBgNV
BAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg
Q29tcGFueSBMdGQxGzAZBgNVBAMMElByb21ldGhldXMgVGVzdCBDQTAeFw0xNTA4
MDQxNDA5MjFaFw0yNTA4MDExNDA5MjFaMF8xCzAJBgNVBAYTAlhYMRUwEwYDVQQH
Q29tcGFueSBMdGQxGzAZBgNVBAMMElByb21ldGhldXMgVGVzdCBDQTAeFw0yNDA4
MjAxMTUxMjNaFw00NDEyMDUxMTUxMjNaMF8xCzAJBgNVBAYTAlhYMRUwEwYDVQQH
DAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxGzAZ
BgNVBAMMElByb21ldGhldXMgVGVzdCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAOlSBU3yWpUELbhzizznR0hnAL7dbEHzfEtEc6N3PoSvMNcqrUVq
@ -12,11 +70,11 @@ yB9M1ypWomzBz1UFXZp1oiNO5o7/dgXW4MgLUfC2obJ9j5xqpc6GkhWMW4ZFwEr/
VLjuzxG9B8tLfQuhnXKGn1W8+WzZVWCWMD/sLfZfmjKaWlwcXzL51g8E+IEIBJqV
w51aMI6lDkcvAM7gLq1auLZMVXyKWSKw7XMCAwEAAaNQME4wHQYDVR0OBBYEFMz1
BZnlqxJp2HiJSjHK8IsLrWYbMB8GA1UdIwQYMBaAFMz1BZnlqxJp2HiJSjHK8IsL
rWYbMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAI2iA3w3TK5J15Pu
e4fPFB4jxQqsbUwuyXbCCv/jKLeFNCD4BjM181WZEYjPMumeTBVzU3aF45LWQIG1
0DJcrCL4mjMz9qgAoGqA7aDDXiJGbukMgYYsn7vrnVmrZH8T3E8ySlltr7+W578k
pJ5FxnbCroQwn0zLyVB3sFbS8E3vpBr3L8oy8PwPHhIScexcNVc3V6/m4vTZsXTH
U+vUm1XhDgpDcFMTg2QQiJbfpOYUkwIgnRDAT7t282t2KQWtnlqc3zwPQ1F/6Cpx
j19JeNsaF1DArkD7YlyKj/GhZLtHwFHG5cxznH0mLDJTW7bQvqqh2iQTeXmBk1lU
mM5lH/s=
rWYbMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAEqhsLzIh098lmLl
CSmuOi5o0NLFaO3qgzaxhvO56RkrtnMQb99/u/F2gQPBoVrubES4fBDRWtfBkmRZ
NabgqghBN27nyLa9DEtHeOzEtBWjYnZKOY5uGf/wwIp+HM2H5QBs8c4nJv+46ev3
L73CS5zWV950dLNPA5iatQgtFsp/tsh2YoYbfPI+bHjMLJWau3cl6ID/m+j4moU7
hbcXTnehz0250CXoXYzmfPHZUjA97Cs3kbzi6Dkxbz3pmHCAfEHdGRMFIZR7Fs/Y
7k44NF5q/82FrI+Umt1OdwUTprSAUrKXZHaI9N1CClAcgP1LbqliEKrvLsEvvg7C
LrUoX4M=
-----END CERTIFICATE-----

View file

@ -69,16 +69,16 @@ func TestQueuePushPopSingleGoroutine(t *testing.T) {
const maxSize = 500
const maxIters = 50
for max := 1; max < maxSize; max++ {
queue := newWriteJobQueue(max, 1+(r.Int()%max))
for maxCount := 1; maxCount < maxSize; maxCount++ {
queue := newWriteJobQueue(maxCount, 1+(r.Int()%maxCount))
elements := 0 // total elements in the queue
lastWriteID := 0
lastReadID := 0
for iter := 0; iter < maxIters; iter++ {
if elements < max {
toWrite := r.Int() % (max - elements)
if elements < maxCount {
toWrite := r.Int() % (maxCount - elements)
if toWrite == 0 {
toWrite = 1
}

View file

@ -699,7 +699,7 @@ func (db *DBReadOnly) LastBlockID() (string, error) {
return "", err
}
max := uint64(0)
maxT := uint64(0)
lastBlockID := ""
@ -711,8 +711,8 @@ func (db *DBReadOnly) LastBlockID() (string, error) {
continue // Not a block dir.
}
timestamp := ulidObj.Time()
if timestamp > max {
max = timestamp
if timestamp > maxT {
maxT = timestamp
lastBlockID = dirName
}
}
@ -2333,13 +2333,13 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func exponential(d, min, max time.Duration) time.Duration {
func exponential(d, minD, maxD time.Duration) time.Duration {
d *= 2
if d < min {
d = min
if d < minD {
d = minD
}
if d > max {
d = max
if d > maxD {
d = maxD
}
return d
}

View file

@ -2676,8 +2676,9 @@ func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
require.NoError(t, db.Close())
// Simulate a corrupted chunk: without a header.
_, err := os.Create(path.Join(mmappedChunksDir(db.dir), "000001"))
chunk, err := os.Create(path.Join(mmappedChunksDir(db.dir), "000001"))
require.NoError(t, err)
require.NoError(t, chunk.Close())
spinUpQuerierAndCheck(db.dir, t.TempDir(), 1)
@ -4887,8 +4888,8 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
addSample := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
@ -4924,8 +4925,8 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
var series1Samples, series2Samples []chunks.Sample
for _, r := range [][2]int64{{90, 119}, {120, 239}, {240, highest}} {
fromMins, toMins := r[0], r[1]
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
@ -5003,8 +5004,8 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
@ -5090,8 +5091,8 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
@ -5145,8 +5146,8 @@ func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScen
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
@ -5203,8 +5204,8 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
@ -5258,8 +5259,8 @@ func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScen
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
@ -5314,8 +5315,8 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
@ -5362,8 +5363,8 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
verifySamples := func(fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, ts*2))
}
@ -5474,13 +5475,13 @@ func testQuerierOOOQuery(t *testing.T,
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) {
app := db.Appender(context.Background())
totalAppended := 0
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
if !filter(min) {
for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
if !filter(m / time.Minute.Milliseconds()) {
continue
}
_, err := appendFunc(app, min, counterReset)
if min >= queryMinT && min <= queryMaxT {
expSamples = append(expSamples, sampleFunc(min))
_, err := appendFunc(app, m, counterReset)
if m >= queryMinT && m <= queryMaxT {
expSamples = append(expSamples, sampleFunc(m))
}
require.NoError(t, err)
totalAppended++
@ -5564,6 +5565,25 @@ func testQuerierOOOQuery(t *testing.T,
},
},
},
{
name: "query overlapping inorder and ooo samples returns all ingested samples",
queryMinT: minutes(0),
queryMaxT: minutes(200),
batches: []sampleBatch{
{
minT: minutes(100),
maxT: minutes(200),
filter: func(t int64) bool { return t%2 == 0 },
isOOO: false,
},
{
minT: minutes(180 - opts.OutOfOrderCapMax/2), // Make sure to fit into the OOO head.
maxT: minutes(180),
filter: func(t int64) bool { return t%2 == 1 },
isOOO: true,
},
},
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
@ -5580,6 +5600,7 @@ func testQuerierOOOQuery(t *testing.T,
for _, batch := range tc.batches {
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset)
require.Positive(t, appendedCount) // Sanity check that filter is not too zealous.
if batch.isOOO {
oooSamples += appendedCount
}
@ -5676,13 +5697,13 @@ func testChunkQuerierOOOQuery(t *testing.T,
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) {
app := db.Appender(context.Background())
totalAppended := 0
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
if !filter(min) {
for m := fromMins; m <= toMins; m += time.Minute.Milliseconds() {
if !filter(m) {
continue
}
_, err := appendFunc(app, min, counterReset)
if min >= queryMinT && min <= queryMaxT {
expSamples = append(expSamples, sampleFunc(min))
_, err := appendFunc(app, m, counterReset)
if m >= queryMinT && m <= queryMaxT {
expSamples = append(expSamples, sampleFunc(m))
}
require.NoError(t, err)
totalAppended++
@ -6017,9 +6038,9 @@ func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) {
app := db.Appender(context.Background())
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
for m := from; m <= to; m += time.Minute.Milliseconds() {
val := rand.Intn(1000)
_, s, err := scenario.appendFunc(app, lbls, min, int64(val))
_, s, err := scenario.appendFunc(app, lbls, m, int64(val))
if faceError {
require.Error(t, err)
} else {
@ -6150,14 +6171,14 @@ func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
app := db.Appender(context.Background())
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
_, _, err := scenario.appendFunc(app, lbls, min, min)
for m := from; m <= to; m += time.Minute.Milliseconds() {
_, _, err := scenario.appendFunc(app, lbls, m, m)
if faceError {
require.Error(t, err)
failedSamples++
} else {
require.NoError(t, err)
expSamples[key] = append(expSamples[key], scenario.sampleFunc(min, min))
expSamples[key] = append(expSamples[key], scenario.sampleFunc(m, m))
totalSamples++
}
}
@ -6226,9 +6247,9 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
app := db.Appender(context.Background())
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
for m := from; m <= to; m += time.Minute.Milliseconds() {
val := rand.Intn(1000)
_, s, err := scenario.appendFunc(app, lbls, min, int64(val))
_, s, err := scenario.appendFunc(app, lbls, m, int64(val))
require.NoError(t, err)
expSamples[key] = append(expSamples[key], s)
totalSamples++
@ -6791,8 +6812,8 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
addSample := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
}
@ -6879,8 +6900,8 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
verifySamples := func(block *Block, fromMins, toMins int64) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
}
expRes := map[string][]chunks.Sample{
@ -6928,8 +6949,8 @@ func TestWBLCorruption(t *testing.T) {
var allSamples, expAfterRestart []chunks.Sample
addSamples := func(fromMins, toMins int64, afterRestart bool) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
@ -7084,8 +7105,8 @@ func testOOOMmapCorruption(t *testing.T, scenario sampleTypeScenario) {
var allSamples, expInMmapChunks []chunks.Sample
addSamples := func(fromMins, toMins int64, inMmapAfterCorruption bool) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, s)
@ -7231,8 +7252,8 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
series1 := labels.FromStrings("foo", "bar1")
addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool, allSamples []chunks.Sample) []chunks.Sample {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
if success {
require.NoError(t, err)
@ -7265,7 +7286,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
// WBL is not empty.
size, err := db.head.wbl.Size()
require.NoError(t, err)
require.Greater(t, size, int64(0))
require.Positive(t, size)
require.Empty(t, db.Blocks())
require.NoError(t, db.compactOOOHead(ctx))
@ -7442,8 +7463,8 @@ func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) {
series1 := labels.FromStrings("foo", "bar1")
addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, _, err := scenario.appendFunc(app, series1, ts, ts)
if success {
require.NoError(t, err)
@ -7456,8 +7477,8 @@ func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) {
verifySamples := func(t *testing.T, db *DB, fromMins, toMins int64) {
var expSamples []chunks.Sample
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
expSamples = append(expSamples, scenario.sampleFunc(ts, ts))
}
@ -7574,8 +7595,8 @@ func testWblReplayAfterOOODisableAndRestart(t *testing.T, scenario sampleTypeSce
var allSamples []chunks.Sample
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, s)
@ -7643,8 +7664,8 @@ func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) {
var allSamples []chunks.Sample
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, s)
@ -7702,8 +7723,8 @@ func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenari
var allSamples []chunks.Sample
addSamples := func(fromMins, toMins int64) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
for m := fromMins; m <= toMins; m++ {
ts := m * time.Minute.Milliseconds()
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, s)

View file

@ -1634,12 +1634,12 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o
// It assumes that the time range is 1/ratioToFull full.
// Assuming that the samples will keep arriving at the same rate, it will make the
// remaining n chunks within this chunk range (before max) equally sized.
func computeChunkEndTime(start, cur, max int64, ratioToFull float64) int64 {
n := float64(max-start) / (float64(cur-start+1) * ratioToFull)
func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 {
n := float64(maxT-start) / (float64(cur-start+1) * ratioToFull)
if n <= 1 {
return max
return maxT
}
return int64(float64(start) + float64(max-start)/math.Floor(n))
return int64(float64(start) + float64(maxT-start)/math.Floor(n))
}
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {

View file

@ -481,31 +481,12 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
return elem, true, offset == 0, nil
}
// mergedChunks return an iterable over one or more OOO chunks for the given
// chunks.Meta reference from memory or by m-mapping it from the disk. The
// returned iterable will be a merge of all the overlapping chunks, if any,
// amongst all the chunks in the OOOHead.
// mergedChunks return an iterable over all chunks that overlap the
// time window [mint,maxt], plus meta.Chunk if populated.
// If hr is non-nil then in-order chunks are included.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
_, cid, _ := unpackHeadChunkRef(meta.Ref)
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := int(cid) - int(s.ooo.firstOOOChunkID)
if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
return nil, storage.ErrNotFound
}
if ix == len(s.ooo.oooMmappedChunks) {
if s.ooo.oooHeadChunk == nil {
return nil, errors.New("invalid ooo head chunk")
}
}
// We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1)

View file

@ -2759,7 +2759,7 @@ func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
require.Equal(t, int64(math.MinInt64), db.head.minValidTime.Load())
require.NoError(t, db.Compact(ctx))
require.Greater(t, db.head.minValidTime.Load(), int64(0))
require.Positive(t, db.head.minValidTime.Load())
app = db.Appender(ctx)
_, err = appendSample(app, db.head.minValidTime.Load()-2)
@ -3679,7 +3679,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
require.Len(t, ms.mmappedChunks, 25)
expMmapChunks := make([]*mmappedChunk, 0, 20)
for _, mmap := range ms.mmappedChunks {
require.Greater(t, mmap.numSamples, uint16(0))
require.Positive(t, mmap.numSamples)
cpy := *mmap
expMmapChunks = append(expMmapChunks, &cpy)
}

View file

@ -20,10 +20,10 @@ import (
func TestPostingsStats(t *testing.T) {
stats := &maxHeap{}
max := 3000000
heapLength := 10
const maxCount = 3000000
const heapLength = 10
stats.init(heapLength)
for i := 0; i < max; i++ {
for i := 0; i < maxCount; i++ {
item := Stat{
Name: "Label-da",
Count: uint64(i),
@ -35,13 +35,13 @@ func TestPostingsStats(t *testing.T) {
data := stats.get()
require.Len(t, data, 10)
for i := 0; i < heapLength; i++ {
require.Equal(t, uint64(max-i), data[i].Count)
require.Equal(t, uint64(maxCount-i), data[i].Count)
}
}
func TestPostingsStats2(t *testing.T) {
stats := &maxHeap{}
heapLength := 10
const heapLength = 10
stats.init(heapLength)
stats.push(Stat{Name: "Stuff", Count: 10})
@ -57,12 +57,12 @@ func TestPostingsStats2(t *testing.T) {
func BenchmarkPostingStatsMaxHep(b *testing.B) {
stats := &maxHeap{}
max := 9000000
heapLength := 10
const maxCount = 9000000
const heapLength = 10
b.ResetTimer()
for n := 0; n < b.N; n++ {
stats.init(heapLength)
for i := 0; i < max; i++ {
for i := 0; i < maxCount; i++ {
item := Stat{
Name: "Label-da",
Count: uint64(i),

View file

@ -292,7 +292,7 @@ func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader,
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
if !isOOO {
if !isOOO && meta.Chunk == nil { // meta.Chunk can have a copy of OOO head samples, even on non-OOO chunk ID.
return cr.cr.ChunkOrIterable(meta)
}
@ -303,6 +303,10 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
}
s.Lock()
if s.ooo == nil { // Must have s.ooo non-nil to call mergedChunks().
s.Unlock()
return cr.cr.ChunkOrIterable(meta)
}
mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
s.Unlock()

View file

@ -572,6 +572,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
@ -610,11 +611,13 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
return err
}
w.writer.StoreMetadata(meta)
case record.Tombstones:
default:
case record.Unknown:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
default:
// We're not interested in other types of records.
}
}
if err := r.Err(); err != nil {
@ -643,14 +646,12 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
}
w.writer.UpdateSeriesSegment(series, segmentNum)
// Ignore these; we're only interested in series.
case record.Samples:
case record.Exemplars:
case record.Tombstones:
default:
case record.Unknown:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
default:
// We're only interested in series.
}
}
if err := r.Err(); err != nil {

View file

@ -612,16 +612,16 @@ func (w *WL) setSegment(segment *Segment) error {
// flushPage writes the new contents of the page to disk. If no more records will fit into
// the page, the remaining bytes will be set to zero and a new page will be started.
// If clear is true, this is enforced regardless of how many bytes are left in the page.
func (w *WL) flushPage(clear bool) error {
// If forceClear is true, this is enforced regardless of how many bytes are left in the page.
func (w *WL) flushPage(forceClear bool) error {
w.metrics.pageFlushes.Inc()
p := w.page
clear = clear || p.full()
shouldClear := forceClear || p.full()
// No more data will fit into the page or an implicit clear.
// Enqueue and clear it.
if clear {
if shouldClear {
p.alloc = pageSize // Write till end of page.
}
@ -633,7 +633,7 @@ func (w *WL) flushPage(clear bool) error {
p.flushed += n
// We flushed an entire page, prepare a new one.
if clear {
if shouldClear {
p.reset()
w.donePages++
w.metrics.pageCompletions.Inc()

View file

@ -155,7 +155,7 @@ func DirHash(t *testing.T, path string) []byte {
modTime, err := info.ModTime().GobEncode()
require.NoError(t, err)
_, err = io.WriteString(hash, string(modTime))
_, err = hash.Write(modTime)
require.NoError(t, err)
return nil
})

View file

@ -481,14 +481,14 @@ func New(logger log.Logger, o *Options) *Handler {
router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, o.AppName+" is Healthy.\n")
fmt.Fprintf(w, "%s is Healthy.\n", o.AppName)
})
router.Head("/-/healthy", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
})
router.Get("/-/ready", readyf(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, o.AppName+" is Ready.\n")
fmt.Fprintf(w, "%s is Ready.\n", o.AppName)
}))
router.Head("/-/ready", readyf(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)