mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Merge branch 'main' into sum-and-avg-over-mixed-custom-exponential-histograms
Signed-off-by: Charles Korn <charleskorn@users.noreply.github.com>
This commit is contained in:
commit
f992f81bd0
8
.github/workflows/ci.yml
vendored
8
.github/workflows/ci.yml
vendored
|
@ -75,7 +75,7 @@ jobs:
|
|||
runs-on: windows-latest
|
||||
steps:
|
||||
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
|
||||
- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
|
||||
- uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
|
||||
with:
|
||||
go-version: 1.22.x
|
||||
- run: |
|
||||
|
@ -162,7 +162,7 @@ jobs:
|
|||
- name: Checkout repository
|
||||
uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
|
||||
uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
|
||||
with:
|
||||
cache: false
|
||||
go-version: 1.22.x
|
||||
|
@ -175,14 +175,14 @@ jobs:
|
|||
- name: Checkout repository
|
||||
uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
|
||||
uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
|
||||
with:
|
||||
go-version: 1.22.x
|
||||
- name: Install snmp_exporter/generator dependencies
|
||||
run: sudo apt-get update && sudo apt-get -y install libsnmp-dev
|
||||
if: github.repository == 'prometheus/snmp_exporter'
|
||||
- name: Lint
|
||||
uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 # v6.0.1
|
||||
uses: golangci/golangci-lint-action@aaa42aa0628b4ae2578232a66b541047968fac86 # v6.1.0
|
||||
with:
|
||||
args: --verbose
|
||||
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
|
||||
|
|
2
.github/workflows/scorecards.yml
vendored
2
.github/workflows/scorecards.yml
vendored
|
@ -26,7 +26,7 @@ jobs:
|
|||
persist-credentials: false
|
||||
|
||||
- name: "Run analysis"
|
||||
uses: ossf/scorecard-action@dc50aa9510b46c811795eb24b2f1ba02a914e534 # tag=v2.3.3
|
||||
uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # tag=v2.4.0
|
||||
with:
|
||||
results_file: results.sarif
|
||||
results_format: sarif
|
||||
|
|
|
@ -234,6 +234,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
|
|||
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
|
||||
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
|
||||
level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
|
||||
case "delayed-compaction":
|
||||
c.tsdb.EnableDelayedCompaction = true
|
||||
level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.")
|
||||
case "":
|
||||
continue
|
||||
case "promql-at-modifier", "promql-negative-offset":
|
||||
|
@ -381,6 +384,9 @@ func main() {
|
|||
serverOnlyFlag(a, "storage.tsdb.allow-overlapping-blocks", "[DEPRECATED] This flag has no effect. Overlapping blocks are enabled by default now.").
|
||||
Default("true").Hidden().BoolVar(&b)
|
||||
|
||||
serverOnlyFlag(a, "storage.tsdb.allow-overlapping-compaction", "Allow compaction of overlapping blocks. If set to false, TSDB stops vertical compaction and leaves overlapping blocks there. The use case is to let another component handle the compaction of overlapping blocks.").
|
||||
Default("true").Hidden().BoolVar(&cfg.tsdb.EnableOverlappingCompaction)
|
||||
|
||||
serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
|
||||
Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)
|
||||
|
||||
|
@ -475,7 +481,7 @@ func main() {
|
|||
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
|
||||
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
Default("").StringsVar(&cfg.featureList)
|
||||
|
||||
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
||||
|
@ -1715,6 +1721,8 @@ type tsdbOptions struct {
|
|||
MaxExemplars int64
|
||||
EnableMemorySnapshotOnShutdown bool
|
||||
EnableNativeHistograms bool
|
||||
EnableDelayedCompaction bool
|
||||
EnableOverlappingCompaction bool
|
||||
}
|
||||
|
||||
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
||||
|
@ -1735,7 +1743,8 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
|
|||
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
|
||||
EnableNativeHistograms: opts.EnableNativeHistograms,
|
||||
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
|
||||
EnableOverlappingCompaction: true,
|
||||
EnableDelayedCompaction: opts.EnableDelayedCompaction,
|
||||
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ The Prometheus monitoring server
|
|||
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
|
||||
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
|
||||
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
|
||||
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
|
||||
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |
|
||||
|
||||
|
|
|
@ -200,8 +200,9 @@ won't work when you push OTLP metrics.
|
|||
|
||||
`--enable-feature=promql-experimental-functions`
|
||||
|
||||
Enables PromQL functions that are considered experimental and whose name or
|
||||
semantics could change.
|
||||
Enables PromQL functions that are considered experimental. These functions
|
||||
might change their name, syntax, or semantics. They might also get removed
|
||||
entirely.
|
||||
|
||||
## Created Timestamps Zero Injection
|
||||
|
||||
|
@ -234,3 +235,17 @@ metadata changes as WAL records on a per-series basis.
|
|||
|
||||
This must be used if
|
||||
you are also using remote write 2.0 as it will only gather metadata from the WAL.
|
||||
|
||||
## Delay compaction start time
|
||||
|
||||
`--enable-feature=delayed-compaction`
|
||||
|
||||
A random offset, up to `10%` of the chunk range, is added to the Head compaction start time. This assists Prometheus instances in avoiding simultaneous compactions and reduces the load on shared resources.
|
||||
|
||||
Only auto Head compactions and the operations directly resulting from them are subject to this delay.
|
||||
|
||||
In the event of multiple consecutive Head compactions being possible, only the first compaction experiences this delay.
|
||||
|
||||
Note that during this delay, the Head continues its usual operations, which include serving and appending series.
|
||||
|
||||
Despite the delay in compaction, the blocks produced are time-aligned in the same manner as they would be if the delay was not in place.
|
||||
|
|
|
@ -94,16 +94,46 @@ type OpenMetricsParser struct {
|
|||
exemplarVal float64
|
||||
exemplarTs int64
|
||||
hasExemplarTs bool
|
||||
|
||||
skipCTSeries bool
|
||||
}
|
||||
|
||||
// NewOpenMetricsParser returns a new parser of the byte slice.
|
||||
func NewOpenMetricsParser(b []byte, st *labels.SymbolTable) Parser {
|
||||
return &OpenMetricsParser{
|
||||
l: &openMetricsLexer{b: b},
|
||||
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
|
||||
type openMetricsParserOptions struct {
|
||||
SkipCTSeries bool
|
||||
}
|
||||
|
||||
type OpenMetricsOption func(*openMetricsParserOptions)
|
||||
|
||||
// WithOMParserCTSeriesSkipped turns off exposing _created lines
|
||||
// as series, which makes those only used for parsing created timestamp
|
||||
// for `CreatedTimestamp` method purposes.
|
||||
//
|
||||
// It's recommended to use this option to avoid using _created lines for other
|
||||
// purposes than created timestamp, but leave false by default for the
|
||||
// best-effort compatibility.
|
||||
func WithOMParserCTSeriesSkipped() OpenMetricsOption {
|
||||
return func(o *openMetricsParserOptions) {
|
||||
o.SkipCTSeries = true
|
||||
}
|
||||
}
|
||||
|
||||
// NewOpenMetricsParser returns a new parser for the byte slice with option to skip CT series parsing.
|
||||
func NewOpenMetricsParser(b []byte, st *labels.SymbolTable, opts ...OpenMetricsOption) Parser {
|
||||
options := &openMetricsParserOptions{}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
|
||||
parser := &OpenMetricsParser{
|
||||
l: &openMetricsLexer{b: b},
|
||||
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
|
||||
skipCTSeries: options.SkipCTSeries,
|
||||
}
|
||||
|
||||
return parser
|
||||
}
|
||||
|
||||
// Series returns the bytes of the series, the timestamp if set, and the value
|
||||
// of the current sample.
|
||||
func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
|
||||
|
@ -219,10 +249,90 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// CreatedTimestamp returns nil as it's not implemented yet.
|
||||
// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
|
||||
// CreatedTimestamp returns the created timestamp for a current Metric if exists or nil.
|
||||
// NOTE(Maniktherana): Might use additional CPU/mem resources due to deep copy of parser required for peeking given 1.0 OM specification on _created series.
|
||||
func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
|
||||
return nil
|
||||
if !TypeRequiresCT(p.mtype) {
|
||||
// Not a CT supported metric type, fast path.
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
currLset labels.Labels
|
||||
buf []byte
|
||||
peekWithoutNameLsetHash uint64
|
||||
)
|
||||
p.Metric(&currLset)
|
||||
currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
|
||||
// Search for the _created line for the currFamilyLsetHash using ephemeral parser until
|
||||
// we see EOF or new metric family. We have to do it as we don't know where (and if)
|
||||
// that CT line is.
|
||||
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
|
||||
peek := deepCopy(p)
|
||||
for {
|
||||
eType, err := peek.Next()
|
||||
if err != nil {
|
||||
// This means peek will give error too later on, so def no CT line found.
|
||||
// This might result in partial scrape with wrong/missing CT, but only
|
||||
// spec improvement would help.
|
||||
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
|
||||
return nil
|
||||
}
|
||||
if eType != EntrySeries {
|
||||
// Assume we hit different family, no CT line found.
|
||||
return nil
|
||||
}
|
||||
|
||||
var peekedLset labels.Labels
|
||||
peek.Metric(&peekedLset)
|
||||
peekedName := peekedLset.Get(model.MetricNameLabel)
|
||||
if !strings.HasSuffix(peekedName, "_created") {
|
||||
// Not a CT line, search more.
|
||||
continue
|
||||
}
|
||||
|
||||
// We got a CT line here, but let's search if CT line is actually for our series, edge case.
|
||||
peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
|
||||
if peekWithoutNameLsetHash != currFamilyLsetHash {
|
||||
// CT line for a different series, for our series no CT.
|
||||
return nil
|
||||
}
|
||||
ct := int64(peek.val)
|
||||
return &ct
|
||||
}
|
||||
}
|
||||
|
||||
// TypeRequiresCT returns true if the metric type requires a _created timestamp.
|
||||
func TypeRequiresCT(t model.MetricType) bool {
|
||||
switch t {
|
||||
case model.MetricTypeCounter, model.MetricTypeSummary, model.MetricTypeHistogram:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// deepCopy creates a copy of a parser without re-using the slices' original memory addresses.
|
||||
func deepCopy(p *OpenMetricsParser) OpenMetricsParser {
|
||||
newB := make([]byte, len(p.l.b))
|
||||
copy(newB, p.l.b)
|
||||
|
||||
newLexer := &openMetricsLexer{
|
||||
b: newB,
|
||||
i: p.l.i,
|
||||
start: p.l.start,
|
||||
err: p.l.err,
|
||||
state: p.l.state,
|
||||
}
|
||||
|
||||
newParser := OpenMetricsParser{
|
||||
l: newLexer,
|
||||
builder: p.builder,
|
||||
mtype: p.mtype,
|
||||
val: p.val,
|
||||
skipCTSeries: false,
|
||||
}
|
||||
return newParser
|
||||
}
|
||||
|
||||
// nextToken returns the next token from the openMetricsLexer.
|
||||
|
@ -337,7 +447,13 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
|
|||
}
|
||||
|
||||
p.series = p.l.b[p.start:p.l.i]
|
||||
return p.parseMetricSuffix(p.nextToken())
|
||||
if err := p.parseSeriesEndOfLine(p.nextToken()); err != nil {
|
||||
return EntryInvalid, err
|
||||
}
|
||||
if p.skipCTSeries && p.isCreatedSeries() {
|
||||
return p.Next()
|
||||
}
|
||||
return EntrySeries, nil
|
||||
case tMName:
|
||||
p.offsets = append(p.offsets, p.start, p.l.i)
|
||||
p.series = p.l.b[p.start:p.l.i]
|
||||
|
@ -351,8 +467,14 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
|
|||
p.series = p.l.b[p.start:p.l.i]
|
||||
t2 = p.nextToken()
|
||||
}
|
||||
return p.parseMetricSuffix(t2)
|
||||
|
||||
if err := p.parseSeriesEndOfLine(t2); err != nil {
|
||||
return EntryInvalid, err
|
||||
}
|
||||
if p.skipCTSeries && p.isCreatedSeries() {
|
||||
return p.Next()
|
||||
}
|
||||
return EntrySeries, nil
|
||||
default:
|
||||
err = p.parseError("expected a valid start token", t)
|
||||
}
|
||||
|
@ -467,51 +589,64 @@ func (p *OpenMetricsParser) parseLVals(offsets []int, isExemplar bool) ([]int, e
|
|||
}
|
||||
}
|
||||
|
||||
// parseMetricSuffix parses the end of the line after the metric name and
|
||||
// labels. It starts parsing with the provided token.
|
||||
func (p *OpenMetricsParser) parseMetricSuffix(t token) (Entry, error) {
|
||||
// isCreatedSeries returns true if the current series is a _created series.
|
||||
func (p *OpenMetricsParser) isCreatedSeries() bool {
|
||||
var newLbs labels.Labels
|
||||
p.Metric(&newLbs)
|
||||
name := newLbs.Get(model.MetricNameLabel)
|
||||
if TypeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// parseSeriesEndOfLine parses the series end of the line (value, optional
|
||||
// timestamp, commentary, etc.) after the metric name and labels.
|
||||
// It starts parsing with the provided token.
|
||||
func (p *OpenMetricsParser) parseSeriesEndOfLine(t token) error {
|
||||
if p.offsets[0] == -1 {
|
||||
return EntryInvalid, fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i])
|
||||
return fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i])
|
||||
}
|
||||
|
||||
var err error
|
||||
p.val, err = p.getFloatValue(t, "metric")
|
||||
if err != nil {
|
||||
return EntryInvalid, err
|
||||
return err
|
||||
}
|
||||
|
||||
p.hasTS = false
|
||||
switch t2 := p.nextToken(); t2 {
|
||||
case tEOF:
|
||||
return EntryInvalid, errors.New("data does not end with # EOF")
|
||||
return errors.New("data does not end with # EOF")
|
||||
case tLinebreak:
|
||||
break
|
||||
case tComment:
|
||||
if err := p.parseComment(); err != nil {
|
||||
return EntryInvalid, err
|
||||
return err
|
||||
}
|
||||
case tTimestamp:
|
||||
p.hasTS = true
|
||||
var ts float64
|
||||
// A float is enough to hold what we need for millisecond resolution.
|
||||
if ts, err = parseFloat(yoloString(p.l.buf()[1:])); err != nil {
|
||||
return EntryInvalid, fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i])
|
||||
return fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i])
|
||||
}
|
||||
if math.IsNaN(ts) || math.IsInf(ts, 0) {
|
||||
return EntryInvalid, fmt.Errorf("invalid timestamp %f", ts)
|
||||
return fmt.Errorf("invalid timestamp %f", ts)
|
||||
}
|
||||
p.ts = int64(ts * 1000)
|
||||
switch t3 := p.nextToken(); t3 {
|
||||
case tLinebreak:
|
||||
case tComment:
|
||||
if err := p.parseComment(); err != nil {
|
||||
return EntryInvalid, err
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return EntryInvalid, p.parseError("expected next entry after timestamp", t3)
|
||||
return p.parseError("expected next entry after timestamp", t3)
|
||||
}
|
||||
}
|
||||
return EntrySeries, nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error) {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package textparse
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
|
@ -24,6 +25,8 @@ import (
|
|||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
func int64p(x int64) *int64 { return &x }
|
||||
|
||||
func TestOpenMetricsParse(t *testing.T) {
|
||||
input := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
|
||||
# TYPE go_gc_duration_seconds summary
|
||||
|
@ -63,15 +66,34 @@ ss{A="a"} 0
|
|||
_metric_starting_with_underscore 1
|
||||
testmetric{_label_starting_with_underscore="foo"} 1
|
||||
testmetric{label="\"bar\""} 1
|
||||
# HELP foo Counter with and without labels to certify CT is parsed for both cases
|
||||
# TYPE foo counter
|
||||
foo_total 17.0 1520879607.789 # {id="counter-test"} 5`
|
||||
foo_total 17.0 1520879607.789 # {id="counter-test"} 5
|
||||
foo_created 1000
|
||||
foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5
|
||||
foo_created{a="b"} 1000
|
||||
# HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far
|
||||
# TYPE bar summary
|
||||
bar_count 17.0
|
||||
bar_sum 324789.3
|
||||
bar{quantile="0.95"} 123.7
|
||||
bar{quantile="0.99"} 150.0
|
||||
bar_created 1520430000
|
||||
# HELP baz Histogram with the same objective as above's summary
|
||||
# TYPE baz histogram
|
||||
baz_bucket{le="0.0"} 0
|
||||
baz_bucket{le="+Inf"} 17
|
||||
baz_count 17
|
||||
baz_sum 324789.3
|
||||
baz_created 1520430000
|
||||
# HELP fizz_created Gauge which shouldn't be parsed as CT
|
||||
# TYPE fizz_created gauge
|
||||
fizz_created 17.0`
|
||||
|
||||
input += "\n# HELP metric foo\x00bar"
|
||||
input += "\nnull_byte_metric{a=\"abc\x00\"} 1"
|
||||
input += "\n# EOF\n"
|
||||
|
||||
int64p := func(x int64) *int64 { return &x }
|
||||
|
||||
exp := []expectedParse{
|
||||
{
|
||||
m: "go_gc_duration_seconds",
|
||||
|
@ -216,6 +238,9 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5`
|
|||
m: "testmetric{label=\"\\\"bar\\\"\"}",
|
||||
v: 1,
|
||||
lset: labels.FromStrings("__name__", "testmetric", "label", `"bar"`),
|
||||
}, {
|
||||
m: "foo",
|
||||
help: "Counter with and without labels to certify CT is parsed for both cases",
|
||||
}, {
|
||||
m: "foo",
|
||||
typ: model.MetricTypeCounter,
|
||||
|
@ -225,6 +250,76 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5`
|
|||
lset: labels.FromStrings("__name__", "foo_total"),
|
||||
t: int64p(1520879607789),
|
||||
e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5},
|
||||
ct: int64p(1000),
|
||||
}, {
|
||||
m: `foo_total{a="b"}`,
|
||||
v: 17.0,
|
||||
lset: labels.FromStrings("__name__", "foo_total", "a", "b"),
|
||||
t: int64p(1520879607789),
|
||||
e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5},
|
||||
ct: int64p(1000),
|
||||
}, {
|
||||
m: "bar",
|
||||
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
|
||||
}, {
|
||||
m: "bar",
|
||||
typ: model.MetricTypeSummary,
|
||||
}, {
|
||||
m: "bar_count",
|
||||
v: 17.0,
|
||||
lset: labels.FromStrings("__name__", "bar_count"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: "bar_sum",
|
||||
v: 324789.3,
|
||||
lset: labels.FromStrings("__name__", "bar_sum"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: `bar{quantile="0.95"}`,
|
||||
v: 123.7,
|
||||
lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: `bar{quantile="0.99"}`,
|
||||
v: 150.0,
|
||||
lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: "baz",
|
||||
help: "Histogram with the same objective as above's summary",
|
||||
}, {
|
||||
m: "baz",
|
||||
typ: model.MetricTypeHistogram,
|
||||
}, {
|
||||
m: `baz_bucket{le="0.0"}`,
|
||||
v: 0,
|
||||
lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: `baz_bucket{le="+Inf"}`,
|
||||
v: 17,
|
||||
lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: `baz_count`,
|
||||
v: 17,
|
||||
lset: labels.FromStrings("__name__", "baz_count"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: `baz_sum`,
|
||||
v: 324789.3,
|
||||
lset: labels.FromStrings("__name__", "baz_sum"),
|
||||
ct: int64p(1520430000),
|
||||
}, {
|
||||
m: "fizz_created",
|
||||
help: "Gauge which shouldn't be parsed as CT",
|
||||
}, {
|
||||
m: "fizz_created",
|
||||
typ: model.MetricTypeGauge,
|
||||
}, {
|
||||
m: `fizz_created`,
|
||||
v: 17,
|
||||
lset: labels.FromStrings("__name__", "fizz_created"),
|
||||
}, {
|
||||
m: "metric",
|
||||
help: "foo\x00bar",
|
||||
|
@ -235,8 +330,8 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5`
|
|||
},
|
||||
}
|
||||
|
||||
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable())
|
||||
checkParseResults(t, p, exp)
|
||||
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||
checkParseResultsWithCT(t, p, exp, true)
|
||||
}
|
||||
|
||||
func TestUTF8OpenMetricsParse(t *testing.T) {
|
||||
|
@ -251,6 +346,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
|
|||
# UNIT "go.gc_duration_seconds" seconds
|
||||
{"go.gc_duration_seconds",quantile="0"} 4.9351e-05
|
||||
{"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05
|
||||
{"go.gc_duration_seconds_created"} 12313
|
||||
{"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05
|
||||
{"http.status",q="0.9",a="b"} 8.3835e-05
|
||||
{"http.status",q="0.9",a="b"} 8.3835e-05
|
||||
|
@ -274,10 +370,12 @@ func TestUTF8OpenMetricsParse(t *testing.T) {
|
|||
m: `{"go.gc_duration_seconds",quantile="0"}`,
|
||||
v: 4.9351e-05,
|
||||
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"),
|
||||
ct: int64p(12313),
|
||||
}, {
|
||||
m: `{"go.gc_duration_seconds",quantile="0.25"}`,
|
||||
v: 7.424100000000001e-05,
|
||||
lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"),
|
||||
ct: int64p(12313),
|
||||
}, {
|
||||
m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`,
|
||||
v: 8.3835e-05,
|
||||
|
@ -306,8 +404,8 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
|
|||
},
|
||||
}
|
||||
|
||||
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable())
|
||||
checkParseResults(t, p, exp)
|
||||
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||
checkParseResultsWithCT(t, p, exp, true)
|
||||
}
|
||||
|
||||
func TestOpenMetricsParseErrors(t *testing.T) {
|
||||
|
@ -598,10 +696,6 @@ func TestOpenMetricsParseErrors(t *testing.T) {
|
|||
input: "# TYPE hhh histogram\nhhh_bucket{le=\"+Inf\"} 1 # {aa=\"bb\"} 4 -Inf",
|
||||
err: `invalid exemplar timestamp -Inf`,
|
||||
},
|
||||
{
|
||||
input: "# TYPE hhh histogram\nhhh_bucket{le=\"+Inf\"} 1 # {aa=\"bb\"} 4 Inf",
|
||||
err: `invalid exemplar timestamp +Inf`,
|
||||
},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
|
@ -684,3 +778,217 @@ func TestOMNullByteHandling(t *testing.T) {
|
|||
require.Equal(t, c.err, err.Error(), "test %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
// While not desirable, there are cases were CT fails to parse and
|
||||
// these tests show them.
|
||||
// TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
|
||||
func TestCTParseFailures(t *testing.T) {
|
||||
input := `# HELP something Histogram with _created between buckets and summary
|
||||
# TYPE something histogram
|
||||
something_count 17
|
||||
something_sum 324789.3
|
||||
something_created 1520430001
|
||||
something_bucket{le="0.0"} 0
|
||||
something_bucket{le="+Inf"} 17
|
||||
# HELP thing Histogram with _created as first line
|
||||
# TYPE thing histogram
|
||||
thing_created 1520430002
|
||||
thing_count 17
|
||||
thing_sum 324789.3
|
||||
thing_bucket{le="0.0"} 0
|
||||
thing_bucket{le="+Inf"} 17
|
||||
# HELP yum Summary with _created between sum and quantiles
|
||||
# TYPE yum summary
|
||||
yum_count 17.0
|
||||
yum_sum 324789.3
|
||||
yum_created 1520430003
|
||||
yum{quantile="0.95"} 123.7
|
||||
yum{quantile="0.99"} 150.0
|
||||
# HELP foobar Summary with _created as the first line
|
||||
# TYPE foobar summary
|
||||
foobar_created 1520430004
|
||||
foobar_count 17.0
|
||||
foobar_sum 324789.3
|
||||
foobar{quantile="0.95"} 123.7
|
||||
foobar{quantile="0.99"} 150.0`
|
||||
|
||||
input += "\n# EOF\n"
|
||||
|
||||
int64p := func(x int64) *int64 { return &x }
|
||||
|
||||
type expectCT struct {
|
||||
m string
|
||||
ct *int64
|
||||
typ model.MetricType
|
||||
help string
|
||||
isErr bool
|
||||
}
|
||||
|
||||
exp := []expectCT{
|
||||
{
|
||||
m: "something",
|
||||
help: "Histogram with _created between buckets and summary",
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "something",
|
||||
typ: model.MetricTypeHistogram,
|
||||
isErr: false,
|
||||
}, {
|
||||
m: `something_count`,
|
||||
ct: int64p(1520430001),
|
||||
isErr: false,
|
||||
}, {
|
||||
m: `something_sum`,
|
||||
ct: int64p(1520430001),
|
||||
isErr: false,
|
||||
}, {
|
||||
m: `something_bucket{le="0.0"}`,
|
||||
ct: int64p(1520430001),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `something_bucket{le="+Inf"}`,
|
||||
ct: int64p(1520430001),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: "thing",
|
||||
help: "Histogram with _created as first line",
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "thing",
|
||||
typ: model.MetricTypeHistogram,
|
||||
isErr: false,
|
||||
}, {
|
||||
m: `thing_count`,
|
||||
ct: int64p(1520430002),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `thing_sum`,
|
||||
ct: int64p(1520430002),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `thing_bucket{le="0.0"}`,
|
||||
ct: int64p(1520430002),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `thing_bucket{le="+Inf"}`,
|
||||
ct: int64p(1520430002),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: "yum",
|
||||
help: "Summary with _created between summary and quantiles",
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "yum",
|
||||
typ: model.MetricTypeSummary,
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "yum_count",
|
||||
ct: int64p(1520430003),
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "yum_sum",
|
||||
ct: int64p(1520430003),
|
||||
isErr: false,
|
||||
}, {
|
||||
m: `yum{quantile="0.95"}`,
|
||||
ct: int64p(1520430003),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `yum{quantile="0.99"}`,
|
||||
ct: int64p(1520430003),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: "foobar",
|
||||
help: "Summary with _created as the first line",
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "foobar",
|
||||
typ: model.MetricTypeSummary,
|
||||
isErr: false,
|
||||
}, {
|
||||
m: "foobar_count",
|
||||
ct: int64p(1520430004),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: "foobar_sum",
|
||||
ct: int64p(1520430004),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `foobar{quantile="0.95"}`,
|
||||
ct: int64p(1520430004),
|
||||
isErr: true,
|
||||
}, {
|
||||
m: `foobar{quantile="0.99"}`,
|
||||
ct: int64p(1520430004),
|
||||
isErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
|
||||
i := 0
|
||||
|
||||
var res labels.Labels
|
||||
for {
|
||||
et, err := p.Next()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
switch et {
|
||||
case EntrySeries:
|
||||
p.Metric(&res)
|
||||
|
||||
if ct := p.CreatedTimestamp(); exp[i].isErr {
|
||||
require.Nil(t, ct)
|
||||
} else {
|
||||
require.Equal(t, *exp[i].ct, *ct)
|
||||
}
|
||||
default:
|
||||
i++
|
||||
continue
|
||||
}
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeepCopy(t *testing.T) {
|
||||
input := []byte(`# HELP go_goroutines A gauge goroutines.
|
||||
# TYPE go_goroutines gauge
|
||||
go_goroutines 33 123.123
|
||||
# TYPE go_gc_duration_seconds summary
|
||||
go_gc_duration_seconds
|
||||
go_gc_duration_seconds_created`)
|
||||
|
||||
st := labels.NewSymbolTable()
|
||||
parser := NewOpenMetricsParser(input, st, WithOMParserCTSeriesSkipped()).(*OpenMetricsParser)
|
||||
|
||||
// Modify the original parser state
|
||||
_, err := parser.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "go_goroutines", string(parser.l.b[parser.offsets[0]:parser.offsets[1]]))
|
||||
require.True(t, parser.skipCTSeries)
|
||||
|
||||
// Create a deep copy of the parser
|
||||
copyParser := deepCopy(parser)
|
||||
etype, err := copyParser.Next()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, EntryType, etype)
|
||||
require.True(t, parser.skipCTSeries)
|
||||
require.False(t, copyParser.skipCTSeries)
|
||||
|
||||
// Modify the original parser further
|
||||
parser.Next()
|
||||
parser.Next()
|
||||
parser.Next()
|
||||
require.Equal(t, "go_gc_duration_seconds", string(parser.l.b[parser.offsets[0]:parser.offsets[1]]))
|
||||
require.Equal(t, "summary", string(parser.mtype))
|
||||
require.False(t, copyParser.skipCTSeries)
|
||||
require.True(t, parser.skipCTSeries)
|
||||
|
||||
// Ensure the copy remains unchanged
|
||||
copyParser.Next()
|
||||
copyParser.Next()
|
||||
require.Equal(t, "go_gc_duration_seconds", string(copyParser.l.b[copyParser.offsets[0]:copyParser.offsets[1]]))
|
||||
require.False(t, copyParser.skipCTSeries)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/klauspost/compress/gzip"
|
||||
|
@ -41,6 +42,7 @@ type expectedParse struct {
|
|||
unit string
|
||||
comment string
|
||||
e *exemplar.Exemplar
|
||||
ct *int64
|
||||
}
|
||||
|
||||
func TestPromParse(t *testing.T) {
|
||||
|
@ -188,6 +190,10 @@ testmetric{label="\"bar\""} 1`
|
|||
}
|
||||
|
||||
func checkParseResults(t *testing.T, p Parser, exp []expectedParse) {
|
||||
checkParseResultsWithCT(t, p, exp, false)
|
||||
}
|
||||
|
||||
func checkParseResultsWithCT(t *testing.T, p Parser, exp []expectedParse, ctLinesRemoved bool) {
|
||||
i := 0
|
||||
|
||||
var res labels.Labels
|
||||
|
@ -205,6 +211,14 @@ func checkParseResults(t *testing.T, p Parser, exp []expectedParse) {
|
|||
|
||||
p.Metric(&res)
|
||||
|
||||
if ctLinesRemoved {
|
||||
// Are CT series skipped?
|
||||
_, typ := p.Type()
|
||||
if TypeRequiresCT(typ) && strings.HasSuffix(res.Get(labels.MetricName), "_created") {
|
||||
t.Fatalf("we exped created lines skipped")
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, exp[i].m, string(m))
|
||||
require.Equal(t, exp[i].t, ts)
|
||||
require.Equal(t, exp[i].v, v)
|
||||
|
@ -218,6 +232,11 @@ func checkParseResults(t *testing.T, p Parser, exp []expectedParse) {
|
|||
require.True(t, found)
|
||||
testutil.RequireEqual(t, *exp[i].e, e)
|
||||
}
|
||||
if ct := p.CreatedTimestamp(); ct != nil {
|
||||
require.Equal(t, *exp[i].ct, *ct)
|
||||
} else {
|
||||
require.Nil(t, exp[i].ct)
|
||||
}
|
||||
|
||||
case EntryType:
|
||||
m, typ := p.Type()
|
||||
|
@ -475,8 +494,10 @@ const (
|
|||
|
||||
func BenchmarkParse(b *testing.B) {
|
||||
for parserName, parser := range map[string]func([]byte, *labels.SymbolTable) Parser{
|
||||
"prometheus": NewPromParser,
|
||||
"openmetrics": NewOpenMetricsParser,
|
||||
"prometheus": NewPromParser,
|
||||
"openmetrics": func(b []byte, st *labels.SymbolTable) Parser {
|
||||
return NewOpenMetricsParser(b, st)
|
||||
},
|
||||
} {
|
||||
for _, fn := range []string{"promtestdata.txt", "promtestdata.nometa.txt"} {
|
||||
f, err := os.Open(fn)
|
||||
|
|
|
@ -2356,6 +2356,11 @@ loop:
|
|||
} else {
|
||||
histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}})
|
||||
}
|
||||
if histograms[n].H == nil {
|
||||
// Make sure to pass non zero H to AtFloatHistogram so that it does a deep-copy.
|
||||
// Not an issue in the loop above since that uses an intermediate buffer.
|
||||
histograms[n].H = &histogram.FloatHistogram{}
|
||||
}
|
||||
histograms[n].T, histograms[n].H = it.AtFloatHistogram(histograms[n].H)
|
||||
if value.IsStaleNaN(histograms[n].H.Sum) {
|
||||
histograms = histograms[:n]
|
||||
|
|
|
@ -3797,3 +3797,62 @@ func makeInt64Pointer(val int64) *int64 {
|
|||
*valp = val
|
||||
return valp
|
||||
}
|
||||
|
||||
func TestHistogramCopyFromIteratorRegression(t *testing.T) {
|
||||
// Loading the following histograms creates two chunks because there's a
|
||||
// counter reset. Not only the counter is lower in the last histogram
|
||||
// but also there's missing buckets.
|
||||
// This in turns means that chunk iterators will have different spans.
|
||||
load := `load 1m
|
||||
histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:1 count:1 buckets:[1]}}
|
||||
`
|
||||
storage := promqltest.LoadedStorage(t, load)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
engine := promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
|
||||
|
||||
verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) {
|
||||
res := qry.Exec(context.Background())
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
m, ok := res.Value.(promql.Matrix)
|
||||
require.True(t, ok)
|
||||
|
||||
require.Len(t, m, 1)
|
||||
series := m[0]
|
||||
|
||||
require.Empty(t, series.Floats)
|
||||
require.Len(t, series.Histograms, len(expected))
|
||||
for i, e := range expected {
|
||||
series.Histograms[i].H.CounterResetHint = histogram.UnknownCounterReset // Don't care.
|
||||
require.Equal(t, &e, series.Histograms[i].H)
|
||||
}
|
||||
}
|
||||
|
||||
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, "increase(histogram[60s])", time.Unix(0, 0), time.Unix(0, 0).Add(1*time.Minute), time.Minute)
|
||||
require.NoError(t, err)
|
||||
verify(t, qry, []histogram.FloatHistogram{
|
||||
{
|
||||
Count: 2,
|
||||
Sum: 2, // Increase from 4 to 6 is 2.
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}}, // Two buckets changed between the first and second histogram.
|
||||
PositiveBuckets: []float64{1, 1}, // Increase from 2 to 3 is 1 in both buckets.
|
||||
},
|
||||
})
|
||||
|
||||
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, "histogram[60s]", time.Unix(0, 0).Add(2*time.Minute))
|
||||
require.NoError(t, err)
|
||||
verify(t, qry, []histogram.FloatHistogram{
|
||||
{
|
||||
Count: 6,
|
||||
Sum: 6,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}},
|
||||
PositiveBuckets: []float64{3, 3},
|
||||
},
|
||||
{
|
||||
Count: 1,
|
||||
Sum: 1,
|
||||
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
||||
PositiveBuckets: []float64{1},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
|
|
@ -179,15 +179,21 @@ func extrapolatedRate(vals []parser.Value, args parser.Expressions, enh *EvalNod
|
|||
// Otherwise, it returns the calculated histogram and an empty annotation.
|
||||
func histogramRate(points []HPoint, isCounter bool, metricName string, pos posrange.PositionRange) (*histogram.FloatHistogram, annotations.Annotations) {
|
||||
prev := points[0].H
|
||||
usingCustomBuckets := prev.UsesCustomBuckets()
|
||||
last := points[len(points)-1].H
|
||||
if last == nil {
|
||||
return nil, annotations.New().Add(annotations.NewMixedFloatsHistogramsWarning(metricName, pos))
|
||||
}
|
||||
|
||||
minSchema := prev.Schema
|
||||
if last.Schema < minSchema {
|
||||
minSchema = last.Schema
|
||||
}
|
||||
|
||||
if last.UsesCustomBuckets() != usingCustomBuckets {
|
||||
return nil, annotations.New().Add(annotations.NewMixedExponentialCustomHistogramsWarning(metricName, pos))
|
||||
}
|
||||
|
||||
var annos annotations.Annotations
|
||||
|
||||
// We check for gauge type histograms in the loop below, but the loop below does not run on the first and last point,
|
||||
|
@ -215,6 +221,9 @@ func histogramRate(points []HPoint, isCounter bool, metricName string, pos posra
|
|||
if curr.Schema < minSchema {
|
||||
minSchema = curr.Schema
|
||||
}
|
||||
if curr.UsesCustomBuckets() != usingCustomBuckets {
|
||||
return nil, annotations.New().Add(annotations.NewMixedExponentialCustomHistogramsWarning(metricName, pos))
|
||||
}
|
||||
}
|
||||
|
||||
h := last.CopyToSchema(minSchema)
|
||||
|
|
|
@ -68,6 +68,10 @@ func fuzzParseMetricWithContentType(in []byte, contentType string) int {
|
|||
panic(warning)
|
||||
}
|
||||
|
||||
if contentType == "application/openmetrics-text" {
|
||||
p = textparse.NewOpenMetricsParser(in, symbolTable)
|
||||
}
|
||||
|
||||
var err error
|
||||
for {
|
||||
_, err = p.Next()
|
||||
|
|
|
@ -1003,13 +1003,6 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
|
|||
return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err)
|
||||
}
|
||||
res := q.Exec(t.context)
|
||||
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
||||
if !cmd.warn && countWarnings > 0 {
|
||||
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", cmd.expr, cmd.line, res.Warnings)
|
||||
}
|
||||
if cmd.warn && countWarnings == 0 {
|
||||
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
|
||||
}
|
||||
if res.Err != nil {
|
||||
if cmd.fail {
|
||||
return cmd.checkExpectedFailure(res.Err)
|
||||
|
@ -1020,6 +1013,13 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
|
|||
if res.Err == nil && cmd.fail {
|
||||
return fmt.Errorf("expected error evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
|
||||
}
|
||||
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
||||
if !cmd.warn && countWarnings > 0 {
|
||||
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", cmd.expr, cmd.line, res.Warnings)
|
||||
}
|
||||
if cmd.warn && countWarnings == 0 {
|
||||
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
|
||||
}
|
||||
defer q.Close()
|
||||
|
||||
if err := cmd.compareResult(res.Value); err != nil {
|
||||
|
@ -1050,13 +1050,6 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq
|
|||
}
|
||||
defer q.Close()
|
||||
res := q.Exec(t.context)
|
||||
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
||||
if !cmd.warn && countWarnings > 0 {
|
||||
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", iq.expr, cmd.line, res.Warnings)
|
||||
}
|
||||
if cmd.warn && countWarnings == 0 {
|
||||
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", iq.expr, cmd.line)
|
||||
}
|
||||
if res.Err != nil {
|
||||
if cmd.fail {
|
||||
if err := cmd.checkExpectedFailure(res.Err); err != nil {
|
||||
|
@ -1070,6 +1063,13 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq
|
|||
if res.Err == nil && cmd.fail {
|
||||
return fmt.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line)
|
||||
}
|
||||
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
||||
if !cmd.warn && countWarnings > 0 {
|
||||
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", iq.expr, cmd.line, res.Warnings)
|
||||
}
|
||||
if cmd.warn && countWarnings == 0 {
|
||||
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", iq.expr, cmd.line)
|
||||
}
|
||||
err = cmd.compareResult(res.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in %s %s (line %d): %w", cmd, iq.expr, cmd.line, err)
|
||||
|
|
|
@ -763,6 +763,30 @@ eval_warn instant at 30s rate(some_metric[30s])
|
|||
eval_warn instant at 1m rate(some_metric[1m])
|
||||
{} {{count:0.03333333333333333 sum:0.03333333333333333 buckets:[0.03333333333333333]}}
|
||||
|
||||
clear
|
||||
|
||||
# Test rate() over mixed exponential and custom buckets.
|
||||
load 30s
|
||||
some_metric {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}
|
||||
|
||||
# Start and end with exponential, with custom in the middle.
|
||||
eval_warn instant at 1m rate(some_metric[1m])
|
||||
# Should produce no results.
|
||||
|
||||
# Start and end with custom, with exponential in the middle.
|
||||
eval_warn instant at 1m30s rate(some_metric[1m])
|
||||
# Should produce no results.
|
||||
|
||||
# Start with custom, end with exponential.
|
||||
eval_warn instant at 1m rate(some_metric[30s])
|
||||
# Should produce no results.
|
||||
|
||||
# Start with exponential, end with custom.
|
||||
eval_warn instant at 30s rate(some_metric[30s])
|
||||
# Should produce no results.
|
||||
|
||||
clear
|
||||
|
||||
# Test mixing exponential and custom buckets.
|
||||
load 6m
|
||||
metric{series="exponential"} {{sum:4 count:3 buckets:[1 2 1]}} _ {{sum:4 count:3 buckets:[1 2 1]}}
|
||||
|
|
|
@ -16,9 +16,10 @@ package storage
|
|||
import "fmt"
|
||||
|
||||
type errDuplicateSampleForTimestamp struct {
|
||||
timestamp int64
|
||||
existing float64
|
||||
newValue float64
|
||||
timestamp int64
|
||||
existing float64
|
||||
existingIsHistogram bool
|
||||
newValue float64
|
||||
}
|
||||
|
||||
func NewDuplicateFloatErr(t int64, existing, newValue float64) error {
|
||||
|
@ -29,13 +30,26 @@ func NewDuplicateFloatErr(t int64, existing, newValue float64) error {
|
|||
}
|
||||
}
|
||||
|
||||
// NewDuplicateHistogramToFloatErr describes an error where a new float sample is sent for same timestamp as previous histogram.
|
||||
func NewDuplicateHistogramToFloatErr(t int64, newValue float64) error {
|
||||
return errDuplicateSampleForTimestamp{
|
||||
timestamp: t,
|
||||
existingIsHistogram: true,
|
||||
newValue: newValue,
|
||||
}
|
||||
}
|
||||
|
||||
func (e errDuplicateSampleForTimestamp) Error() string {
|
||||
if e.timestamp == 0 {
|
||||
return "duplicate sample for timestamp"
|
||||
}
|
||||
if e.existingIsHistogram {
|
||||
return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing is a histogram, new value %g", e.timestamp, e.newValue)
|
||||
}
|
||||
return fmt.Sprintf("duplicate sample for timestamp %d; overrides not allowed: existing %g, new value %g", e.timestamp, e.existing, e.newValue)
|
||||
}
|
||||
|
||||
// Is implements the anonymous interface checked by errors.Is.
|
||||
// Every errDuplicateSampleForTimestamp compares equal to the global ErrDuplicateSampleForTimestamp.
|
||||
func (e errDuplicateSampleForTimestamp) Is(t error) bool {
|
||||
if t == ErrDuplicateSampleForTimestamp {
|
||||
|
|
38
storage/errors_test.go
Normal file
38
storage/errors_test.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2014 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestErrDuplicateSampleForTimestamp(t *testing.T) {
|
||||
// All errDuplicateSampleForTimestamp are ErrDuplicateSampleForTimestamp
|
||||
require.ErrorIs(t, ErrDuplicateSampleForTimestamp, errDuplicateSampleForTimestamp{})
|
||||
|
||||
// Same type only is if it has same properties.
|
||||
err := NewDuplicateFloatErr(1_000, 10, 20)
|
||||
sameErr := NewDuplicateFloatErr(1_000, 10, 20)
|
||||
differentErr := NewDuplicateFloatErr(1_001, 30, 40)
|
||||
|
||||
require.ErrorIs(t, err, sameErr)
|
||||
require.NotErrorIs(t, err, differentErr)
|
||||
|
||||
// Also works when err is wrapped.
|
||||
require.ErrorIs(t, fmt.Errorf("failed: %w", err), sameErr)
|
||||
require.NotErrorIs(t, fmt.Errorf("failed: %w", err), differentErr)
|
||||
}
|
|
@ -23,7 +23,6 @@ import (
|
|||
"net"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
text_template "text/template"
|
||||
"time"
|
||||
|
@ -106,25 +105,6 @@ func query(ctx context.Context, q string, ts time.Time, queryFn QueryFunc) (quer
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func convertToFloat(i interface{}) (float64, error) {
|
||||
switch v := i.(type) {
|
||||
case float64:
|
||||
return v, nil
|
||||
case string:
|
||||
return strconv.ParseFloat(v, 64)
|
||||
case int:
|
||||
return float64(v), nil
|
||||
case uint:
|
||||
return float64(v), nil
|
||||
case int64:
|
||||
return float64(v), nil
|
||||
case uint64:
|
||||
return float64(v), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("can't convert %T to float", v)
|
||||
}
|
||||
}
|
||||
|
||||
// Expander executes templates in text or HTML mode with a common set of Prometheus template functions.
|
||||
type Expander struct {
|
||||
text string
|
||||
|
@ -219,7 +199,7 @@ func NewTemplateExpander(
|
|||
return host
|
||||
},
|
||||
"humanize": func(i interface{}) (string, error) {
|
||||
v, err := convertToFloat(i)
|
||||
v, err := common_templates.ConvertToFloat(i)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -248,7 +228,7 @@ func NewTemplateExpander(
|
|||
return fmt.Sprintf("%.4g%s", v, prefix), nil
|
||||
},
|
||||
"humanize1024": func(i interface{}) (string, error) {
|
||||
v, err := convertToFloat(i)
|
||||
v, err := common_templates.ConvertToFloat(i)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -267,30 +247,15 @@ func NewTemplateExpander(
|
|||
},
|
||||
"humanizeDuration": common_templates.HumanizeDuration,
|
||||
"humanizePercentage": func(i interface{}) (string, error) {
|
||||
v, err := convertToFloat(i)
|
||||
v, err := common_templates.ConvertToFloat(i)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("%.4g%%", v*100), nil
|
||||
},
|
||||
"humanizeTimestamp": func(i interface{}) (string, error) {
|
||||
v, err := convertToFloat(i)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
tm, err := floatToTime(v)
|
||||
switch {
|
||||
case errors.Is(err, errNaNOrInf):
|
||||
return fmt.Sprintf("%.4g", v), nil
|
||||
case err != nil:
|
||||
return "", err
|
||||
}
|
||||
|
||||
return fmt.Sprint(tm), nil
|
||||
},
|
||||
"humanizeTimestamp": common_templates.HumanizeTimestamp,
|
||||
"toTime": func(i interface{}) (*time.Time, error) {
|
||||
v, err := convertToFloat(i)
|
||||
v, err := common_templates.ConvertToFloat(i)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -419,6 +419,7 @@ loop:
|
|||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
|
@ -436,6 +437,7 @@ loop:
|
|||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
bInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
|
@ -453,6 +455,7 @@ loop:
|
|||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
|
@ -471,6 +474,7 @@ loop:
|
|||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
bInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
|
@ -773,6 +777,23 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
|||
happ.appendFloatHistogram(t, h)
|
||||
return newChunk, false, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||
copy(h.PositiveSpans, a.pSpans)
|
||||
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
|
||||
copy(h.NegativeSpans, a.nSpans)
|
||||
} else {
|
||||
// Spans need pre-adjusting to accommodate the new buckets.
|
||||
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
|
||||
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
|
||||
}
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||
if appendOnly {
|
||||
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
||||
|
@ -784,13 +805,6 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
|||
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
|
||||
return chk, true, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
h.PositiveSpans = a.pSpans
|
||||
h.NegativeSpans = a.nSpans
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
a.appendFloatHistogram(t, h)
|
||||
return nil, false, a, nil
|
||||
}
|
||||
|
|
|
@ -411,6 +411,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
{Offset: 3, Length: 2},
|
||||
{Offset: 5, Length: 1},
|
||||
}
|
||||
savedH2Spans := h2.PositiveSpans
|
||||
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
|
@ -426,6 +427,43 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
|||
// Check that h2 was recoded.
|
||||
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
|
||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||
}
|
||||
|
||||
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
|
||||
emptyBucketH := eh.Copy()
|
||||
emptyBucketH.PositiveBuckets = []float64{6, 0, 3, 2, 4, 0, 1}
|
||||
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 5, Length: 2},
|
||||
}
|
||||
savedH2Spans := h2.PositiveSpans
|
||||
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2, 3}
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
||||
// Check that h2 was recoded.
|
||||
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2, 3}, h2.PositiveBuckets)
|
||||
require.Equal(t, []histogram.Span{
|
||||
{Offset: 0, Length: 2}, // Added empty bucket.
|
||||
{Offset: 2, Length: 1}, // Existing - offset adjusted.
|
||||
{Offset: 3, Length: 2}, // Existing.
|
||||
{Offset: 3, Length: 1}, // Added empty bucket.
|
||||
{Offset: 1, Length: 2}, // Existing + the extra bucket.
|
||||
}, h2.PositiveSpans)
|
||||
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
|
|
|
@ -437,6 +437,7 @@ loop:
|
|||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
|
@ -454,6 +455,7 @@ loop:
|
|||
return nil, nil, false
|
||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
aInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
|
@ -471,6 +473,7 @@ loop:
|
|||
// fill in the bucket in b and advance a.
|
||||
if aCount == 0 {
|
||||
bInter.num++
|
||||
bInter.bucketIdx = aIdx
|
||||
// Advance a
|
||||
if aInter.num > 0 {
|
||||
aInserts = append(aInserts, aInter)
|
||||
|
@ -489,6 +492,7 @@ loop:
|
|||
return nil, nil, false
|
||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||
aInter.num++
|
||||
aInter.bucketIdx = bIdx
|
||||
// Advance b
|
||||
if bInter.num > 0 {
|
||||
bInserts = append(bInserts, bInter)
|
||||
|
@ -807,6 +811,23 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
|||
happ.appendHistogram(t, h)
|
||||
return newChunk, false, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||
copy(h.PositiveSpans, a.pSpans)
|
||||
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
|
||||
copy(h.NegativeSpans, a.nSpans)
|
||||
} else {
|
||||
// Spans need pre-adjusting to accommodate the new buckets.
|
||||
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
|
||||
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
|
||||
}
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||
if appendOnly {
|
||||
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
||||
|
@ -818,13 +839,6 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
|||
app.(*HistogramAppender).appendHistogram(t, h)
|
||||
return chk, true, app, nil
|
||||
}
|
||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||
// The histogram needs to be expanded to have the extra empty buckets
|
||||
// of the chunk.
|
||||
h.PositiveSpans = a.pSpans
|
||||
h.NegativeSpans = a.nSpans
|
||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||
}
|
||||
a.appendHistogram(t, h)
|
||||
return nil, false, a, nil
|
||||
}
|
||||
|
|
|
@ -278,6 +278,10 @@ func (b *bucketIterator) Next() (int, bool) {
|
|||
type Insert struct {
|
||||
pos int
|
||||
num int
|
||||
|
||||
// Optional: bucketIdx is the index of the bucket that is inserted.
|
||||
// Can be used to adjust spans.
|
||||
bucketIdx int
|
||||
}
|
||||
|
||||
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
|
||||
|
@ -577,3 +581,65 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
|
|||
return histogram.UnknownCounterReset
|
||||
}
|
||||
}
|
||||
|
||||
// adjustForInserts adjusts the spans for the given inserts.
|
||||
func adjustForInserts(spans []histogram.Span, inserts []Insert) (mergedSpans []histogram.Span) {
|
||||
if len(inserts) == 0 {
|
||||
return spans
|
||||
}
|
||||
|
||||
it := newBucketIterator(spans)
|
||||
|
||||
var (
|
||||
lastBucket int
|
||||
i int
|
||||
insertIdx = inserts[i].bucketIdx
|
||||
insertNum = inserts[i].num
|
||||
)
|
||||
|
||||
addBucket := func(b int) {
|
||||
offset := b - lastBucket - 1
|
||||
if offset == 0 && len(mergedSpans) > 0 {
|
||||
mergedSpans[len(mergedSpans)-1].Length++
|
||||
} else {
|
||||
if len(mergedSpans) == 0 {
|
||||
offset++
|
||||
}
|
||||
mergedSpans = append(mergedSpans, histogram.Span{
|
||||
Offset: int32(offset),
|
||||
Length: 1,
|
||||
})
|
||||
}
|
||||
|
||||
lastBucket = b
|
||||
}
|
||||
consumeInsert := func() {
|
||||
// Consume the insert.
|
||||
insertNum--
|
||||
if insertNum == 0 {
|
||||
i++
|
||||
if i < len(inserts) {
|
||||
insertIdx = inserts[i].bucketIdx
|
||||
insertNum = inserts[i].num
|
||||
}
|
||||
} else {
|
||||
insertIdx++
|
||||
}
|
||||
}
|
||||
|
||||
bucket, ok := it.Next()
|
||||
for ok {
|
||||
if i < len(inserts) && insertIdx < bucket {
|
||||
addBucket(insertIdx)
|
||||
consumeInsert()
|
||||
} else {
|
||||
addBucket(bucket)
|
||||
bucket, ok = it.Next()
|
||||
}
|
||||
}
|
||||
for i < len(inserts) {
|
||||
addBucket(inserts[i].bucketIdx)
|
||||
consumeInsert()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -428,6 +428,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
{Offset: 4, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
savedH2Spans := h2.PositiveSpans
|
||||
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
|
@ -443,6 +444,44 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
|||
// Check that h2 was recoded.
|
||||
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
|
||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||
}
|
||||
|
||||
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
|
||||
emptyBucketH := eh.Copy()
|
||||
emptyBucketH.PositiveBuckets = []int64{6, -6, 1, 1, -2, 1, 1} // counts: 6, 0, 1, 2, 0, 1, 2 (total 12)
|
||||
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = []histogram.Span{ // Missing buckets at offset 1 and 9.
|
||||
{Offset: 0, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 4, Length: 1},
|
||||
{Offset: 1, Length: 2},
|
||||
}
|
||||
savedH2Spans := h2.PositiveSpans
|
||||
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1, 1} // counts: 7, 2, 3, 3, 4, 5 (total 23)
|
||||
|
||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||
require.NotEmpty(t, posInterjections)
|
||||
require.Empty(t, negInterjections)
|
||||
require.NotEmpty(t, backwardPositiveInserts)
|
||||
require.Empty(t, backwardNegativeInserts)
|
||||
require.True(t, ok)
|
||||
require.False(t, cr)
|
||||
|
||||
assertRecodedHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||
|
||||
// Check that h2 was recoded.
|
||||
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 5 (total 23)
|
||||
require.Equal(t, []histogram.Span{
|
||||
{Offset: 0, Length: 2}, // Added empty bucket.
|
||||
{Offset: 2, Length: 1}, // Existing - offset adjusted.
|
||||
{Offset: 3, Length: 2}, // Added empty bucket.
|
||||
{Offset: 3, Length: 1}, // Existing - offset adjusted.
|
||||
{Offset: 1, Length: 2}, // Existing.
|
||||
}, h2.PositiveSpans)
|
||||
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -1925,3 +1926,229 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
|
|||
require.Nil(t, ulids)
|
||||
require.NoError(t, block.Close())
|
||||
}
|
||||
|
||||
func TestDelayedCompaction(t *testing.T) {
|
||||
// The delay is chosen in such a way as to not slow down the tests, but also to make
|
||||
// the effective compaction duration negligible compared to it, so that the duration comparisons make sense.
|
||||
delay := 1000 * time.Millisecond
|
||||
|
||||
waitUntilCompactedAndCheck := func(db *DB) {
|
||||
t.Helper()
|
||||
start := time.Now()
|
||||
for db.head.compactable() {
|
||||
// This simulates what happens at the end of commits, for less busy DB, a compaction
|
||||
// is triggered every minute. This is to speed up the test.
|
||||
select {
|
||||
case db.compactc <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
duration := time.Since(start)
|
||||
// Only waited for one offset: offset<=delay<<<2*offset
|
||||
require.Greater(t, duration, db.opts.CompactionDelay)
|
||||
require.Less(t, duration, 2*db.opts.CompactionDelay)
|
||||
}
|
||||
|
||||
compactAndCheck := func(db *DB) {
|
||||
t.Helper()
|
||||
start := time.Now()
|
||||
db.Compact(context.Background())
|
||||
for db.head.compactable() {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
if runtime.GOOS == "windows" {
|
||||
// TODO: enable on windows once ms resolution timers are better supported.
|
||||
return
|
||||
}
|
||||
duration := time.Since(start)
|
||||
require.Less(t, duration, delay)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
// The delays are chosen in such a way as to not slow down the tests, but also in a way to make the
|
||||
// effective compaction duration negligible compared to them, so that the duration comparisons make sense.
|
||||
compactionDelay time.Duration
|
||||
}{
|
||||
{
|
||||
"delayed compaction not enabled",
|
||||
0,
|
||||
},
|
||||
{
|
||||
"delayed compaction enabled",
|
||||
delay,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var options *Options
|
||||
if c.compactionDelay > 0 {
|
||||
options = &Options{CompactionDelay: c.compactionDelay}
|
||||
}
|
||||
db := openTestDB(t, options, []int64{10})
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
|
||||
label := labels.FromStrings("foo", "bar")
|
||||
|
||||
// The first compaction is expected to result in 1 block.
|
||||
db.DisableCompactions()
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Append(0, label, 0, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Append(0, label, 11, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Append(0, label, 21, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
if c.compactionDelay == 0 {
|
||||
// When delay is not enabled, compaction should run on the first trigger.
|
||||
compactAndCheck(db)
|
||||
} else {
|
||||
db.EnableCompactions()
|
||||
waitUntilCompactedAndCheck(db)
|
||||
// The db.compactc signals have been processed multiple times since a compaction is triggered every 1ms by waitUntilCompacted.
|
||||
// This implies that the compaction delay doesn't block or wait on the initial trigger.
|
||||
// 3 is an arbitrary value because it's difficult to determine the precise value.
|
||||
require.GreaterOrEqual(t, prom_testutil.ToFloat64(db.metrics.compactionsTriggered)-prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 3.0)
|
||||
// The delay doesn't change the head blocks alignement.
|
||||
require.Eventually(t, func() bool {
|
||||
return db.head.MinTime() == db.compactor.(*LeveledCompactor).ranges[0]+1
|
||||
}, 500*time.Millisecond, 10*time.Millisecond)
|
||||
// One compaction was run and one block was produced.
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
|
||||
}
|
||||
|
||||
// The second compaction is expected to result in 2 blocks.
|
||||
// This ensures that the logic for compaction delay doesn't only work for the first compaction, but also takes into account the future compactions.
|
||||
// This also ensures that no delay happens between consecutive compactions.
|
||||
db.DisableCompactions()
|
||||
app = db.Appender(context.Background())
|
||||
_, err = app.Append(0, label, 31, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Append(0, label, 41, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
if c.compactionDelay == 0 {
|
||||
// Compaction should still run on the first trigger.
|
||||
compactAndCheck(db)
|
||||
} else {
|
||||
db.EnableCompactions()
|
||||
waitUntilCompactedAndCheck(db)
|
||||
}
|
||||
|
||||
// Two other compactions were run.
|
||||
require.Eventually(t, func() bool {
|
||||
return prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran) == 3.0
|
||||
}, 500*time.Millisecond, 10*time.Millisecond)
|
||||
|
||||
if c.compactionDelay == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// This test covers a special case. If auto compaction is in a delay period and a manual compaction is triggered,
|
||||
// auto compaction should stop waiting for the delay if the head is no longer compactable.
|
||||
// Of course, if the head is still compactable after the manual compaction, auto compaction will continue waiting for the same delay.
|
||||
getTimeWhenCompactionDelayStarted := func() time.Time {
|
||||
t.Helper()
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
return db.timeWhenCompactionDelayStarted
|
||||
}
|
||||
|
||||
db.DisableCompactions()
|
||||
app = db.Appender(context.Background())
|
||||
_, err = app.Append(0, label, 51, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
require.True(t, db.head.compactable())
|
||||
db.EnableCompactions()
|
||||
// Trigger an auto compaction.
|
||||
db.compactc <- struct{}{}
|
||||
// That made auto compaction start waiting for the delay.
|
||||
require.Eventually(t, func() bool {
|
||||
return !getTimeWhenCompactionDelayStarted().IsZero()
|
||||
}, 100*time.Millisecond, 10*time.Millisecond)
|
||||
// Trigger a manual compaction.
|
||||
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 50.0)))
|
||||
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran))
|
||||
// Re-trigger an auto compaction.
|
||||
db.compactc <- struct{}{}
|
||||
// That made auto compaction stop waiting for the delay.
|
||||
require.Eventually(t, func() bool {
|
||||
return getTimeWhenCompactionDelayStarted().IsZero()
|
||||
}, 100*time.Millisecond, 10*time.Millisecond)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestDelayedCompactionDoesNotBlockUnrelatedOps makes sure that when delayed compaction is enabled,
|
||||
// operations that don't directly derive from the Head compaction are not delayed, here we consider disk blocks compaction.
|
||||
func TestDelayedCompactionDoesNotBlockUnrelatedOps(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
whenCompactable bool
|
||||
}{
|
||||
{
|
||||
"Head is compactable",
|
||||
true,
|
||||
},
|
||||
{
|
||||
"Head is not compactable",
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
c := c
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
// Some blocks that need compation are present.
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 0, 100))
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 100, 200))
|
||||
createBlock(t, tmpdir, genSeries(1, 1, 200, 300))
|
||||
|
||||
options := DefaultOptions()
|
||||
// This will make the test timeout if compaction really waits for it.
|
||||
options.CompactionDelay = time.Hour
|
||||
db, err := open(tmpdir, log.NewNopLogger(), nil, options, []int64{10, 200}, nil)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
|
||||
db.DisableCompactions()
|
||||
require.Len(t, db.Blocks(), 3)
|
||||
|
||||
if c.whenCompactable {
|
||||
label := labels.FromStrings("foo", "bar")
|
||||
app := db.Appender(context.Background())
|
||||
_, err := app.Append(0, label, 301, 0)
|
||||
require.NoError(t, err)
|
||||
_, err = app.Append(0, label, 317, 0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
// The Head is compactable and will still be at the end.
|
||||
require.True(t, db.head.compactable())
|
||||
defer func() {
|
||||
require.True(t, db.head.compactable())
|
||||
}()
|
||||
}
|
||||
|
||||
// The blocks were compacted.
|
||||
db.Compact(context.Background())
|
||||
require.Len(t, db.Blocks(), 2)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
45
tsdb/db.go
45
tsdb/db.go
|
@ -21,6 +21,7 @@ import (
|
|||
"io"
|
||||
"io/fs"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
|
@ -84,6 +85,8 @@ func DefaultOptions() *Options {
|
|||
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
|
||||
EnableOverlappingCompaction: true,
|
||||
EnableSharding: false,
|
||||
EnableDelayedCompaction: false,
|
||||
CompactionDelay: time.Duration(0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,12 +187,18 @@ type Options struct {
|
|||
// The reason why this flag exists is because there are various users of the TSDB
|
||||
// that do not want vertical compaction happening on ingest time. Instead,
|
||||
// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
|
||||
// For Prometheus, this will always be true.
|
||||
EnableOverlappingCompaction bool
|
||||
|
||||
// EnableSharding enables query sharding support in TSDB.
|
||||
EnableSharding bool
|
||||
|
||||
// EnableDelayedCompaction, when set to true, assigns a random value to CompactionDelay during DB opening.
|
||||
// When set to false, delayed compaction is disabled, unless CompactionDelay is set directly.
|
||||
EnableDelayedCompaction bool
|
||||
// CompactionDelay delays the start time of auto compactions.
|
||||
// It can be increased by up to one minute if the DB does not commit too often.
|
||||
CompactionDelay time.Duration
|
||||
|
||||
// NewCompactorFunc is a function that returns a TSDB compactor.
|
||||
NewCompactorFunc NewCompactorFunc
|
||||
|
||||
|
@ -246,6 +255,9 @@ type DB struct {
|
|||
// Cancel a running compaction when a shutdown is initiated.
|
||||
compactCancel context.CancelFunc
|
||||
|
||||
// timeWhenCompactionDelayStarted helps delay the compactions start time.
|
||||
timeWhenCompactionDelayStarted time.Time
|
||||
|
||||
// oooWasEnabled is true if out of order support was enabled at least one time
|
||||
// during the time TSDB was up. In which case we need to keep supporting
|
||||
// out-of-order compaction and vertical queries.
|
||||
|
@ -998,6 +1010,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
db.oooWasEnabled.Store(true)
|
||||
}
|
||||
|
||||
if opts.EnableDelayedCompaction {
|
||||
opts.CompactionDelay = db.generateCompactionDelay()
|
||||
}
|
||||
|
||||
go db.run(ctx)
|
||||
|
||||
return db, nil
|
||||
|
@ -1186,6 +1202,12 @@ func (a dbAppender) Commit() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
|
||||
// This doesn't guarantee that the Head is really compactable.
|
||||
func (db *DB) waitingForCompactionDelay() bool {
|
||||
return time.Since(db.timeWhenCompactionDelayStarted) < db.opts.CompactionDelay
|
||||
}
|
||||
|
||||
// Compact data if possible. After successful compaction blocks are reloaded
|
||||
// which will also delete the blocks that fall out of the retention window.
|
||||
// Old blocks are only deleted on reloadBlocks based on the new block's parent information.
|
||||
|
@ -1219,7 +1241,21 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
|
|||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
if !db.head.compactable() {
|
||||
// Reset the counter once the head compactions are done.
|
||||
// This would also reset it if a manual compaction was triggered while the auto compaction was in its delay period.
|
||||
if !db.timeWhenCompactionDelayStarted.IsZero() {
|
||||
db.timeWhenCompactionDelayStarted = time.Time{}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if db.timeWhenCompactionDelayStarted.IsZero() {
|
||||
// Start counting for the delay.
|
||||
db.timeWhenCompactionDelayStarted = time.Now()
|
||||
}
|
||||
if db.waitingForCompactionDelay() {
|
||||
break
|
||||
}
|
||||
mint := db.head.MinTime()
|
||||
|
@ -1429,7 +1465,7 @@ func (db *DB) compactBlocks() (err error) {
|
|||
// If we have a lot of blocks to compact the whole process might take
|
||||
// long enough that we end up with a HEAD block that needs to be written.
|
||||
// Check if that's the case and stop compactions early.
|
||||
if db.head.compactable() {
|
||||
if db.head.compactable() && !db.waitingForCompactionDelay() {
|
||||
level.Warn(db.logger).Log("msg", "aborting block compactions to persit the head block")
|
||||
return nil
|
||||
}
|
||||
|
@ -1932,6 +1968,11 @@ func (db *DB) EnableCompactions() {
|
|||
level.Info(db.logger).Log("msg", "Compactions enabled")
|
||||
}
|
||||
|
||||
func (db *DB) generateCompactionDelay() time.Duration {
|
||||
// Up to 10% of the head's chunkRange.
|
||||
return time.Duration(rand.Int63n(db.head.chunkRange.Load()/10)) * time.Millisecond
|
||||
}
|
||||
|
||||
// ForceHeadMMap is intended for use only in tests and benchmarks.
|
||||
func (db *DB) ForceHeadMMap() {
|
||||
db.head.mmapHeadChunks()
|
||||
|
|
|
@ -7357,3 +7357,25 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
|
|||
// Make sure only block-1 is queried.
|
||||
require.Equal(t, "block-1", lbls.Get("block"))
|
||||
}
|
||||
|
||||
func TestGenerateCompactionDelay(t *testing.T) {
|
||||
assertDelay := func(delay time.Duration) {
|
||||
t.Helper()
|
||||
require.GreaterOrEqual(t, delay, time.Duration(0))
|
||||
// Less than 10% of the chunkRange.
|
||||
require.LessOrEqual(t, delay, 6000*time.Millisecond)
|
||||
}
|
||||
|
||||
opts := DefaultOptions()
|
||||
opts.EnableDelayedCompaction = true
|
||||
db := openTestDB(t, opts, []int64{60000})
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
// The offset is generated and changed while opening.
|
||||
assertDelay(db.opts.CompactionDelay)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
assertDelay(db.generateCompactionDelay())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -466,6 +466,9 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
|
|||
// like federation and erroring out at that time would be extremely noisy.
|
||||
// This only checks against the latest in-order sample.
|
||||
// The OOO headchunk has its own method to detect these duplicates.
|
||||
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
|
||||
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
|
||||
}
|
||||
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
|
||||
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
|
||||
}
|
||||
|
@ -1091,7 +1094,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
|
|||
chunkCreated = true
|
||||
}
|
||||
|
||||
ok := c.chunk.Insert(t, v)
|
||||
ok := c.chunk.Insert(t, v, nil, nil)
|
||||
if ok {
|
||||
if chunkCreated || t < c.minTime {
|
||||
c.minTime = t
|
||||
|
|
|
@ -5919,6 +5919,35 @@ func TestPostingsCardinalityStats(t *testing.T) {
|
|||
require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1))
|
||||
}
|
||||
|
||||
func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing.T) {
|
||||
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||
t.Cleanup(func() { head.Close() })
|
||||
|
||||
ls := labels.FromStrings(labels.MetricName, "test")
|
||||
|
||||
{
|
||||
// Append a float 10.0 @ 1_000
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, ls, 1_000, 10.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
{
|
||||
// Append a float histogram @ 2_000
|
||||
app := head.Appender(context.Background())
|
||||
h := tsdbutil.GenerateTestHistogram(1)
|
||||
_, err := app.AppendHistogram(0, ls, 2_000, h, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
}
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
_, err := app.Append(0, ls, 2_000, 10.0)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0))
|
||||
}
|
||||
|
||||
func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
|
||||
type appendableSamples struct {
|
||||
ts int64
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
||||
"github.com/oklog/ulid"
|
||||
|
@ -39,13 +40,13 @@ func NewOOOChunk() *OOOChunk {
|
|||
|
||||
// Insert inserts the sample such that order is maintained.
|
||||
// Returns false if insert was not possible due to the same timestamp already existing.
|
||||
func (o *OOOChunk) Insert(t int64, v float64) bool {
|
||||
func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool {
|
||||
// Although out-of-order samples can be out-of-order amongst themselves, we
|
||||
// are opinionated and expect them to be usually in-order meaning we could
|
||||
// try to append at the end first if the new timestamp is higher than the
|
||||
// last known timestamp.
|
||||
if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t {
|
||||
o.samples = append(o.samples, sample{t, v, nil, nil})
|
||||
o.samples = append(o.samples, sample{t, v, h, fh})
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -54,7 +55,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
|
|||
|
||||
if i >= len(o.samples) {
|
||||
// none found. append it at the end
|
||||
o.samples = append(o.samples, sample{t, v, nil, nil})
|
||||
o.samples = append(o.samples, sample{t, v, h, fh})
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -66,7 +67,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
|
|||
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
|
||||
o.samples = append(o.samples, sample{})
|
||||
copy(o.samples[i+1:], o.samples[i:])
|
||||
o.samples[i] = sample{t, v, nil, nil}
|
||||
o.samples[i] = sample{t, v, h, fh}
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -142,9 +143,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
if newChunk != nil { // A new chunk was allocated.
|
||||
if !recoded {
|
||||
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
|
||||
cmint = s.t
|
||||
}
|
||||
chunk = newChunk
|
||||
cmint = s.t
|
||||
}
|
||||
case chunkenc.EncFloatHistogram:
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
|
@ -157,9 +158,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
|
|||
if newChunk != nil { // A new chunk was allocated.
|
||||
if !recoded {
|
||||
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
|
||||
cmint = s.t
|
||||
}
|
||||
chunk = newChunk
|
||||
cmint = s.t
|
||||
}
|
||||
}
|
||||
cmaxt = s.t
|
||||
|
|
|
@ -14,8 +14,14 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -52,7 +58,7 @@ func TestOOOInsert(t *testing.T) {
|
|||
chunk := NewOOOChunk()
|
||||
chunk.samples = makeEvenSampleSlice(numPreExisting)
|
||||
newSample := samplify(valOdd(insertPos))
|
||||
chunk.Insert(newSample.t, newSample.f)
|
||||
chunk.Insert(newSample.t, newSample.f, nil, nil)
|
||||
|
||||
var expSamples []sample
|
||||
// Our expected new samples slice, will be first the original samples.
|
||||
|
@ -83,7 +89,7 @@ func TestOOOInsertDuplicate(t *testing.T) {
|
|||
dupSample := chunk.samples[dupPos]
|
||||
dupSample.f = 0.123
|
||||
|
||||
ok := chunk.Insert(dupSample.t, dupSample.f)
|
||||
ok := chunk.Insert(dupSample.t, dupSample.f, nil, nil)
|
||||
|
||||
expSamples := makeEvenSampleSlice(num) // We expect no change.
|
||||
require.False(t, ok)
|
||||
|
@ -91,3 +97,136 @@ func TestOOOInsertDuplicate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type chunkVerify struct {
|
||||
encoding chunkenc.Encoding
|
||||
minTime int64
|
||||
maxTime int64
|
||||
}
|
||||
|
||||
func TestOOOChunks_ToEncodedChunks(t *testing.T) {
|
||||
h1 := tsdbutil.GenerateTestHistogram(1)
|
||||
// Make h2 appendable but with more buckets, to trigger recoding.
|
||||
h2 := h1.Copy()
|
||||
h2.PositiveSpans = append(h2.PositiveSpans, histogram.Span{Offset: 1, Length: 1})
|
||||
h2.PositiveBuckets = append(h2.PositiveBuckets, 12)
|
||||
|
||||
testCases := map[string]struct {
|
||||
samples []sample
|
||||
expectedCounterResets []histogram.CounterResetHint
|
||||
expectedChunks []chunkVerify
|
||||
}{
|
||||
"empty": {
|
||||
samples: []sample{},
|
||||
},
|
||||
"has floats": {
|
||||
samples: []sample{
|
||||
{t: 1000, f: 43.0},
|
||||
{t: 1100, f: 42.0},
|
||||
},
|
||||
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset},
|
||||
expectedChunks: []chunkVerify{
|
||||
{encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1100},
|
||||
},
|
||||
},
|
||||
"mix of floats and histograms": {
|
||||
samples: []sample{
|
||||
{t: 1000, f: 43.0},
|
||||
{t: 1100, h: h1},
|
||||
{t: 1200, f: 42.0},
|
||||
},
|
||||
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset, histogram.UnknownCounterReset},
|
||||
expectedChunks: []chunkVerify{
|
||||
{encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1000},
|
||||
{encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100},
|
||||
{encoding: chunkenc.EncXOR, minTime: 1200, maxTime: 1200},
|
||||
},
|
||||
},
|
||||
"has a counter reset": {
|
||||
samples: []sample{
|
||||
{t: 1000, h: h2},
|
||||
{t: 1100, h: h1},
|
||||
},
|
||||
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.CounterReset},
|
||||
expectedChunks: []chunkVerify{
|
||||
{encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000},
|
||||
{encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100},
|
||||
},
|
||||
},
|
||||
"has a recoded histogram": { // Regression test for wrong minT, maxT in histogram recoding.
|
||||
samples: []sample{
|
||||
{t: 0, h: h1},
|
||||
{t: 1, h: h2},
|
||||
},
|
||||
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset},
|
||||
expectedChunks: []chunkVerify{
|
||||
{encoding: chunkenc.EncHistogram, minTime: 0, maxTime: 1},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// Sanity check.
|
||||
require.Equal(t, len(tc.samples), len(tc.expectedCounterResets), "number of samples and counter resets")
|
||||
|
||||
oooChunk := OOOChunk{}
|
||||
for _, s := range tc.samples {
|
||||
switch s.Type() {
|
||||
case chunkenc.ValFloat:
|
||||
oooChunk.Insert(s.t, s.f, nil, nil)
|
||||
case chunkenc.ValHistogram:
|
||||
oooChunk.Insert(s.t, 0, s.h.Copy(), nil)
|
||||
case chunkenc.ValFloatHistogram:
|
||||
oooChunk.Insert(s.t, 0, nil, s.fh.Copy())
|
||||
default:
|
||||
t.Fatalf("unexpected sample type %d", s.Type())
|
||||
}
|
||||
}
|
||||
|
||||
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(tc.expectedChunks), len(chunks), "number of chunks")
|
||||
sampleIndex := 0
|
||||
for i, c := range chunks {
|
||||
require.Equal(t, tc.expectedChunks[i].encoding, c.chunk.Encoding(), "chunk %d encoding", i)
|
||||
require.Equal(t, tc.expectedChunks[i].minTime, c.minTime, "chunk %d minTime", i)
|
||||
require.Equal(t, tc.expectedChunks[i].maxTime, c.maxTime, "chunk %d maxTime", i)
|
||||
samples, err := storage.ExpandSamples(c.chunk.Iterator(nil), newSample)
|
||||
require.GreaterOrEqual(t, len(tc.samples)-sampleIndex, len(samples), "too many samples in chunk %d expected less than %d", i, len(tc.samples)-sampleIndex)
|
||||
require.NoError(t, err)
|
||||
if len(samples) == 0 {
|
||||
// Ignore empty chunks.
|
||||
continue
|
||||
}
|
||||
switch c.chunk.Encoding() {
|
||||
case chunkenc.EncXOR:
|
||||
for j, s := range samples {
|
||||
require.Equal(t, chunkenc.ValFloat, s.Type())
|
||||
// XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset.
|
||||
require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j)
|
||||
require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j)
|
||||
}
|
||||
case chunkenc.EncHistogram:
|
||||
for j, s := range samples {
|
||||
require.Equal(t, chunkenc.ValHistogram, s.Type())
|
||||
require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.H().CounterResetHint, "sample reset hint %d", sampleIndex+j)
|
||||
compareTo := tc.samples[sampleIndex+j].h.Copy()
|
||||
compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j]
|
||||
require.Equal(t, compareTo, s.H().Compact(0), "sample %d", sampleIndex+j)
|
||||
}
|
||||
case chunkenc.EncFloatHistogram:
|
||||
for j, s := range samples {
|
||||
require.Equal(t, chunkenc.ValFloatHistogram, s.Type())
|
||||
require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.FH().CounterResetHint, "sample reset hint %d", sampleIndex+j)
|
||||
compareTo := tc.samples[sampleIndex+j].fh.Copy()
|
||||
compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j]
|
||||
require.Equal(t, compareTo, s.FH().Compact(0), "sample %d", sampleIndex+j)
|
||||
}
|
||||
}
|
||||
sampleIndex += len(samples)
|
||||
}
|
||||
require.Equal(t, len(tc.samples), sampleIndex, "number of samples")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue