Mirror of https://github.com/prometheus/prometheus.git (synced 2024-11-09 23:24:05 -08:00)

commit f028496133

    Merge branch 'main' into nhcb

    Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
CHANGELOG.md
@@ -5,6 +5,7 @@
 * [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string-based ones. #14006 #13991
 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
 * [BUGFIX] OTLP: Don't generate target_info unless at least one identifying label is defined. #13991
 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics. #13991
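To illustrate the OTLP enhancement above (#14006): instead of keying converted series by a concatenated label string, the translator keys them by a hash of the label set, which avoids building and comparing large strings per sample. A minimal standalone sketch of the idea (hypothetical helper, not the Prometheus implementation):

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// seriesHash returns a hash-based identity for a label set. Hashing avoids
// allocating a big concatenated string for every series lookup, which is the
// gist of the #14006 optimization.
func seriesHash(lbls map[string]string) uint64 {
	names := make([]string, 0, len(lbls))
	for n := range lbls {
		names = append(names, n)
	}
	sort.Strings(names) // identity must not depend on map iteration order

	h := fnv.New64a()
	for _, n := range names {
		h.Write([]byte(n))
		h.Write([]byte{0xff}) // separator to avoid ambiguity between name/value
		h.Write([]byte(lbls[n]))
		h.Write([]byte{0xff})
	}
	return h.Sum64()
}

func main() {
	id := seriesHash(map[string]string{"__name__": "up", "job": "api"})
	fmt.Printf("series id: %x\n", id)
}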
@@ -56,8 +56,8 @@ import (
 	"github.com/prometheus/prometheus/model/rulefmt"
 	"github.com/prometheus/prometheus/notifier"
 	_ "github.com/prometheus/prometheus/plugins" // Register plugins.
-	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/util/documentcli"
 )
@@ -377,7 +377,7 @@ func main() {
 	case testRulesCmd.FullCommand():
 		os.Exit(RulesUnitTest(
-			promql.LazyLoaderOpts{
+			promqltest.LazyLoaderOpts{
 				EnableAtModifier:     true,
 				EnableNegativeOffset: true,
 			},
@@ -26,7 +26,7 @@ import (
 	"github.com/stretchr/testify/require"

-	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/tsdb"
 )
@@ -88,7 +88,7 @@ func normalizeNewLine(b []byte) []byte {
 }

 func TestTSDBDump(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			metric{foo="bar", baz="abc"} 1 2 3 4 5
 			heavy_metric{foo="bar"} 5 4 3 2 1
@@ -158,7 +158,7 @@ func TestTSDBDump(t *testing.T) {
 }

 func TestTSDBDumpOpenMetrics(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			my_counter{foo="bar", baz="abc"} 1 2 3 4 5
 			my_gauge{bar="foo", abc="baz"} 9 8 0 4 7
@@ -36,13 +36,14 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/rules"
 	"github.com/prometheus/prometheus/storage"
 )

 // RulesUnitTest does unit testing of rules based on the unit testing files provided.
 // More info about the file format can be found in the docs.
-func RulesUnitTest(queryOpts promql.LazyLoaderOpts, runStrings []string, diffFlag bool, files ...string) int {
+func RulesUnitTest(queryOpts promqltest.LazyLoaderOpts, runStrings []string, diffFlag bool, files ...string) int {
 	failed := false

 	var run *regexp.Regexp
@@ -69,7 +70,7 @@ func RulesUnitTest(queryOpts promql.LazyLoaderOpts, runStrings []string, diffFla
 	return successExitCode
 }

-func ruleUnitTest(filename string, queryOpts promql.LazyLoaderOpts, run *regexp.Regexp, diffFlag bool) []error {
+func ruleUnitTest(filename string, queryOpts promqltest.LazyLoaderOpts, run *regexp.Regexp, diffFlag bool) []error {
 	fmt.Println("Unit Testing: ", filename)

 	b, err := os.ReadFile(filename)
@@ -175,9 +176,9 @@ type testGroup struct {
 }

 // test performs the unit tests.
-func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, queryOpts promql.LazyLoaderOpts, diffFlag bool, ruleFiles ...string) (outErr []error) {
+func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, queryOpts promqltest.LazyLoaderOpts, diffFlag bool, ruleFiles ...string) (outErr []error) {
 	// Setup testing suite.
-	suite, err := promql.NewLazyLoader(tg.seriesLoadingString(), queryOpts)
+	suite, err := promqltest.NewLazyLoader(tg.seriesLoadingString(), queryOpts)
 	if err != nil {
 		return []error{err}
 	}
@@ -413,7 +414,7 @@ Outer:
 			gotSamples = append(gotSamples, parsedSample{
 				Labels:    s.Metric.Copy(),
 				Value:     s.F,
-				Histogram: promql.HistogramTestExpression(s.H),
+				Histogram: promqltest.HistogramTestExpression(s.H),
 			})
 		}
@@ -443,7 +444,7 @@ Outer:
 			expSamples = append(expSamples, parsedSample{
 				Labels:    lb,
 				Value:     s.Value,
-				Histogram: promql.HistogramTestExpression(hist),
+				Histogram: promqltest.HistogramTestExpression(hist),
 			})
 		}
@@ -18,7 +18,7 @@ import (
 	"github.com/stretchr/testify/require"

-	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/promqltest"
 )

 func TestRulesUnitTest(t *testing.T) {
@@ -28,7 +28,7 @@ func TestRulesUnitTest(t *testing.T) {
 	tests := []struct {
 		name      string
 		args      args
-		queryOpts promql.LazyLoaderOpts
+		queryOpts promqltest.LazyLoaderOpts
 		want      int
 	}{
 		{
@@ -92,7 +92,7 @@ func TestRulesUnitTest(t *testing.T) {
 			args: args{
 				files: []string{"./testdata/at-modifier-test.yml"},
 			},
-			queryOpts: promql.LazyLoaderOpts{
+			queryOpts: promqltest.LazyLoaderOpts{
 				EnableAtModifier: true,
 			},
 			want: 0,
@@ -109,7 +109,7 @@ func TestRulesUnitTest(t *testing.T) {
 			args: args{
 				files: []string{"./testdata/negative-offset-test.yml"},
 			},
-			queryOpts: promql.LazyLoaderOpts{
+			queryOpts: promqltest.LazyLoaderOpts{
 				EnableNegativeOffset: true,
 			},
 			want: 0,
@@ -119,7 +119,7 @@ func TestRulesUnitTest(t *testing.T) {
 			args: args{
 				files: []string{"./testdata/no-test-group-interval.yml"},
 			},
-			queryOpts: promql.LazyLoaderOpts{
+			queryOpts: promqltest.LazyLoaderOpts{
 				EnableNegativeOffset: true,
 			},
 			want: 0,
@@ -142,7 +142,7 @@ func TestRulesUnitTestRun(t *testing.T) {
 	tests := []struct {
 		name      string
 		args      args
-		queryOpts promql.LazyLoaderOpts
+		queryOpts promqltest.LazyLoaderOpts
 		want      int
 	}{
 		{
go.mod (2 changes)
@@ -60,7 +60,6 @@ require (
 	github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c
 	github.com/stretchr/testify v1.9.0
 	github.com/vultr/govultr/v2 v2.17.2
-	go.opentelemetry.io/collector/featuregate v1.5.0
 	go.opentelemetry.io/collector/pdata v1.5.0
 	go.opentelemetry.io/collector/semconv v0.98.0
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0
@@ -151,7 +150,6 @@ require (
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
 	github.com/hashicorp/go-rootcerts v1.0.2 // indirect
 	github.com/hashicorp/go-version v1.6.0 // indirect
 	github.com/hashicorp/golang-lru v0.6.0 // indirect
 	github.com/hashicorp/serf v0.10.1 // indirect
 	github.com/imdario/mergo v0.3.16 // indirect
go.sum (2 changes)
@@ -722,8 +722,6 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
 go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
-go.opentelemetry.io/collector/featuregate v1.5.0 h1:uK8qnYQKz1TMkK+FDTFsywg/EybW/gbnOUaPNUkRznM=
-go.opentelemetry.io/collector/featuregate v1.5.0/go.mod h1:w7nUODKxEi3FLf1HslCiE6YWtMtOOrMnSwsDam8Mg9w=
 go.opentelemetry.io/collector/pdata v1.5.0 h1:1fKTmUpr0xCOhP/B0VEvtz7bYPQ45luQ8XFyA07j8LE=
 go.opentelemetry.io/collector/pdata v1.5.0/go.mod h1:TYj8aKRWZyT/KuKQXKyqSEvK/GV+slFaDMEI+Ke64Yw=
 go.opentelemetry.io/collector/semconv v0.98.0 h1:zO4L4TmlxXoYu8UgPeYElGY19BW7wPjM+quL5CzoOoY=
@@ -48,7 +48,7 @@ const (
 	Drop Action = "drop"
 	// KeepEqual drops targets for which the input does not match the target.
 	KeepEqual Action = "keepequal"
-	// Drop drops targets for which the input does match the target.
+	// DropEqual drops targets for which the input does match the target.
 	DropEqual Action = "dropequal"
 	// HashMod sets a label to the modulus of a hash of labels.
 	HashMod Action = "hashmod"
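The corrected comment is easy to misread without the mirror-image rule in mind: keepequal keeps only targets whose concatenated source-label values match the target label, while dropequal drops exactly those. A tiny illustrative sketch of the survival logic (not the relabel package code):

package main

import "fmt"

// For keepequal, a target survives when the concatenated source-label
// values ("input") equal the target label's value; dropequal is the mirror.
func survivesKeepEqual(input, target string) bool { return input == target }
func survivesDropEqual(input, target string) bool { return input != target }

func main() {
	// e.g. source label __port__="8080" compared against a target label.
	fmt.Println(survivesKeepEqual("8080", "8080")) // true: kept
	fmt.Println(survivesDropEqual("8080", "8080")) // false: dropped
	fmt.Println(survivesDropEqual("8080", "9090")) // true: kept
}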
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promql_test

 import (
 	"context"
@@ -23,13 +23,14 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/prometheus/prometheus/util/teststorage"
 )

-func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *Engine, interval, numIntervals int) error {
+func setupRangeQueryTestData(stor *teststorage.TestStorage, _ *promql.Engine, interval, numIntervals int) error {
 	ctx := context.Background()

 	metrics := []labels.Labels{}
@@ -249,13 +250,13 @@ func BenchmarkRangeQuery(b *testing.B) {
 	stor := teststorage.New(b)
 	stor.DB.DisableCompactions() // Don't want auto-compaction disrupting timings.
 	defer stor.Close()
-	opts := EngineOpts{
+	opts := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: 50000000,
 		Timeout:    100 * time.Second,
 	}
-	engine := NewEngine(opts)
+	engine := promql.NewEngine(opts)

 	const interval = 10000 // 10s interval.
 	// A day of data plus 10k steps.
@@ -324,7 +325,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
 		},
 	}

-	opts := EngineOpts{
+	opts := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: 50000000,
@@ -338,7 +339,7 @@ func BenchmarkNativeHistograms(b *testing.B) {

 	for _, tc := range cases {
 		b.Run(tc.name, func(b *testing.B) {
-			ng := NewEngine(opts)
+			ng := promql.NewEngine(opts)
 			for i := 0; i < b.N; i++ {
 				qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
 				if err != nil {
@@ -573,7 +573,8 @@ func (ng *Engine) validateOpts(expr parser.Expr) error {
 	return validationErr
 }

-func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
+// NewTestQuery: inject special behaviour into Query for testing.
+func (ng *Engine) NewTestQuery(f func(context.Context) error) Query {
 	qry := &query{
 		q:    "test statement",
 		stmt: parser.TestStmt(f),
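newTestQuery is promoted to an exported method here so that test helpers outside the promql package can build queries with injected behaviour instead of a PromQL statement. A hedged usage sketch based on the signature above (engine options abbreviated):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql"
)

func main() {
	ng := promql.NewEngine(promql.EngineOpts{
		MaxSamples: 100,
		Timeout:    10 * time.Second,
	})

	// The injected function runs in place of query evaluation.
	q := ng.NewTestQuery(func(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond):
			return nil // pretend to do some work
		case <-ctx.Done():
			return ctx.Err() // respect cancellation/timeout
		}
	})

	res := q.Exec(context.Background())
	fmt.Println("err:", res.Err)
}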
promql/engine_internal_test.go (new file, 82 lines)
@@ -0,0 +1,82 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package promql
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/require"
+
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/util/annotations"
+)
+
+func TestRecoverEvaluatorRuntime(t *testing.T) {
+	var output []interface{}
+	logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
+		output = append(output, keyvals...)
+		return nil
+	}))
+	ev := &evaluator{logger: logger}
+
+	expr, _ := parser.ParseExpr("sum(up)")
+
+	var err error
+
+	defer func() {
+		require.EqualError(t, err, "unexpected error: runtime error: index out of range [123] with length 0")
+		require.Contains(t, output, "sum(up)")
+	}()
+	defer ev.recover(expr, nil, &err)
+
+	// Cause a runtime panic.
+	var a []int
+	a[123] = 1
+}
+
+func TestRecoverEvaluatorError(t *testing.T) {
+	ev := &evaluator{logger: log.NewNopLogger()}
+	var err error
+
+	e := errors.New("custom error")
+
+	defer func() {
+		require.EqualError(t, err, e.Error())
+	}()
+	defer ev.recover(nil, nil, &err)
+
+	panic(e)
+}
+
+func TestRecoverEvaluatorErrorWithWarnings(t *testing.T) {
+	ev := &evaluator{logger: log.NewNopLogger()}
+	var err error
+	var ws annotations.Annotations
+
+	warnings := annotations.New().Add(errors.New("custom warning"))
+	e := errWithWarnings{
+		err:      errors.New("custom error"),
+		warnings: warnings,
+	}
+
+	defer func() {
+		require.EqualError(t, err, e.Error())
+		require.Equal(t, warnings, ws, "wrong warning message")
+	}()
+	defer ev.recover(nil, &ws, &err)
+
+	panic(e)
+}
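These tests pin down the contract of evaluator.recover: a runtime panic is converted into a wrapped error, a plain error panic is passed through, and warnings travel alongside. A self-contained sketch of the same recover-into-error pattern (a generic re-implementation, not the evaluator code itself):

package main

import (
	"fmt"
	"runtime"
)

// recoverAsError converts a panic into an error, mirroring what the tests
// above exercise: runtime errors are wrapped, ordinary error values pass
// through, anything else keeps panicking.
func recoverAsError(errp *error) {
	e := recover()
	if e == nil {
		return
	}
	switch err := e.(type) {
	case runtime.Error:
		*errp = fmt.Errorf("unexpected error: %w", err)
	case error:
		*errp = err
	default:
		panic(e)
	}
}

func safeIndex(a []int, i int) (v int, err error) {
	defer recoverAsError(&err)
	return a[i], nil // may panic with a runtime.Error
}

func main() {
	_, err := safeIndex(nil, 123)
	fmt.Println(err) // unexpected error: runtime error: index out of range [123] with length 0
}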
[File diff suppressed because it is too large]
@@ -991,15 +991,6 @@ func funcTimestamp(vals []parser.Value, args parser.Expressions, enh *EvalNodeHe
 	return enh.Out, nil
 }

-func kahanSum(samples []float64) float64 {
-	var sum, c float64
-
-	for _, v := range samples {
-		sum, c = kahanSumInc(v, sum, c)
-	}
-	return sum + c
-}
-
 func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
 	t := sum + inc
 	// Using Neumaier improvement, swap if next term larger than sum.
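Only kahanSum is deleted here; kahanSumInc, whose first lines appear as context, remains. Compensated (Kahan/Neumaier) summation keeps a separate correction term so that small addends are not swallowed by a huge running sum — the PromQL test data added further down in this diff (metric 1 1e100 1 -1e100, with sum_over_time expected to be exactly 2) depends on it. A runnable sketch; the branch body completes the function from the two context lines above and is a reconstruction, not a quote of the file:

package main

import (
	"fmt"
	"math"
)

// kahanSumInc adds inc to (sum, c), where c carries the compensation for
// lost low-order bits (Neumaier's variant of Kahan summation).
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	if math.Abs(sum) >= math.Abs(inc) {
		c += (sum - t) + inc // low-order bits of inc were lost in t
	} else {
		c += (inc - t) + sum // low-order bits of sum were lost in t
	}
	return t, c
}

func main() {
	vals := []float64{1, 1e100, 1, -1e100}

	naive := 0.0
	for _, v := range vals {
		naive += v // both 1s vanish next to 1e100
	}

	var sum, c float64
	for _, v := range vals {
		sum, c = kahanSumInc(v, sum, c)
	}

	fmt.Println(naive, sum+c) // 0 2
}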
@@ -11,11 +11,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promql_test

 import (
 	"context"
-	"math"
 	"testing"
 	"time"

@@ -23,6 +22,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/util/teststorage"
 )
@@ -33,13 +33,13 @@ func TestDeriv(t *testing.T) {
 	// so we test it by hand.
 	storage := teststorage.New(t)
 	defer storage.Close()
-	opts := EngineOpts{
+	opts := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: 10000,
 		Timeout:    10 * time.Second,
 	}
-	engine := NewEngine(opts)
+	engine := promql.NewEngine(opts)

 	a := storage.Appender(context.Background())

@@ -70,19 +70,13 @@ func TestDeriv(t *testing.T) {

 func TestFunctionList(t *testing.T) {
 	// Test that Functions and parser.Functions list the same functions.
-	for i := range FunctionCalls {
+	for i := range promql.FunctionCalls {
 		_, ok := parser.Functions[i]
 		require.True(t, ok, "function %s exists in promql package, but not in parser package", i)
 	}

 	for i := range parser.Functions {
-		_, ok := FunctionCalls[i]
+		_, ok := promql.FunctionCalls[i]
 		require.True(t, ok, "function %s exists in parser package, but not in promql package", i)
 	}
 }
-
-func TestKahanSum(t *testing.T) {
-	vals := []float64{1.0, math.Pow(10, 100), 1.0, -1 * math.Pow(10, 100)}
-	expected := 2.0
-	require.Equal(t, expected, kahanSum(vals))
-}
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promql_test

 import (
 	"context"
@@ -22,37 +22,30 @@ import (
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"

+	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/util/teststorage"
 )

-func newTestEngine() *Engine {
-	return NewEngine(EngineOpts{
-		Logger:                   nil,
-		Reg:                      nil,
-		MaxSamples:               10000,
-		Timeout:                  100 * time.Second,
-		NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
-		EnableAtModifier:         true,
-		EnableNegativeOffset:     true,
-		EnablePerStepStats:       true,
-	})
+func newTestEngine() *promql.Engine {
+	return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
 }

 func TestEvaluations(t *testing.T) {
-	RunBuiltinTests(t, newTestEngine())
+	promqltest.RunBuiltinTests(t, newTestEngine())
 }

 // Run a lot of queries at the same time, to check for race conditions.
 func TestConcurrentRangeQueries(t *testing.T) {
 	stor := teststorage.New(t)
 	defer stor.Close()
-	opts := EngineOpts{
+	opts := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: 50000000,
 		Timeout:    100 * time.Second,
 	}
-	engine := NewEngine(opts)
+	engine := promql.NewEngine(opts)

 	const interval = 10000 // 10s interval.
 	// A day of data plus 10k steps.
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promqltest

 import (
 	"context"
@@ -34,16 +34,16 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/prometheus/prometheus/promql/parser/posrange"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/almost"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
 )

 var (
-	minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64.
-
 	patSpace       = regexp.MustCompile("[\t ]+")
 	patLoad        = regexp.MustCompile(`^load(?:_(with_nhcb))?\s+(.+?)$`)
 	patEvalInstant = regexp.MustCompile(`^eval(?:_(with_nhcb))?(?:_(fail|warn|ordered))?\s+instant\s+(?:at\s+(.+?))?\s+(.+)$`)
@@ -76,7 +76,8 @@ var (
 )

 const (
-	defaultEpsilon = 0.000001 // Relative error allowed for sample values.
+	defaultEpsilon            = 0.000001 // Relative error allowed for sample values.
+	DefaultMaxSamplesPerQuery = 10000
 )

 var testStartTime = time.Unix(0, 0).UTC()
@@ -98,8 +99,22 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
 	return test.storage
 }

+func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
+	return promql.NewEngine(promql.EngineOpts{
+		Logger:                   nil,
+		Reg:                      nil,
+		MaxSamples:               maxSamples,
+		Timeout:                  100 * time.Second,
+		NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
+		EnableAtModifier:         true,
+		EnableNegativeOffset:     true,
+		EnablePerStepStats:       enablePerStepStats,
+		LookbackDelta:            lookbackDelta,
+	})
+}
+
 // RunBuiltinTests runs an acceptance test suite against the provided engine.
-func RunBuiltinTests(t *testing.T, engine QueryEngine) {
+func RunBuiltinTests(t *testing.T, engine promql.QueryEngine) {
 	t.Cleanup(func() { parser.EnableExperimentalFunctions = false })
 	parser.EnableExperimentalFunctions = true

@@ -116,11 +131,11 @@ func RunBuiltinTests(t *testing.T, engine QueryEngine) {
 }

 // RunTest parses and runs the test against the provided engine.
-func RunTest(t testutil.T, input string, engine QueryEngine) {
+func RunTest(t testutil.T, input string, engine promql.QueryEngine) {
 	require.NoError(t, runTest(t, input, engine))
 }

-func runTest(t testutil.T, input string, engine QueryEngine) error {
+func runTest(t testutil.T, input string, engine promql.QueryEngine) error {
 	test, err := newTest(t, input)

 	// Why do this before checking err? newTest() can create the test storage and then return an error,
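With these helpers exported from the new promqltest package, code outside promql can run script-style acceptance tests. A hedged sketch of typical usage, assembled from the signatures in this diff (the load/eval script syntax is promqltest's own):

package mypkg_test

import (
	"testing"

	"github.com/prometheus/prometheus/promql/promqltest"
)

func TestMyQueries(t *testing.T) {
	// Per-step stats off, default lookback delta, default sample limit.
	engine := promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)

	// RunTest parses and executes the script against the engine.
	promqltest.RunTest(t, `
load 1m
	metric{foo="bar"} 0+1x5

eval instant at 5m metric
	metric{foo="bar"} 5
`, engine)
}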
@@ -402,7 +417,7 @@ func (*evalCmd) testCmd() {}
 type loadCmd struct {
 	gap       time.Duration
 	metrics   map[uint64]labels.Labels
-	defs      map[uint64][]Sample
+	defs      map[uint64][]promql.Sample
 	exemplars map[uint64][]exemplar.Exemplar
 	withNhcb  bool
 }
@@ -411,7 +426,7 @@ func newLoadCmd(gap time.Duration, withNhcb bool) *loadCmd {
 	return &loadCmd{
 		gap:       gap,
 		metrics:   map[uint64]labels.Labels{},
-		defs:      map[uint64][]Sample{},
+		defs:      map[uint64][]promql.Sample{},
 		exemplars: map[uint64][]exemplar.Exemplar{},
 		withNhcb:  withNhcb,
 	}
@@ -425,11 +440,11 @@ func (cmd loadCmd) String() string {
 func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
 	h := m.Hash()

-	samples := make([]Sample, 0, len(vals))
+	samples := make([]promql.Sample, 0, len(vals))
 	ts := testStartTime
 	for _, v := range vals {
 		if !v.Omitted {
-			samples = append(samples, Sample{
+			samples = append(samples, promql.Sample{
 				T: ts.UnixNano() / int64(time.Millisecond/time.Nanosecond),
 				F: v.Value,
 				H: v.Histogram,
@@ -493,7 +508,7 @@ func newTempHistogram() tempHistogram {
 	}
 }

-func processClassicHistogramSeries(m labels.Labels, suffix string, histMap map[uint64]tempHistogramWrapper, smpls []Sample, updateHistWrapper func(*tempHistogramWrapper), updateHist func(*tempHistogram, float64)) {
+func processClassicHistogramSeries(m labels.Labels, suffix string, histMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistWrapper func(*tempHistogramWrapper), updateHist func(*tempHistogram, float64)) {
 	m2, m2hash := getHistogramMetricBase(m, suffix)
 	histWrapper, exists := histMap[m2hash]
 	if !exists {
@@ -581,7 +596,7 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
 	// with custom bounds and append them to the storage.
 	for _, histWrapper := range histMap {
 		upperBounds, fhBase := processUpperBoundsAndCreateBaseHistogram(histWrapper.upperBounds)
-		samples := make([]Sample, 0, len(histWrapper.histByTs))
+		samples := make([]promql.Sample, 0, len(histWrapper.histByTs))
 		for t, hist := range histWrapper.histByTs {
 			fh := fhBase.Copy()
 			var prevCount, total float64
@@ -600,7 +615,7 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
 				total = hist.count
 			}
 			fh.Count = total
-			s := Sample{T: t, H: fh.Compact(0)}
+			s := promql.Sample{T: t, H: fh.Compact(0)}
 			if err := s.H.Validate(); err != nil {
 				return err
 			}
@@ -616,7 +631,7 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
 	return nil
 }

-func appendSample(a storage.Appender, s Sample, m labels.Labels) error {
+func appendSample(a storage.Appender, s promql.Sample, m labels.Labels) error {
 	if s.H != nil {
 		if _, err := a.AppendHistogram(0, m, s.T, nil, s.H); err != nil {
 			return err
@@ -701,7 +716,7 @@ func (ev *evalCmd) expectMetric(pos int, m labels.Labels, vals ...parser.Sequenc
 // compareResult compares the result value with the defined expectation.
 func (ev *evalCmd) compareResult(result parser.Value) error {
 	switch val := result.(type) {
-	case Matrix:
+	case promql.Matrix:
 		if ev.ordered {
 			return fmt.Errorf("expected ordered result, but query returned a matrix")
 		}
@@ -719,8 +734,8 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 			seen[hash] = true
 			exp := ev.expected[hash]

-			var expectedFloats []FPoint
-			var expectedHistograms []HPoint
+			var expectedFloats []promql.FPoint
+			var expectedHistograms []promql.HPoint

 			for i, e := range exp.vals {
 				ts := ev.start.Add(time.Duration(i) * ev.step)
@@ -732,9 +747,9 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 				t := ts.UnixNano() / int64(time.Millisecond/time.Nanosecond)

 				if e.Histogram != nil {
-					expectedHistograms = append(expectedHistograms, HPoint{T: t, H: e.Histogram})
+					expectedHistograms = append(expectedHistograms, promql.HPoint{T: t, H: e.Histogram})
 				} else if !e.Omitted {
-					expectedFloats = append(expectedFloats, FPoint{T: t, F: e.Value})
+					expectedFloats = append(expectedFloats, promql.FPoint{T: t, F: e.Value})
 				}
 			}

@@ -749,7 +764,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 					return fmt.Errorf("expected float value at index %v for %s to have timestamp %v, but it had timestamp %v (result has %s)", i, ev.metrics[hash], expected.T, actual.T, formatSeriesResult(s))
 				}

-				if !almostEqual(actual.F, expected.F, defaultEpsilon) {
+				if !almost.Equal(actual.F, expected.F, defaultEpsilon) {
 					return fmt.Errorf("expected float value at index %v (t=%v) for %s to be %v, but got %v (result has %s)", i, actual.T, ev.metrics[hash], expected.F, actual.F, formatSeriesResult(s))
 				}
 			}
@@ -773,7 +788,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 			}
 		}

-	case Vector:
+	case promql.Vector:
 		seen := map[uint64]bool{}
 		for pos, v := range val {
 			fp := v.Metric.Hash()
@@ -799,7 +814,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 			if expH != nil && !expH.Compact(0).Equals(v.H) {
 				return fmt.Errorf("expected %v for %s but got %s", HistogramTestExpression(expH), v.Metric, HistogramTestExpression(v.H))
 			}
-			if !almostEqual(exp0.Value, v.F, defaultEpsilon) {
+			if !almost.Equal(exp0.Value, v.F, defaultEpsilon) {
 				return fmt.Errorf("expected %v for %s but got %v", exp0.Value, v.Metric, v.F)
 			}

@@ -811,7 +826,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 			}
 		}

-	case Scalar:
+	case promql.Scalar:
 		if len(ev.expected) != 1 {
 			return fmt.Errorf("expected vector result, but got scalar %s", val.String())
 		}
@@ -819,7 +834,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 		if exp0.Histogram != nil {
 			return fmt.Errorf("expected Histogram %v but got scalar %s", exp0.Histogram.TestExpression(), val.String())
 		}
-		if !almostEqual(exp0.Value, val.V, defaultEpsilon) {
+		if !almost.Equal(exp0.Value, val.V, defaultEpsilon) {
 			return fmt.Errorf("expected Scalar %v but got %v", val.V, exp0.Value)
 		}

@@ -829,7 +844,7 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
 	return nil
 }

-func formatSeriesResult(s Series) string {
+func formatSeriesResult(s promql.Series) string {
 	floatPlural := "s"
 	histogramPlural := "s"

|
|||
// If there is a subquery, then the selectors inside it don't get the @ timestamp.
|
||||
// If any selector already has the @ timestamp set, then it is untouched.
|
||||
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
|
||||
_, _, subqTs := subqueryTimes(path)
|
||||
if subqTs != nil {
|
||||
if hasAtModifier(path) {
|
||||
// There is a subquery with timestamp in the path,
|
||||
// hence don't change any timestamps further.
|
||||
return nil
|
||||
|
@@ -899,7 +913,7 @@ func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCa
 			}

 		case *parser.Call:
-			_, ok := AtModifierUnsafeFunctions[n.Func.Name]
+			_, ok := promql.AtModifierUnsafeFunctions[n.Func.Name]
 			containsNonStepInvariant = containsNonStepInvariant || ok
 		}
 		return nil
@@ -927,8 +941,19 @@ func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCa
 	return testCases, nil
 }

+func hasAtModifier(path []parser.Node) bool {
+	for _, node := range path {
+		if n, ok := node.(*parser.SubqueryExpr); ok {
+			if n.Timestamp != nil {
+				return true
+			}
+		}
+	}
+	return false
+}
+
 // exec processes a single step of the test.
-func (t *test) exec(tc testCommand, engine QueryEngine) error {
+func (t *test) exec(tc testCommand, engine promql.QueryEngine) error {
 	switch cmd := tc.(type) {
 	case *clearCmd:
 		t.clear()
@@ -953,7 +978,7 @@ func (t *test) exec(tc testCommand, engine promql.QueryEngine) error {
 	return nil
 }

-func (t *test) execEval(cmd *evalCmd, engine QueryEngine) error {
+func (t *test) execEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	if cmd.isRange {
 		return t.execRangeEval(cmd, engine)
 	}
@@ -961,7 +986,7 @@ func (t *test) execEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	return t.execInstantEval(cmd, engine)
 }

-func (t *test) execRangeEval(cmd *evalCmd, engine QueryEngine) error {
+func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	q, err := engine.NewRangeQuery(t.context, t.storage, nil, cmd.expr, cmd.start, cmd.end, cmd.step)
 	if err != nil {
 		return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err)
@@ -993,7 +1018,7 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	return nil
 }

-func (t *test) execInstantEval(cmd *evalCmd, engine QueryEngine) error {
+func (t *test) execInstantEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	queries, err := atModifierTestCases(cmd.expr, cmd.start)
 	if err != nil {
 		return err
@@ -1018,7 +1043,7 @@ func (t *test) execInstantEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	return nil
 }

-func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine QueryEngine) error {
+func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promql.QueryEngine) error {
 	q, err := engine.NewInstantQuery(t.context, t.storage, nil, iq.expr, iq.evalTime)
 	if err != nil {
 		return fmt.Errorf("error creating instant query for %q (line %d): %w", cmd.expr, cmd.line, err)
@@ -1061,29 +1086,29 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine Query
 		// Range queries are always sorted by labels, so skip this test case that expects results in a particular order.
 		return nil
 	}
-	mat := rangeRes.Value.(Matrix)
+	mat := rangeRes.Value.(promql.Matrix)
 	if err := assertMatrixSorted(mat); err != nil {
 		return err
 	}

-	vec := make(Vector, 0, len(mat))
+	vec := make(promql.Vector, 0, len(mat))
 	for _, series := range mat {
 		// We expect either Floats or Histograms.
 		for _, point := range series.Floats {
 			if point.T == timeMilliseconds(iq.evalTime) {
-				vec = append(vec, Sample{Metric: series.Metric, T: point.T, F: point.F})
+				vec = append(vec, promql.Sample{Metric: series.Metric, T: point.T, F: point.F})
 				break
 			}
 		}
 		for _, point := range series.Histograms {
 			if point.T == timeMilliseconds(iq.evalTime) {
-				vec = append(vec, Sample{Metric: series.Metric, T: point.T, H: point.H})
+				vec = append(vec, promql.Sample{Metric: series.Metric, T: point.T, H: point.H})
 				break
 			}
 		}
 	}
-	if _, ok := res.Value.(Scalar); ok {
-		err = cmd.compareResult(Scalar{V: vec[0].F})
+	if _, ok := res.Value.(promql.Scalar); ok {
+		err = cmd.compareResult(promql.Scalar{V: vec[0].F})
 	} else {
 		err = cmd.compareResult(vec)
 	}
@@ -1093,7 +1118,7 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq
 	return nil
 }

-func assertMatrixSorted(m Matrix) error {
+func assertMatrixSorted(m promql.Matrix) error {
 	if len(m) <= 1 {
 		return nil
 	}
@@ -1123,29 +1148,6 @@ func (t *test) clear() {
 	t.context, t.cancelCtx = context.WithCancel(context.Background())
 }

-// almostEqual returns true if a and b differ by less than their sum
-// multiplied by epsilon.
-func almostEqual(a, b, epsilon float64) bool {
-	// NaN has no equality but for testing we still want to know whether both values
-	// are NaN.
-	if math.IsNaN(a) && math.IsNaN(b) {
-		return true
-	}
-
-	// Cf. http://floating-point-gui.de/errors/comparison/
-	if a == b {
-		return true
-	}
-
-	absSum := math.Abs(a) + math.Abs(b)
-	diff := math.Abs(a - b)
-
-	if a == 0 || b == 0 || absSum < minNormal {
-		return diff < epsilon*minNormal
-	}
-	return diff/math.Min(absSum, math.MaxFloat64) < epsilon
-}
-
 func parseNumber(s string) (float64, error) {
 	n, err := strconv.ParseInt(s, 0, 64)
 	f := float64(n)
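The removed helper is not lost: the call sites earlier in this diff now use almost.Equal from util/almost with the same (a, b, epsilon) argument order, so the relative-epsilon comparison above is simply shared between packages. A small usage sketch under that assumption:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/almost"
)

func main() {
	const epsilon = 0.000001 // same tolerance as promqltest's defaultEpsilon

	// 0.1+0.2 != 0.3 exactly in float64, but it is "almost" equal.
	fmt.Println(0.1+0.2 == 0.3)                      // false
	fmt.Println(almost.Equal(0.1+0.2, 0.3, epsilon)) // true
}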
@@ -1166,7 +1168,7 @@ type LazyLoader struct {
 	storage          storage.Storage
 	SubqueryInterval time.Duration

-	queryEngine *Engine
+	queryEngine *promql.Engine
 	context     context.Context
 	cancelCtx   context.CancelFunc

@@ -1233,7 +1235,7 @@ func (ll *LazyLoader) clear() error {
 		return err
 	}

-	opts := EngineOpts{
+	opts := promql.EngineOpts{
 		Logger:     nil,
 		Reg:        nil,
 		MaxSamples: 10000,
@@ -1243,7 +1245,7 @@ func (ll *LazyLoader) clear() error {
 		EnableNegativeOffset: ll.opts.EnableNegativeOffset,
 	}

-	ll.queryEngine = NewEngine(opts)
+	ll.queryEngine = promql.NewEngine(opts)
 	ll.context, ll.cancelCtx = context.WithCancel(context.Background())
 	return nil
 }
@@ -1277,7 +1279,7 @@ func (ll *LazyLoader) WithSamplesTill(ts time.Time, fn func(error)) {
 }

 // QueryEngine returns the LazyLoader's query engine.
-func (ll *LazyLoader) QueryEngine() *Engine {
+func (ll *LazyLoader) QueryEngine() *promql.Engine {
 	return ll.queryEngine
 }

@@ -1303,3 +1305,17 @@ func (ll *LazyLoader) Close() error {
 	ll.cancelCtx()
 	return ll.storage.Close()
 }
+
+func makeInt64Pointer(val int64) *int64 {
+	valp := new(int64)
+	*valp = val
+	return valp
+}
+
+func timeMilliseconds(t time.Time) int64 {
+	return t.UnixNano() / int64(time.Millisecond/time.Nanosecond)
+}
+
+func durationMilliseconds(d time.Duration) int64 {
+	return int64(d / (time.Millisecond / time.Nanosecond))
+}
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promqltest

 import (
 	"math"
@@ -21,14 +21,15 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )

 func TestLazyLoader_WithSamplesTill(t *testing.T) {
 	type testCase struct {
 		ts time.Time
-		series         []Series // Each series is checked separately. Need not mention all series here.
-		checkOnlyError bool     // If this is true, series is not checked.
+		series         []promql.Series // Each series is checked separately. Need not mention all series here.
+		checkOnlyError bool            // If this is true, series is not checked.
 	}

 	cases := []struct {
|
@ -44,33 +45,33 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
|
|||
testCases: []testCase{
|
||||
{
|
||||
ts: time.Unix(40, 0),
|
||||
series: []Series{
|
||||
series: []promql.Series{
|
||||
{
|
||||
Metric: labels.FromStrings("__name__", "metric1"),
|
||||
Floats: []FPoint{
|
||||
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5},
|
||||
Floats: []promql.FPoint{
|
||||
{T: 0, F: 1}, {T: 10000, F: 2}, {T: 20000, F: 3}, {T: 30000, F: 4}, {T: 40000, F: 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ts: time.Unix(10, 0),
|
||||
series: []Series{
|
||||
series: []promql.Series{
|
||||
{
|
||||
Metric: labels.FromStrings("__name__", "metric1"),
|
||||
Floats: []FPoint{
|
||||
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5},
|
||||
Floats: []promql.FPoint{
|
||||
{T: 0, F: 1}, {T: 10000, F: 2}, {T: 20000, F: 3}, {T: 30000, F: 4}, {T: 40000, F: 5},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ts: time.Unix(60, 0),
|
||||
series: []Series{
|
||||
series: []promql.Series{
|
||||
{
|
||||
Metric: labels.FromStrings("__name__", "metric1"),
|
||||
Floats: []FPoint{
|
||||
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7},
|
||||
Floats: []promql.FPoint{
|
||||
{T: 0, F: 1}, {T: 10000, F: 2}, {T: 20000, F: 3}, {T: 30000, F: 4}, {T: 40000, F: 5}, {T: 50000, F: 6}, {T: 60000, F: 7},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@@ -86,17 +87,17 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
 			testCases: []testCase{
 				{ // Adds all samples of metric1.
 					ts: time.Unix(70, 0),
-					series: []Series{
+					series: []promql.Series{
 						{
 							Metric: labels.FromStrings("__name__", "metric1"),
-							Floats: []FPoint{
-								{0, 1}, {10000, 1}, {20000, 1}, {30000, 1}, {40000, 1}, {50000, 1},
+							Floats: []promql.FPoint{
+								{T: 0, F: 1}, {T: 10000, F: 1}, {T: 20000, F: 1}, {T: 30000, F: 1}, {T: 40000, F: 1}, {T: 50000, F: 1},
 							},
 						},
 						{
 							Metric: labels.FromStrings("__name__", "metric2"),
-							Floats: []FPoint{
-								{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7}, {70000, 8},
+							Floats: []promql.FPoint{
+								{T: 0, F: 1}, {T: 10000, F: 2}, {T: 20000, F: 3}, {T: 30000, F: 4}, {T: 40000, F: 5}, {T: 50000, F: 6}, {T: 60000, F: 7}, {T: 70000, F: 8},
 							},
 						},
 					},
@@ -140,13 +141,13 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
 			require.False(t, ss.Next(), "Expecting only 1 series")

 			// Convert `storage.Series` to `promql.Series`.
-			got := Series{
+			got := promql.Series{
 				Metric: storageSeries.Labels(),
 			}
 			it := storageSeries.Iterator(nil)
 			for it.Next() == chunkenc.ValFloat {
 				t, v := it.At()
-				got.Floats = append(got.Floats, FPoint{T: t, F: v})
+				got.Floats = append(got.Floats, promql.FPoint{T: t, F: v})
 			}
 			require.NoError(t, it.Err())

@@ -450,7 +451,7 @@ eval range from 0 to 5m step 5m testmetric

 	for name, testCase := range testCases {
 		t.Run(name, func(t *testing.T) {
-			err := runTest(t, testCase.input, newTestEngine())
+			err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery))

 			if testCase.expectedError == "" {
 				require.NoError(t, err)
|
|||
|
||||
func TestAssertMatrixSorted(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
matrix Matrix
|
||||
matrix promql.Matrix
|
||||
expectedError string
|
||||
}{
|
||||
"empty matrix": {
|
||||
matrix: Matrix{},
|
||||
matrix: promql.Matrix{},
|
||||
},
|
||||
"matrix with one series": {
|
||||
matrix: Matrix{
|
||||
Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
matrix: promql.Matrix{
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
},
|
||||
},
|
||||
"matrix with two series, series in sorted order": {
|
||||
matrix: Matrix{
|
||||
Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
matrix: promql.Matrix{
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
},
|
||||
},
|
||||
"matrix with two series, series in reverse order": {
|
||||
matrix: Matrix{
|
||||
Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
matrix: promql.Matrix{
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
},
|
||||
expectedError: `matrix results should always be sorted by labels, but matrix is not sorted: series at index 1 with labels {the_label="value_1"} sorts before series at index 0 with labels {the_label="value_2"}`,
|
||||
},
|
||||
"matrix with three series, series in sorted order": {
|
||||
matrix: Matrix{
|
||||
Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_3")},
|
||||
matrix: promql.Matrix{
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_3")},
|
||||
},
|
||||
},
|
||||
"matrix with three series, series not in sorted order": {
|
||||
matrix: Matrix{
|
||||
Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_3")},
|
||||
Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
matrix: promql.Matrix{
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_1")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_3")},
|
||||
promql.Series{Metric: labels.FromStrings("the_label", "value_2")},
|
||||
},
|
||||
expectedError: `matrix results should always be sorted by labels, but matrix is not sorted: series at index 2 with labels {the_label="value_2"} sorts before series at index 1 with labels {the_label="value_3"}`,
|
||||
},
|
|
@@ -764,6 +764,14 @@ eval instant at 1m avg_over_time(metric10[1m])
 eval instant at 1m sum_over_time(metric10[1m])/count_over_time(metric10[1m])
 	{} 0

+# Test if very big intermediate values cause loss of detail.
+clear
+load 10s
+	metric 1 1e100 1 -1e100
+
+eval instant at 1m sum_over_time(metric[1m])
+	{} 2
+
 # Tests for stddev_over_time and stdvar_over_time.
 clear
 load 10s
@@ -20,6 +20,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/util/almost"
 )

 // smallDeltaTolerance is the threshold for relative deltas between classic
@@ -411,7 +412,7 @@ func ensureMonotonicAndIgnoreSmallDeltas(buckets buckets, tolerance float64) (bo
 			// No correction needed if the counts are identical between buckets.
 			continue
 		}
-		if almostEqual(prev, curr, tolerance) {
+		if almost.Equal(prev, curr, tolerance) {
 			// Silently correct numerically insignificant differences from floating
 			// point precision errors, regardless of direction.
 			// Do not update the 'prev' value as we are ignoring the difference.
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package promql
+package promql_test

 import (
 	"testing"
@@ -19,39 +19,40 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql"
 )

 func TestVector_ContainsSameLabelset(t *testing.T) {
 	for name, tc := range map[string]struct {
-		vector   Vector
+		vector   promql.Vector
 		expected bool
 	}{
 		"empty vector": {
-			vector:   Vector{},
+			vector:   promql.Vector{},
 			expected: false,
 		},
 		"vector with one series": {
-			vector: Vector{
+			vector: promql.Vector{
 				{Metric: labels.FromStrings("lbl", "a")},
 			},
 			expected: false,
 		},
 		"vector with two different series": {
-			vector: Vector{
+			vector: promql.Vector{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "b")},
 			},
 			expected: false,
 		},
 		"vector with two equal series": {
-			vector: Vector{
+			vector: promql.Vector{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "a")},
 			},
 			expected: true,
 		},
 		"vector with three series, two equal": {
-			vector: Vector{
+			vector: promql.Vector{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "b")},
 				{Metric: labels.FromStrings("lbl", "a")},
@@ -67,35 +68,35 @@ func TestVector_ContainsSameLabelset(t *testing.T) {

 func TestMatrix_ContainsSameLabelset(t *testing.T) {
 	for name, tc := range map[string]struct {
-		matrix   Matrix
+		matrix   promql.Matrix
 		expected bool
 	}{
 		"empty matrix": {
-			matrix:   Matrix{},
+			matrix:   promql.Matrix{},
 			expected: false,
 		},
 		"matrix with one series": {
-			matrix: Matrix{
+			matrix: promql.Matrix{
 				{Metric: labels.FromStrings("lbl", "a")},
 			},
 			expected: false,
 		},
 		"matrix with two different series": {
-			matrix: Matrix{
+			matrix: promql.Matrix{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "b")},
 			},
 			expected: false,
 		},
 		"matrix with two equal series": {
-			matrix: Matrix{
+			matrix: promql.Matrix{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "a")},
 			},
 			expected: true,
 		},
 		"matrix with three series, two equal": {
-			matrix: Matrix{
+			matrix: promql.Matrix{
 				{Metric: labels.FromStrings("lbl", "a")},
 				{Metric: labels.FromStrings("lbl", "b")},
 				{Metric: labels.FromStrings("lbl", "a")},
@@ -30,6 +30,7 @@ import (
 	"github.com/prometheus/prometheus/notifier"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
@@ -148,7 +149,7 @@ func TestAlertingRuleTemplateWithHistogram(t *testing.T) {
 }

 func TestAlertingRuleLabelsUpdate(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 85 70 70 stale
 	`)
@@ -252,7 +253,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
 }

 func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 85 70 70
 	`)
@@ -345,7 +346,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
 }

 func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 85 70 70
 	`)
@@ -438,7 +439,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
 }

 func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 85 70 70
 	`)
@@ -492,7 +493,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
 }

 func TestAlertingRuleQueryInTemplate(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 70 85 70 70
 	`)
@@ -601,7 +602,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
 }

 func TestAlertingRuleLimit(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			metric{label="1"} 1
 			metric{label="2"} 1
@@ -783,7 +784,7 @@ func TestSendAlertsDontAffectActiveAlerts(t *testing.T) {
 }

 func TestKeepFiringFor(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 85 70 70 10x5
 	`)
@@ -893,7 +894,7 @@ func TestKeepFiringFor(t *testing.T) {
 }

 func TestPendingAndKeepFiringFor(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			http_requests{job="app-server", instance="0"} 75 10x10
 	`)
@@ -38,6 +38,7 @@ import (
 	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -50,7 +51,7 @@ func TestMain(m *testing.M) {
 }

 func TestAlertingRule(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 5m
 			http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
 			http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140
@@ -190,7 +191,7 @@ func TestAlertingRule(t *testing.T) {
 }

 func TestForStateAddSamples(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 5m
 			http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
 			http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140
@@ -347,7 +348,7 @@ func sortAlerts(items []*Alert) {
 }

 func TestForStateRestore(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 5m
 			http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120
 			http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 125 90 60 0 0 25 0 0 40 0 130
@@ -1232,7 +1233,7 @@ func TestRuleHealthUpdates(t *testing.T) {
 }

 func TestRuleGroupEvalIterationFunc(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 5m
 			http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120
 	`)
@@ -24,6 +24,7 @@ import (
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
 )
@@ -111,7 +112,7 @@ var ruleEvalTestScenarios = []struct {
 }

 func setUpRuleEvalTest(t require.TestingT) *teststorage.TestStorage {
-	return promql.LoadedStorage(t, `
+	return promqltest.LoadedStorage(t, `
 		load 1m
 			metric{label_a="1",label_b="3"} 1
 			metric{label_a="2",label_b="4"} 10
@@ -178,7 +179,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
 }

 func TestRecordingRuleLimit(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			metric{label="1"} 1
 			metric{label="2"} 1
@@ -19,15 +19,6 @@ package prometheus
 import (
 	"strings"
 	"unicode"
-
-	"go.opentelemetry.io/collector/featuregate"
 )

-var dropSanitizationGate = featuregate.GlobalRegistry().MustRegister(
-	"pkg.translator.prometheus.PermissiveLabelSanitization",
-	featuregate.StageAlpha,
-	featuregate.WithRegisterDescription("Controls whether to change labels starting with '_' to 'key_'."),
-	featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8950"),
-)
-
 // Normalizes the specified label to follow Prometheus label names standard
@@ -50,7 +41,7 @@ func NormalizeLabel(label string) string {
 	// If label starts with a number, prepend with "key_"
 	if unicode.IsDigit(rune(label[0])) {
 		label = "key_" + label
-	} else if strings.HasPrefix(label, "_") && !strings.HasPrefix(label, "__") && !dropSanitizationGate.IsEnabled() {
+	} else if strings.HasPrefix(label, "_") && !strings.HasPrefix(label, "__") {
 		label = "key" + label
 	}

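With dropSanitizationGate gone, the underscore rule is unconditional: a label starting with a single underscore always gets a key prefix, and a digit-leading label gets key_. An illustrative standalone re-implementation of the resulting rule (a sketch, not the translator code itself):

package main

import (
	"fmt"
	"strings"
	"unicode"
)

// normalizeLabel mirrors the post-change behaviour shown in the diff:
// no feature gate is consulted any more.
func normalizeLabel(label string) string {
	if label == "" {
		return label
	}
	if unicode.IsDigit(rune(label[0])) {
		label = "key_" + label // "0count" -> "key_0count"
	} else if strings.HasPrefix(label, "_") && !strings.HasPrefix(label, "__") {
		label = "key" + label // "_private" -> "key_private"
	}
	return label // "__name__" and ordinary labels pass through unchanged
}

func main() {
	for _, l := range []string{"0count", "_private", "__name__", "job"} {
		fmt.Printf("%-10s -> %s\n", l, normalizeLabel(l))
	}
}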
@@ -20,7 +20,6 @@ import (
 	"strings"
 	"unicode"

-	"go.opentelemetry.io/collector/featuregate"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 )

@@ -78,13 +77,6 @@ var perUnitMap = map[string]string{
 	"y": "year",
 }

-var normalizeNameGate = featuregate.GlobalRegistry().MustRegister(
-	"pkg.translator.prometheus.NormalizeName",
-	featuregate.StageBeta,
-	featuregate.WithRegisterDescription("Controls whether metrics names are automatically normalized to follow Prometheus naming convention"),
-	featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8950"),
-)
-
 // BuildCompliantName builds a Prometheus-compliant metric name for the specified metric
 //
 // Metric name is prefixed with specified namespace and underscore (if any).
@ -97,7 +89,7 @@ func BuildCompliantName(metric pmetric.Metric, namespace string, addMetricSuffix
|
|||
var metricName string
|
||||
|
||||
// Full normalization following standard Prometheus naming conventions
|
||||
if addMetricSuffixes && normalizeNameGate.IsEnabled() {
|
||||
if addMetricSuffixes {
|
||||
return normalizeName(metric, namespace)
|
||||
}
|
||||
|
||||
|
|
|
@@ -38,7 +38,7 @@ type Settings struct {
 	SendMetadata bool
 }
 
-// PrometheusConverter converts from OTel write format to Prometheus write format.
+// PrometheusConverter converts from OTel write format to Prometheus remote write format.
 type PrometheusConverter struct {
 	unique    map[uint64]*prompb.TimeSeries
 	conflicts map[uint64][]*prompb.TimeSeries

@@ -130,25 +130,6 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings)
 	return
 }
 
-// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
-func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries {
-	conflicts := 0
-	for _, ts := range c.conflicts {
-		conflicts += len(ts)
-	}
-	allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts)
-	for _, ts := range c.unique {
-		allTS = append(allTS, *ts)
-	}
-	for _, cTS := range c.conflicts {
-		for _, ts := range cTS {
-			allTS = append(allTS, *ts)
-		}
-	}
-
-	return allTS
-}
-
 func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
 	if len(ts.Labels) != len(lbls) {
 		return false
@@ -64,7 +64,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
 	}
 }
 
-func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest {
+func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries int) pmetricotlp.ExportRequest {
 	request := pmetricotlp.NewExportRequest()
 
 	rm := request.Metrics().ResourceMetrics().AppendEmpty()
@@ -0,0 +1,41 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Provenance-includes-location:
+// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go
+// Provenance-includes-license: Apache-2.0
+// Provenance-includes-copyright: Copyright The OpenTelemetry Authors.
+
+package prometheusremotewrite
+
+import (
+	"github.com/prometheus/prometheus/prompb"
+)
+
+// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
+func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries {
+	conflicts := 0
+	for _, ts := range c.conflicts {
+		conflicts += len(ts)
+	}
+	allTS := make([]prompb.TimeSeries, 0, len(c.unique)+conflicts)
+	for _, ts := range c.unique {
+		allTS = append(allTS, *ts)
+	}
+	for _, cTS := range c.conflicts {
+		for _, ts := range cTS {
+			allTS = append(allTS, *ts)
+		}
+	}
+
+	return allTS
+}
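For orientation, here is a minimal sketch of how the converter and the relocated TimeSeries method fit together. This is illustrative only: the NewPrometheusConverter constructor and the error-only return of FromMetrics are assumptions, not shown in this diff.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)

func main() {
	md := pmetric.NewMetrics() // normally populated from an OTLP export request

	// NewPrometheusConverter and the FromMetrics return shape are assumptions
	// for this sketch; the diff only shows the method receivers.
	c := prometheusremotewrite.NewPrometheusConverter()
	if err := c.FromMetrics(md, prometheusremotewrite.Settings{}); err != nil {
		fmt.Println("translation errors:", err)
	}

	// TimeSeries flattens the converter's unique and conflicting series maps
	// into one slice, ready for a remote-write request.
	fmt.Println(len(c.TimeSeries()))
}
```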
@@ -30,14 +30,14 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/prompb"
-	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/prometheus/prometheus/util/teststorage"
 )
 
 func TestSampledReadEndpoint(t *testing.T) {
-	store := promql.LoadedStorage(t, `
+	store := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo="bar",baz="qux"} 1
 	`)

@@ -132,7 +132,7 @@ func TestSampledReadEndpoint(t *testing.T) {
 }
 
 func BenchmarkStreamReadEndpoint(b *testing.B) {
-	store := promql.LoadedStorage(b, `
+	store := promqltest.LoadedStorage(b, `
 	load 1m
 		test_metric1{foo="bar1",baz="qux"} 0+100x119
 		test_metric1{foo="bar2",baz="qux"} 0+100x120

@@ -200,7 +200,7 @@ func TestStreamReadEndpoint(t *testing.T) {
 	// Second with 121 float samples, We expect 1 frame with 2 chunks.
 	// Third with 241 float samples. We expect 1 frame with 2 chunks, and 1 frame with 1 chunk for the same series due to bytes limit.
 	// Fourth with 25 histogram samples. We expect 1 frame with 1 chunk.
-	store := promql.LoadedStorage(t, `
+	store := promqltest.LoadedStorage(t, `
 	load 1m
 		test_metric1{foo="bar1",baz="qux"} 0+100x119
 		test_metric1{foo="bar2",baz="qux"} 0+100x120
@@ -55,8 +55,8 @@ func NewListSeries(lset labels.Labels, s []chunks.Sample) *SeriesEntry {
 	}
 }
 
-// NewListChunkSeriesFromSamples returns chunk series entry that allows to iterate over provided samples.
-// NOTE: It uses inefficient chunks encoding implementation, not caring about chunk size.
+// NewListChunkSeriesFromSamples returns a chunk series entry that allows to iterate over provided samples.
+// NOTE: It uses an inefficient chunks encoding implementation, not caring about chunk size.
 // Use only for testing.
 func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]chunks.Sample) *ChunkSeriesEntry {
 	chksFromSamples := make([]chunks.Meta, 0, len(samples))
@@ -77,6 +77,10 @@ type IndexReader interface {
 	// during background garbage collections.
 	Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
 
+	// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
+	// If no postings are found having at least one matching label, an empty iterator is returned.
+	PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings
+
 	// SortedPostings returns a postings list that is reordered to be sorted
 	// by the label set of the underlying series.
 	SortedPostings(index.Postings) index.Postings
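To illustrate the new interface method, a minimal sketch (the label name and prefix predicate are invented for the example): the match callback sees candidate values for the label, and the postings lists of the values it accepts are merged into one iterator.

```go
package example

import (
	"context"
	"strings"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/index"
)

// prodSeries collects refs of series whose "env" label value starts with
// "prod". ir would typically come from block.Index() or head.Index().
func prodSeries(ctx context.Context, ir tsdb.IndexReader) ([]storage.SeriesRef, error) {
	p := ir.PostingsForLabelMatching(ctx, "env", func(v string) bool {
		return strings.HasPrefix(v, "prod")
	})
	return index.ExpandPostings(p)
}
```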
@@ -518,6 +522,10 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...s
 	return p, nil
 }
 
+func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	return r.ir.PostingsForLabelMatching(ctx, name, match)
+}
+
 func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
 	return r.ir.SortedPostings(p)
 }
@@ -36,6 +36,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/wlog"
 )
@@ -509,6 +510,86 @@ func TestLabelNamesWithMatchers(t *testing.T) {
 	}
 }
 
+func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) {
+	testPostingsForLabelMatching(t, 2, func(t *testing.T, series []labels.Labels) IndexReader {
+		var seriesEntries []storage.Series
+		for _, s := range series {
+			seriesEntries = append(seriesEntries, storage.NewListSeries(s, []chunks.Sample{sample{100, 0, nil, nil}}))
+		}
+
+		blockDir := createBlock(t, t.TempDir(), seriesEntries)
+		files, err := sequenceFiles(chunkDir(blockDir))
+		require.NoError(t, err)
+		require.NotEmpty(t, files, "No chunk created.")
+
+		block, err := OpenBlock(nil, blockDir, nil)
+		require.NoError(t, err)
+		t.Cleanup(func() { require.NoError(t, block.Close()) })
+
+		ir, err := block.Index()
+		require.NoError(t, err)
+		return ir
+	})
+}
+
+func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp func(*testing.T, []labels.Labels) IndexReader) {
+	t.Helper()
+
+	ctx := context.Background()
+	series := []labels.Labels{
+		labels.FromStrings("n", "1"),
+		labels.FromStrings("n", "1", "i", "a"),
+		labels.FromStrings("n", "1", "i", "b"),
+		labels.FromStrings("n", "2"),
+		labels.FromStrings("n", "2.5"),
+	}
+	ir := setUp(t, series)
+	t.Cleanup(func() {
+		require.NoError(t, ir.Close())
+	})
+
+	testCases := []struct {
+		name      string
+		labelName string
+		match     func(string) bool
+		exp       []storage.SeriesRef
+	}{
+		{
+			name:      "n=1",
+			labelName: "n",
+			match: func(val string) bool {
+				return val == "1"
+			},
+			exp: []storage.SeriesRef{offset + 1, offset + 2, offset + 3},
+		},
+		{
+			name:      "n=2",
+			labelName: "n",
+			match: func(val string) bool {
+				return val == "2"
+			},
+			exp: []storage.SeriesRef{offset + 4},
+		},
+		{
+			name:      "missing label",
+			labelName: "missing",
+			match: func(val string) bool {
+				return true
+			},
+			exp: nil,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			p := ir.PostingsForLabelMatching(ctx, tc.labelName, tc.match)
+			require.NotNil(t, p)
+			srs, err := index.ExpandPostings(p)
+			require.NoError(t, err)
+			require.Equal(t, tc.exp, srs)
+		})
+	}
+}
+
 // createBlock creates a block with given set of series and returns its dir.
 func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 	blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger())
@@ -42,7 +42,7 @@ type BlockWriter struct {
 // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
 var ErrNoSeriesAppended = errors.New("no series appended, aborting")
 
-// NewBlockWriter create a new block writer.
+// NewBlockWriter creates a new block writer.
 //
 // The returned writer accumulates all the series in the Head block until `Flush` is called.
 //
@@ -52,6 +52,12 @@ type bstream struct {
 	count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream).
 }
 
+// Reset resets b around stream.
+func (b *bstream) Reset(stream []byte) {
+	b.stream = stream
+	b.count = 0
+}
+
 func (b *bstream) bytes() []byte {
 	return b.stream
 }
@@ -19,6 +19,19 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func TestBstream_Reset(t *testing.T) {
+	bs := bstream{
+		stream: []byte("test"),
+		count:  10,
+	}
+	bs.Reset([]byte("was reset"))
+
+	require.Equal(t, bstream{
+		stream: []byte("was reset"),
+		count:  0,
+	}, bs)
+}
+
 func TestBstreamReader(t *testing.T) {
 	// Write to the bit stream.
 	w := bstream{}
@@ -87,6 +87,9 @@ type Chunk interface {
 	// There's no strong guarantee that no samples will be appended once
 	// Compact() is called. Implementing this function is optional.
 	Compact()
+
+	// Reset resets the chunk given stream.
+	Reset(stream []byte)
 }
 
 type Iterable interface {
@@ -303,64 +306,47 @@ func NewPool() {
 }
 
 func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
+	var c Chunk
 	switch e {
 	case EncXOR:
-		c := p.xor.Get().(*XORChunk)
-		c.b.stream = b
-		c.b.count = 0
-		return c, nil
+		c = p.xor.Get().(*XORChunk)
 	case EncHistogram:
-		c := p.histogram.Get().(*HistogramChunk)
-		c.b.stream = b
-		c.b.count = 0
-		return c, nil
+		c = p.histogram.Get().(*HistogramChunk)
 	case EncFloatHistogram:
-		c := p.floatHistogram.Get().(*FloatHistogramChunk)
-		c.b.stream = b
-		c.b.count = 0
-		return c, nil
+		c = p.floatHistogram.Get().(*FloatHistogramChunk)
+	default:
+		return nil, fmt.Errorf("invalid chunk encoding %q", e)
 	}
-	return nil, fmt.Errorf("invalid chunk encoding %q", e)
+
+	c.Reset(b)
+	return c, nil
 }
 
 func (p *pool) Put(c Chunk) error {
+	var sp *sync.Pool
+	var ok bool
 	switch c.Encoding() {
 	case EncXOR:
-		xc, ok := c.(*XORChunk)
-		// This may happen often with wrapped chunks. Nothing we can really do about
-		// it but returning an error would cause a lot of allocations again. Thus,
-		// we just skip it.
-		if !ok {
-			return nil
-		}
-		xc.b.stream = nil
-		xc.b.count = 0
-		p.xor.Put(c)
+		_, ok = c.(*XORChunk)
+		sp = &p.xor
 	case EncHistogram:
-		sh, ok := c.(*HistogramChunk)
-		// This may happen often with wrapped chunks. Nothing we can really do about
-		// it but returning an error would cause a lot of allocations again. Thus,
-		// we just skip it.
-		if !ok {
-			return nil
-		}
-		sh.b.stream = nil
-		sh.b.count = 0
-		p.histogram.Put(c)
+		_, ok = c.(*HistogramChunk)
+		sp = &p.histogram
 	case EncFloatHistogram:
-		sh, ok := c.(*FloatHistogramChunk)
-		// This may happen often with wrapped chunks. Nothing we can really do about
-		// it but returning an error would cause a lot of allocations again. Thus,
-		// we just skip it.
-		if !ok {
-			return nil
-		}
-		sh.b.stream = nil
-		sh.b.count = 0
-		p.floatHistogram.Put(c)
+		_, ok = c.(*FloatHistogramChunk)
+		sp = &p.floatHistogram
 	default:
 		return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
 	}
+	if !ok {
+		// This may happen often with wrapped chunks. Nothing we can really do about
+		// it but returning an error would cause a lot of allocations again. Thus,
+		// we just skip it.
+		return nil
+	}
+
+	c.Reset(nil)
+	sp.Put(c)
 	return nil
 }
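The refactored pool is easiest to read through a round trip. A small usage sketch, assuming nothing beyond what the diff shows: Get hands out a pooled chunk reset around the caller's stream, and Put resets it around nil before pooling it again.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	p := chunkenc.NewPool()

	// Borrow an XOR chunk; a real caller would pass a previously encoded
	// stream instead of nil.
	c, err := p.Get(chunkenc.EncXOR, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Encoding())

	// Put resets the chunk (dropping its stream) before returning it to the
	// pool; wrapped chunks that fail the type assertion are silently skipped.
	if err := p.Put(c); err != nil {
		panic(err)
	}
}
```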
@@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) {
 	require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1))
 }
 
+func TestPool(t *testing.T) {
+	p := NewPool()
+	for _, tc := range []struct {
+		name     string
+		encoding Encoding
+		expErr   error
+	}{
+		{
+			name:     "xor",
+			encoding: EncXOR,
+		},
+		{
+			name:     "histogram",
+			encoding: EncHistogram,
+		},
+		{
+			name:     "float histogram",
+			encoding: EncFloatHistogram,
+		},
+		{
+			name:     "invalid encoding",
+			encoding: EncNone,
+			expErr:   fmt.Errorf(`invalid chunk encoding "none"`),
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			c, err := p.Get(tc.encoding, []byte("test"))
+			if tc.expErr != nil {
+				require.EqualError(t, err, tc.expErr.Error())
+				return
+			}
+
+			require.NoError(t, err)
+
+			var b *bstream
+			switch tc.encoding {
+			case EncHistogram:
+				b = &c.(*HistogramChunk).b
+			case EncFloatHistogram:
+				b = &c.(*FloatHistogramChunk).b
+			default:
+				b = &c.(*XORChunk).b
+			}
+
+			require.Equal(t, &bstream{
+				stream: []byte("test"),
+				count:  0,
+			}, b)
+
+			b.count = 1
+			require.NoError(t, p.Put(c))
+			require.Equal(t, &bstream{
+				stream: nil,
+				count:  0,
+			}, b)
+		})
+	}
+
+	t.Run("put bad chunk wrapper", func(t *testing.T) {
+		// When a wrapping chunk poses as an encoding it can't be converted to, Put should skip it.
+		c := fakeChunk{
+			encoding: EncXOR,
+			t:        t,
+		}
+		require.NoError(t, p.Put(c))
+	})
+	t.Run("put invalid encoding", func(t *testing.T) {
+		c := fakeChunk{
+			encoding: EncNone,
+			t:        t,
+		}
+		require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`)
+	})
+}
+
+type fakeChunk struct {
+	Chunk
+
+	encoding Encoding
+	t        *testing.T
+}
+
+func (c fakeChunk) Encoding() Encoding {
+	return c.encoding
+}
+
+func (c fakeChunk) Reset([]byte) {
+	c.t.Fatal("Reset should not be called")
+}
+
 func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
 	const samplesPerChunk = 250
 	var (
@@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk {
 	return &FloatHistogramChunk{b: bstream{stream: b, count: 0}}
 }
 
+func (c *FloatHistogramChunk) Reset(stream []byte) {
+	c.b.Reset(stream)
+}
+
 // xorValue holds all the necessary information to encode
 // and decode XOR encoded float64 values.
 type xorValue struct {
@@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk {
 	return &HistogramChunk{b: bstream{stream: b, count: 0}}
 }
 
+func (c *HistogramChunk) Reset(stream []byte) {
+	c.b.Reset(stream)
+}
+
 // Encoding returns the encoding type.
 func (c *HistogramChunk) Encoding() Encoding {
 	return EncHistogram
@@ -61,7 +61,7 @@ func putVarbitInt(b *bstream, val int64) {
 	}
 }
 
-// readVarbitInt reads an int64 encoced with putVarbitInt.
+// readVarbitInt reads an int64 encoded with putVarbitInt.
 func readVarbitInt(b *bstreamReader) (int64, error) {
 	var d byte
 	for i := 0; i < 8; i++ {

@@ -166,7 +166,7 @@ func putVarbitUint(b *bstream, val uint64) {
 	}
 }
 
-// readVarbitUint reads a uint64 encoced with putVarbitUint.
+// readVarbitUint reads a uint64 encoded with putVarbitUint.
 func readVarbitUint(b *bstreamReader) (uint64, error) {
 	var d byte
 	for i := 0; i < 8; i++ {
@@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk {
 	return &XORChunk{b: bstream{stream: b, count: 0}}
 }
 
+func (c *XORChunk) Reset(stream []byte) {
+	c.b.Reset(stream)
+}
+
 // Encoding returns the encoding type.
 func (c *XORChunk) Encoding() Encoding {
 	return EncXOR

@@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) {
 		}
 
 		a.writeVDelta(v)
-
 	default:
 		tDelta = uint64(t - a.t)
 		dod := int64(tDelta - a.tDelta)
@@ -233,7 +233,7 @@ func ChunkMetasToSamples(chunks []Meta) (result []Sample) {
 // Iterator iterates over the chunks of a single time series.
 type Iterator interface {
 	// At returns the current meta.
-	// It depends on implementation if the chunk is populated or not.
+	// It depends on the implementation whether the chunk is populated or not.
 	At() Meta
 	// Next advances the iterator by one.
 	Next() bool

@@ -478,7 +478,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
 	// the batch is too large to fit in the current segment.
 	cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
 
-	// When the segment already has some data than
+	// If the segment already has some data then
 	// the first batch size calculation should account for that.
 	if firstBatch && w.n > SegmentHeaderSize {
 		cutNewBatch = batchSize+w.n > w.segmentSize
@@ -717,7 +717,7 @@ func nextSequenceFile(dir string) (string, int, error) {
 	}
 	// It is not necessary that we find the files in number order,
 	// for example with '1000000' and '200000', '1000000' would come first.
-	// Though this is a very very race case, we check anyway for the max id.
+	// Though this is a very very rare case, we check anyway for the max id.
 	if j > i {
 		i = j
 	}
@@ -188,8 +188,8 @@ func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
 	return bytes
 }
 
-// ChunkDiskMapper is for writing the Head block chunks to the disk
-// and access chunks via mmapped file.
+// ChunkDiskMapper is for writing the Head block chunks to disk
+// and access chunks via mmapped files.
 type ChunkDiskMapper struct {
 	/// Writer.
 	dir *os.File

@@ -231,7 +231,7 @@ type ChunkDiskMapper struct {
 	closed bool
 }
 
-// mmappedChunkFile provides mmapp access to an entire head chunks file that holds many chunks.
+// mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks.
 type mmappedChunkFile struct {
 	byteSlice ByteSlice
 	maxt      int64 // Max timestamp among all of this file's chunks.

@@ -240,7 +240,7 @@ type mmappedChunkFile struct {
 // NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory
 // using the default head chunk file duration.
 // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
-// to set the maxt of all the file.
+// to set the maxt of all files.
 func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
 	// Validate write buffer size.
 	if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {

@@ -425,7 +425,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
 	return files, nil
 }
 
-// WriteChunk writes the chunk to the disk.
+// WriteChunk writes the chunk to disk.
 // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
 func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
 	// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).

@@ -784,7 +784,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 // IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
-// to set the maxt of all the file.
+// to set the maxt of all files.
 func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()

@@ -904,7 +904,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
 	return nil
 }
 
-// Truncate deletes the head chunk files whose file number is less than given fileNo.
+// Truncate deletes the head chunk files with numbers less than the given fileNo.
 func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
 	cdm.readPathMtx.RLock()
@@ -272,7 +272,7 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 	meta := dms[i].meta
 	if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
 		// If the block is entirely deleted, then we don't care about the block being big enough.
-		// TODO: This is assuming single tombstone is for distinct series, which might be no true.
+		// TODO: This is assuming a single tombstone is for a distinct series, which might not be true.
 		if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
 			return []string{dms[i].dir}, nil
 		}

@@ -372,7 +372,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
 		t0 = tr * ((m.MinTime - tr + 1) / tr)
 	}
 	// Skip blocks that don't fall into the range. This can happen via mis-alignment or
-	// by being the multiple of the intended range.
+	// by being a multiple of the intended range.
 	if m.MaxTime > t0+tr {
 		i++
 		continue

@@ -395,7 +395,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
 	return splitDirs
 }
 
-// CompactBlockMetas merges many block metas into one, combining it's source blocks together
+// CompactBlockMetas merges many block metas into one, combining its source blocks together
 // and adjusting compaction level. Min/Max time of result block meta covers all input blocks.
 func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 	res := &BlockMeta{

@@ -833,7 +833,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 	chksIter = s.Iterator(chksIter)
 	chks = chks[:0]
 	for chksIter.Next() {
-		// We are not iterating in streaming way over chunk as
+		// We are not iterating in a streaming way over chunks as
 		// it's more efficient to do bulk write for index and
 		// chunk file purposes.
 		chks = append(chks, chksIter.At())

@@ -842,7 +842,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 		return fmt.Errorf("chunk iter: %w", err)
 	}
 
-	// Skip the series with all deleted chunks.
+	// Skip series with all deleted chunks.
 	if len(chks) == 0 {
 		continue
 	}
@@ -206,7 +206,7 @@ type DB struct {
 	compactor      Compactor
 	blocksToDelete BlocksToDeleteFunc
 
-	// Mutex for that must be held when modifying the general block layout or lastGarbageCollectedMmapRef.
+	// mtx must be held when modifying the general block layout or lastGarbageCollectedMmapRef.
 	mtx    sync.RWMutex
 	blocks []*Block
 

@@ -1431,7 +1431,7 @@ func (db *DB) reloadBlocks() (err error) {
 		db.metrics.reloads.Inc()
 	}()
 
-	// Now that we reload TSDB every minute, there is high chance for race condition with a reload
+	// Now that we reload TSDB every minute, there is a high chance for a race condition with a reload
 	// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
 	// a normal reload and CleanTombstones try to delete the same block.
 	db.mtx.Lock()
@@ -27,10 +27,10 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
 
 # Chunk
 
-Unlike chunks in the on-disk blocks, here we additionally store series
-reference that the chunks belongs to and the mint/maxt of the chunks. This is
-because we don't have an index associated with these chunks, hence these meta
-information are used while replaying the chunks.
+Unlike chunks in the on-disk blocks, here we additionally store the series
+reference that each chunk belongs to and the mint/maxt of the chunks. This is
+because we don't have an index associated with these chunks, hence this metadata
+is used while replaying the chunks.
 
 ```
 ┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐
@@ -40,7 +40,7 @@ Most of the sections described below start with a `len` field. It always specifi
 
 ### Symbol Table
 
-The symbol table holds a sorted list of deduplicated strings that occurred in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
+The symbol table holds a sorted list of deduplicated strings that occur in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
 
 The section contains a sequence of the string entries, each prefixed with the string's length in raw bytes. All strings are utf-8 encoded.
 Strings are referenced by sequential indexing. The strings are sorted in lexicographically ascending order.
@@ -1,6 +1,6 @@
 # Usage
 
-TSDB can be - and is - used by other applications such as [Cortex](https://cortexmetrics.io/) and [Thanos](https://thanos.io/).
+TSDB can be - and is - used by other applications such as [Cortex](https://cortexmetrics.io/), [Thanos](https://thanos.io/), and [Grafana Mimir](https://grafana.com/oss/mimir/).
 This directory contains documentation for any developers who wish to work on or with TSDB.
 
 For a full example of instantiating a database, adding and querying data, see the [tsdb example in the docs](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb).

@@ -18,7 +18,7 @@ A `DB` has the following main components:
 * [`Head`](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Head)
 * [Blocks (persistent blocks)](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Blocks)
 
-The `Head` is responsible for a lot.  Here are its main components:
+The `Head` is responsible for a lot. Here are its main components:
 
 * [WAL](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb/wal#WAL) (Write Ahead Log).
 * [`stripeSeries`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/head.go#L1292):
@@ -111,7 +111,7 @@ func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
 	return &m
 }
 
-// NewCircularExemplarStorage creates an circular in memory exemplar storage.
+// NewCircularExemplarStorage creates a circular in memory exemplar storage.
 // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
 // 1GB of extra memory, accounting for the fact that this is heap allocated space.
 // If len <= 0, then the exemplar storage is essentially a noop storage but can later be
@@ -1467,8 +1467,8 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
 		return
 	}
 
-	// Write chunks starting from the oldest one and stop before we get to current s.headChunk.
-	// If we have this chain: s.headChunk{t4} -> t3 -> t2 -> t1 -> t0
+	// Write chunks starting from the oldest one and stop before we get to current s.headChunks.
+	// If we have this chain: s.headChunks{t4} -> t3 -> t2 -> t1 -> t0
 	// then we need to write chunks t0 to t3, but skip s.headChunks.
 	for i := s.headChunks.len() - 1; i > 0; i-- {
 		chk := s.headChunks.atOffset(i)
@@ -121,6 +121,10 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...s
 	}
 }
 
+func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	return h.head.postings.PostingsForLabelMatching(ctx, name, match)
+}
+
 func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
 	series := make([]*memSeries, 0, 128)
 
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"testing"

@@ -552,3 +553,25 @@ func TestMemSeries_chunk(t *testing.T) {
 		})
 	}
 }
+
+func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) {
+	testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader {
+		opts := DefaultHeadOptions()
+		opts.ChunkRange = 1000
+		opts.ChunkDirRoot = t.TempDir()
+		h, err := NewHead(nil, nil, nil, nil, opts, nil)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			require.NoError(t, h.Close())
+		})
+		app := h.Appender(context.Background())
+		for _, s := range series {
+			app.Append(0, s, 0, 0)
+		}
+		require.NoError(t, app.Commit())
+
+		ir, err := h.Index()
+		require.NoError(t, err)
+		return ir
+	})
+}
@@ -1496,7 +1496,7 @@ Outer:
 	}
 
 	default:
-		// This is a record type we don't understand. It is either and old format from earlier versions,
+		// This is a record type we don't understand. It is either an old format from earlier versions,
 		// or a new format and the code was rolled back to old version.
 		loopErr = fmt.Errorf("unsupported snapshot record type 0b%b", rec[0])
 		break Outer
@@ -158,7 +158,7 @@ type Writer struct {
 	postingsEncoder PostingsEncoder
 }
 
-// TOC represents index Table Of Content that states where each section of index starts.
+// TOC represents the index Table Of Contents that states where each section of the index starts.
 type TOC struct {
 	Symbols uint64
 	Series  uint64

@@ -168,7 +168,7 @@ type TOC struct {
 	PostingsTable uint64
 }
 
-// NewTOCFromByteSlice return parsed TOC from given index byte slice.
+// NewTOCFromByteSlice returns a parsed TOC from the given index byte slice.
 func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 	if bs.Len() < indexTOCLen {
 		return nil, encoding.ErrInvalidSize
@@ -1536,36 +1536,14 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe
 	if len(e) == 0 {
 		return nil, nil
 	}
 
 	values := make([]string, 0, len(e)*symbolFactor)
-
-	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
-	d.Skip(e[0].off)
 	lastVal := e[len(e)-1].value
-
-	skip := 0
-	for d.Err() == nil && ctx.Err() == nil {
-		if skip == 0 {
-			// These are always the same number of bytes,
-			// and it's faster to skip than parse.
-			skip = d.Len()
-			d.Uvarint()      // Keycount.
-			d.UvarintBytes() // Label name.
-			skip -= d.Len()
-		} else {
-			d.Skip(skip)
-		}
-		s := yoloString(d.UvarintBytes()) // Label value.
-		values = append(values, s)
-		if s == lastVal {
-			break
-		}
-		d.Uvarint64() // Offset.
-	}
-	if d.Err() != nil {
-		return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
-	}
-
-	return values, ctx.Err()
+	err := r.traversePostingOffsets(ctx, e[0].off, func(val string, _ uint64) (bool, error) {
+		values = append(values, val)
+		return val != lastVal, nil
+	})
+	return values, err
 }
 
 // LabelNamesFor returns all the label names for the series referred to by IDs.
@@ -1662,6 +1640,44 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
 	return nil
 }
 
+// traversePostingOffsets traverses r's posting offsets table, starting at off, and calls cb with every label value and postings offset.
+// If cb returns false (or an error), the traversing is interrupted.
+func (r *Reader) traversePostingOffsets(ctx context.Context, off int, cb func(string, uint64) (bool, error)) error {
+	// Don't Crc32 the entire postings offset table, this is very slow
+	// so hope any issues were caught at startup.
+	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
+	d.Skip(off)
+	skip := 0
+	ctxErr := ctx.Err()
+	for d.Err() == nil && ctxErr == nil {
+		if skip == 0 {
+			// These are always the same number of bytes,
+			// and it's faster to skip than to parse.
+			skip = d.Len()
+			d.Uvarint()      // Keycount.
+			d.UvarintBytes() // Label name.
+			skip -= d.Len()
+		} else {
+			d.Skip(skip)
+		}
+		v := yoloString(d.UvarintBytes()) // Label value.
+		postingsOff := d.Uvarint64()      // Offset.
+		if ok, err := cb(v, postingsOff); err != nil {
+			return err
+		} else if !ok {
+			break
+		}
+		ctxErr = ctx.Err()
+	}
+	if d.Err() != nil {
+		return fmt.Errorf("get postings offset entry: %w", d.Err())
+	}
+	if ctxErr != nil {
+		return fmt.Errorf("get postings offset entry: %w", ctxErr)
+	}
+	return nil
+}
+
 func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
 	if r.version == FormatV1 {
 		e, ok := r.postingsV1[name]
@@ -1696,7 +1712,6 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
 
 	slices.Sort(values) // Values must be in order so we can step through the table on disk.
 	res := make([]Postings, 0, len(values))
-	skip := 0
 	valueIndex := 0
 	for valueIndex < len(values) && values[valueIndex] < e[0].value {
 		// Discard values before the start.
@@ -1714,33 +1729,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
 			// Need to look from previous entry.
 			i--
 		}
-		// Don't Crc32 the entire postings offset table, this is very slow
-		// so hope any issues were caught at startup.
-		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
-		d.Skip(e[i].off)
-
-		// Iterate on the offset table.
-		var postingsOff uint64 // The offset into the postings table.
-		for d.Err() == nil && ctx.Err() == nil {
-			if skip == 0 {
-				// These are always the same number of bytes,
-				// and it's faster to skip than parse.
-				skip = d.Len()
-				d.Uvarint()      // Keycount.
-				d.UvarintBytes() // Label name.
-				skip -= d.Len()
-			} else {
-				d.Skip(skip)
-			}
-			v := d.UvarintBytes()       // Label value.
-			postingsOff = d.Uvarint64() // Offset.
-			for string(v) >= value {
-				if string(v) == value {
+		if err := r.traversePostingOffsets(ctx, e[i].off, func(val string, postingsOff uint64) (bool, error) {
+			for val >= value {
+				if val == value {
 					// Read from the postings table.
 					d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
 					_, p, err := r.dec.Postings(d2.Get())
 					if err != nil {
-						return nil, fmt.Errorf("decode postings: %w", err)
+						return false, fmt.Errorf("decode postings: %w", err)
 					}
 					res = append(res, p)
 				}

@@ -1752,20 +1749,72 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
 				}
 				if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
 					// Need to go to a later postings offset entry, if there is one.
-					break
+					return false, nil
 				}
 			}
-		}
-		if d.Err() != nil {
-			return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
-		}
-		if ctx.Err() != nil {
-			return nil, fmt.Errorf("get postings offset entry: %w", ctx.Err())
+			return true, nil
+		}); err != nil {
+			return nil, err
 		}
 	}
 
 	return Merge(ctx, res...), nil
 }
 
+func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
+	if r.version == FormatV1 {
+		return r.postingsForLabelMatchingV1(ctx, name, match)
+	}
+
+	e := r.postings[name]
+	if len(e) == 0 {
+		return EmptyPostings()
+	}
+
+	lastVal := e[len(e)-1].value
+	var its []Postings
+	if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
+		if match(val) {
+			// We want this postings iterator since the value is a match
+			postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
+			_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
+			if err != nil {
+				return false, fmt.Errorf("decode postings: %w", err)
+			}
+			its = append(its, p)
+		}
+		return val != lastVal, nil
+	}); err != nil {
+		return ErrPostings(err)
+	}
+
+	return Merge(ctx, its...)
+}
+
+func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, match func(string) bool) Postings {
+	e := r.postingsV1[name]
+	if len(e) == 0 {
+		return EmptyPostings()
+	}
+
+	var its []Postings
+	for val, offset := range e {
+		if !match(val) {
+			continue
+		}
+
+		// Read from the postings table.
+		d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable)
+		_, p, err := r.dec.PostingsFromDecbuf(d)
+		if err != nil {
+			return ErrPostings(fmt.Errorf("decode postings: %w", err))
+		}
+
+		its = append(its, p)
+	}
+
+	return Merge(ctx, its...)
+}
+
 // SortedPostings returns the given postings list reordered so that the backing series
 // are sorted.
 func (r *Reader) SortedPostings(p Postings) Postings {
@@ -1856,6 +1905,11 @@ type Decoder struct {
 // Postings returns a postings list for b and its number of elements.
 func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
 	d := encoding.Decbuf{B: b}
+	return dec.PostingsFromDecbuf(d)
+}
+
+// PostingsFromDecbuf returns a postings list for d and its number of elements.
+func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) {
 	n := d.Be32int()
 	l := d.Get()
 	if d.Err() != nil {
@@ -397,6 +397,35 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 	}
 }
 
+func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
+	p.mtx.RLock()
+
+	e := p.m[name]
+	if len(e) == 0 {
+		p.mtx.RUnlock()
+		return EmptyPostings()
+	}
+
+	// Benchmarking shows that first copying the values into a slice and then matching over that is
+	// faster than matching over the map keys directly, at least on AMD64.
+	vals := make([]string, 0, len(e))
+	for v, srs := range e {
+		if len(srs) > 0 {
+			vals = append(vals, v)
+		}
+	}
+
+	var its []Postings
+	for _, v := range vals {
+		if match(v) {
+			its = append(its, NewListPostings(e[v]))
+		}
+	}
+	p.mtx.RUnlock()
+
+	return Merge(ctx, its...)
+}
+
 // ExpandPostings returns the postings expanded as a slice.
 func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
 	for p.Next() {
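This in-memory path can be exercised directly. A small sketch with made-up series refs and labels: the single call replaces listing every value and then looking up postings per value.

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	mp := index.NewMemPostings()
	mp.Add(1, labels.FromStrings("job", "api-server"))
	mp.Add(2, labels.FromStrings("job", "api-gateway"))
	mp.Add(3, labels.FromStrings("job", "db"))

	// One traversal matches every "job" value with the "api" prefix.
	p := mp.PostingsForLabelMatching(context.Background(), "job", func(v string) bool {
		return strings.HasPrefix(v, "api")
	})
	refs, err := index.ExpandPostings(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(refs) // [1 2]
}
```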
@@ -446,6 +446,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string,
 	return index.NewListPostings(ir.ch.postings), nil
 }
 
+func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
+	return index.ErrPostings(errors.New("not supported"))
+}
+
 func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
 	// This will already be sorted from the Postings() call above.
 	return p
@@ -326,23 +326,8 @@ func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher)
 		}
 	}
 
-	vals, err := ix.LabelValues(ctx, m.Name)
-	if err != nil {
-		return nil, err
-	}
-
-	res := vals[:0]
-	for _, val := range vals {
-		if m.Matches(val) {
-			res = append(res, val)
-		}
-	}
-
-	if len(res) == 0 {
-		return index.EmptyPostings(), nil
-	}
-
-	return ix.Postings(ctx, m.Name, res...)
+	it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches)
+	return it, it.Err()
 }
 
 // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
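This is the core of the regexp-matcher query optimization (#13620): the matcher's own Matches method is pushed down into a single traversal of the index, replacing the old LabelValues-then-Postings two-pass flow. A rough sketch of the shape of the new lookup (the helper name and label data are invented):

```go
package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/index"
)

// regexpPostings resolves a selector like {job=~"api.*"} in one index pass:
// Matches is evaluated against each candidate value during the traversal,
// instead of fetching all values first and issuing a separate Postings call.
func regexpPostings(ctx context.Context, ir tsdb.IndexReader) ([]storage.SeriesRef, error) {
	m := labels.MustNewMatcher(labels.MatchRegexp, "job", "api.*")
	p := ir.PostingsForLabelMatching(ctx, m.Name, m.Matches)
	return index.ExpandPostings(p)
}
```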
@@ -2326,6 +2326,16 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
 	return index.NewListPostings(ep)
 }
 
+func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	var res []index.Postings
+	for l, srs := range m.postings {
+		if l.Name == name && match(l.Value) {
+			res = append(res, index.NewListPostings(srs))
+		}
+	}
+	return index.Merge(ctx, res...)
+}
+
 func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
 	out := make([]storage.SeriesRef, 0, 128)
 
@@ -3238,6 +3248,10 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
 	return []string{}, nil
 }
 
+func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
+	return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
+}
+
 func TestPostingsForMatcher(t *testing.T) {
 	ctx := context.Background()
@@ -163,7 +163,7 @@ type RefMetadata struct {
 	Help string
 }
 
-// RefExemplar is an exemplar with it's labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
+// RefExemplar is an exemplar with the labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
 type RefExemplar struct {
 	Ref chunks.HeadSeriesRef
 	T   int64

@@ -798,7 +798,7 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
 	return buf.Get()
 }
 
-// Encode encodes the Float Histogram into a byte slice.
+// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
 func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
 	buf.PutByte(byte(h.CounterResetHint))
util/almost/almost.go (new file, 41 lines)
@@ -0,0 +1,41 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package almost
+
+import "math"
+
+var minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64.
+
+// Equal returns true if a and b differ by less than their sum
+// multiplied by epsilon.
+func Equal(a, b, epsilon float64) bool {
+	// NaN has no equality but for testing we still want to know whether both values
+	// are NaN.
+	if math.IsNaN(a) && math.IsNaN(b) {
+		return true
+	}
+
+	// Cf. http://floating-point-gui.de/errors/comparison/
+	if a == b {
+		return true
+	}
+
+	absSum := math.Abs(a) + math.Abs(b)
+	diff := math.Abs(a - b)
+
+	if a == 0 || b == 0 || absSum < minNormal {
+		return diff < epsilon*minNormal
+	}
+	return diff/math.Min(absSum, math.MaxFloat64) < epsilon
+}
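A quick sketch of the new helper in use (values chosen arbitrarily): two NaNs compare equal for testing purposes, and the tolerance scales with the magnitude of the inputs.

```go
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/util/almost"
)

func main() {
	fmt.Println(almost.Equal(0.1+0.2, 0.3, 1e-9))           // true: tiny relative error
	fmt.Println(almost.Equal(1.0, 1.1, 1e-9))               // false: far outside tolerance
	fmt.Println(almost.Equal(math.NaN(), math.NaN(), 1e-9)) // true: both NaN
}
```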
@@ -49,6 +49,7 @@ import (
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/rules"
 	"github.com/prometheus/prometheus/scrape"
 	"github.com/prometheus/prometheus/storage"
@@ -338,7 +339,7 @@ var sampleFlagMap = map[string]string{
 }
 
 func TestEndpoints(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo="bar"} 0+100x100
 			test_metric1{foo="boo"} 1+0x100

@@ -502,7 +503,7 @@ func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }
 func TestGetSeries(t *testing.T) {
 	// TestEndpoints doesn't have enough label names to test api.labelNames
 	// endpoint properly. Hence we test it separately.
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo1="bar", baz="abc"} 0+100x100
 			test_metric1{foo2="boo"} 1+0x100

@@ -606,7 +607,7 @@ func TestGetSeries(t *testing.T) {
 
 func TestQueryExemplars(t *testing.T) {
 	start := time.Unix(0, 0)
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo="bar"} 0+100x100
 			test_metric1{foo="boo"} 1+0x100

@@ -725,7 +726,7 @@ func TestQueryExemplars(t *testing.T) {
 func TestLabelNames(t *testing.T) {
 	// TestEndpoints doesn't have enough label names to test api.labelNames
 	// endpoint properly. Hence we test it separately.
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo1="bar", baz="abc"} 0+100x100
 			test_metric1{foo2="boo"} 1+0x100

@@ -3835,7 +3836,7 @@ func TestExtractQueryOpts(t *testing.T) {
 
 // Test query timeout parameter.
 func TestQueryTimeout(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo="bar"} 0+100x100
 	`)
@@ -34,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/textparse"
 	"github.com/prometheus/prometheus/promql"
+	"github.com/prometheus/prometheus/promql/promqltest"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/util/teststorage"

@@ -201,7 +202,7 @@ test_metric_without_labels{instance="baz"} 1001 6000000
 }
 
 func TestFederation(t *testing.T) {
-	storage := promql.LoadedStorage(t, `
+	storage := promqltest.LoadedStorage(t, `
 		load 1m
 			test_metric1{foo="bar",instance="i"} 0+100x100
 			test_metric1{foo="boo",instance="i"} 1+0x100