mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
fc9dc72028
Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
1321 lines
37 KiB
Go
1321 lines
37 KiB
Go
// Copyright 2015 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package promqltest
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"math"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/grafana/regexp"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/promql"
|
|
"github.com/prometheus/prometheus/promql/parser"
|
|
"github.com/prometheus/prometheus/promql/parser/posrange"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/almost"
|
|
"github.com/prometheus/prometheus/util/teststorage"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
var (
|
|
patSpace = regexp.MustCompile("[\t ]+")
|
|
patLoad = regexp.MustCompile(`^load(?:_(with_nhcb))?\s+(.+?)$`)
|
|
patEvalInstant = regexp.MustCompile(`^eval(?:_(fail|warn|ordered))?\s+instant\s+(?:at\s+(.+?))?\s+(.+)$`)
|
|
patEvalRange = regexp.MustCompile(`^eval(?:_(fail|warn))?\s+range\s+from\s+(.+)\s+to\s+(.+)\s+step\s+(.+?)\s+(.+)$`)
|
|
)
|
|
|
|
const (
	// defaultEpsilon is the relative error allowed for sample values.
	defaultEpsilon = 0.000001

	// DefaultMaxSamplesPerQuery is not referenced in this part of the file;
	// presumably it is the sample limit callers pass to NewTestEngine.
	DefaultMaxSamplesPerQuery = 10000
)
|
|
|
|
// testStartTime is the Unix epoch in UTC. Loaded samples and the offsets given
// in eval commands are all placed relative to it.
var testStartTime = time.Unix(0, 0).UTC()
|
|
|
|
// LoadedStorage returns storage with generated data using the provided load statements.
|
|
// Non-load statements will cause test errors.
|
|
func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
|
|
test, err := newTest(t, input)
|
|
require.NoError(t, err)
|
|
|
|
for _, cmd := range test.cmds {
|
|
switch cmd.(type) {
|
|
case *loadCmd:
|
|
require.NoError(t, test.exec(cmd, nil))
|
|
default:
|
|
t.Errorf("only 'load' commands accepted, got '%s'", cmd)
|
|
}
|
|
}
|
|
return test.storage
|
|
}
|
|
|
|
func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
|
|
return promql.NewEngine(promql.EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: maxSamples,
|
|
Timeout: 100 * time.Second,
|
|
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
|
|
EnableAtModifier: true,
|
|
EnableNegativeOffset: true,
|
|
EnablePerStepStats: enablePerStepStats,
|
|
LookbackDelta: lookbackDelta,
|
|
})
|
|
}
|
|
|
|
// RunBuiltinTests runs an acceptance test suite against the provided engine.
|
|
func RunBuiltinTests(t *testing.T, engine promql.QueryEngine) {
|
|
t.Cleanup(func() { parser.EnableExperimentalFunctions = false })
|
|
parser.EnableExperimentalFunctions = true
|
|
|
|
files, err := fs.Glob(testsFs, "*/*.test")
|
|
require.NoError(t, err)
|
|
|
|
for _, fn := range files {
|
|
t.Run(fn, func(t *testing.T) {
|
|
content, err := fs.ReadFile(testsFs, fn)
|
|
require.NoError(t, err)
|
|
RunTest(t, string(content), engine)
|
|
})
|
|
}
|
|
}
|
|
|
|
// RunTest parses and runs the test against the provided engine.
|
|
func RunTest(t testutil.T, input string, engine promql.QueryEngine) {
|
|
require.NoError(t, runTest(t, input, engine))
|
|
}
|
|
|
|
func runTest(t testutil.T, input string, engine promql.QueryEngine) error {
|
|
test, err := newTest(t, input)
|
|
|
|
// Why do this before checking err? newTest() can create the test storage and then return an error,
|
|
// and we want to make sure to clean that up to avoid leaking goroutines.
|
|
defer func() {
|
|
if test == nil {
|
|
return
|
|
}
|
|
if test.storage != nil {
|
|
test.storage.Close()
|
|
}
|
|
if test.cancelCtx != nil {
|
|
test.cancelCtx()
|
|
}
|
|
}()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, cmd := range test.cmds {
|
|
if err := test.exec(cmd, engine); err != nil {
|
|
// TODO(fabxc): aggregate command errors, yield diffs for result
|
|
// comparison errors.
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// test is a sequence of read and write commands that are run
|
|
// against a test storage.
|
|
type test struct {
|
|
testutil.T
|
|
|
|
cmds []testCommand
|
|
|
|
storage *teststorage.TestStorage
|
|
|
|
context context.Context
|
|
cancelCtx context.CancelFunc
|
|
}
|
|
|
|
// newTest returns an initialized empty Test.
|
|
func newTest(t testutil.T, input string) (*test, error) {
|
|
test := &test{
|
|
T: t,
|
|
cmds: []testCommand{},
|
|
}
|
|
err := test.parse(input)
|
|
test.clear()
|
|
|
|
return test, err
|
|
}
|
|
|
|
//go:embed testdata
|
|
var testsFs embed.FS
|
|
|
|
func raise(line int, format string, v ...interface{}) error {
|
|
return &parser.ParseErr{
|
|
LineOffset: line,
|
|
Err: fmt.Errorf(format, v...),
|
|
}
|
|
}
|
|
|
|
func parseLoad(lines []string, i int) (int, *loadCmd, error) {
|
|
if !patLoad.MatchString(lines[i]) {
|
|
return i, nil, raise(i, "invalid load command. (load[_with_nhcb] <step:duration>)")
|
|
}
|
|
parts := patLoad.FindStringSubmatch(lines[i])
|
|
var (
|
|
withNHCB = parts[1] == "with_nhcb"
|
|
step = parts[2]
|
|
)
|
|
gap, err := model.ParseDuration(step)
|
|
if err != nil {
|
|
return i, nil, raise(i, "invalid step definition %q: %s", step, err)
|
|
}
|
|
cmd := newLoadCmd(time.Duration(gap), withNHCB)
|
|
for i+1 < len(lines) {
|
|
i++
|
|
defLine := lines[i]
|
|
if len(defLine) == 0 {
|
|
i--
|
|
break
|
|
}
|
|
metric, vals, err := parseSeries(defLine, i)
|
|
if err != nil {
|
|
return i, nil, err
|
|
}
|
|
cmd.set(metric, vals...)
|
|
}
|
|
return i, cmd, nil
|
|
}
|
|
|
|
func parseSeries(defLine string, line int) (labels.Labels, []parser.SequenceValue, error) {
|
|
metric, vals, err := parser.ParseSeriesDesc(defLine)
|
|
if err != nil {
|
|
parser.EnrichParseError(err, func(parseErr *parser.ParseErr) {
|
|
parseErr.LineOffset = line
|
|
})
|
|
return labels.Labels{}, nil, err
|
|
}
|
|
return metric, vals, nil
|
|
}
|
|
|
|
func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
|
|
instantParts := patEvalInstant.FindStringSubmatch(lines[i])
|
|
rangeParts := patEvalRange.FindStringSubmatch(lines[i])
|
|
|
|
if instantParts == nil && rangeParts == nil {
|
|
return i, nil, raise(i, "invalid evaluation command. Must be either 'eval[_fail|_warn|_ordered] instant [at <offset:duration>] <query>' or 'eval[_fail|_warn] range from <from> to <to> step <step> <query>'")
|
|
}
|
|
|
|
isInstant := instantParts != nil
|
|
|
|
var mod string
|
|
var expr string
|
|
|
|
if isInstant {
|
|
mod = instantParts[1]
|
|
expr = instantParts[3]
|
|
} else {
|
|
mod = rangeParts[1]
|
|
expr = rangeParts[5]
|
|
}
|
|
|
|
_, err := parser.ParseExpr(expr)
|
|
if err != nil {
|
|
parser.EnrichParseError(err, func(parseErr *parser.ParseErr) {
|
|
parseErr.LineOffset = i
|
|
posOffset := posrange.Pos(strings.Index(lines[i], expr))
|
|
parseErr.PositionRange.Start += posOffset
|
|
parseErr.PositionRange.End += posOffset
|
|
parseErr.Query = lines[i]
|
|
})
|
|
return i, nil, err
|
|
}
|
|
|
|
formatErr := func(format string, args ...any) error {
|
|
combinedArgs := []any{expr, i + 1}
|
|
|
|
combinedArgs = append(combinedArgs, args...)
|
|
return fmt.Errorf("error in eval %s (line %v): "+format, combinedArgs...)
|
|
}
|
|
|
|
var cmd *evalCmd
|
|
|
|
if isInstant {
|
|
at := instantParts[2]
|
|
offset, err := model.ParseDuration(at)
|
|
if err != nil {
|
|
return i, nil, formatErr("invalid timestamp definition %q: %s", at, err)
|
|
}
|
|
ts := testStartTime.Add(time.Duration(offset))
|
|
cmd = newInstantEvalCmd(expr, ts, i+1)
|
|
} else {
|
|
from := rangeParts[2]
|
|
to := rangeParts[3]
|
|
step := rangeParts[4]
|
|
|
|
parsedFrom, err := model.ParseDuration(from)
|
|
if err != nil {
|
|
return i, nil, formatErr("invalid start timestamp definition %q: %s", from, err)
|
|
}
|
|
|
|
parsedTo, err := model.ParseDuration(to)
|
|
if err != nil {
|
|
return i, nil, formatErr("invalid end timestamp definition %q: %s", to, err)
|
|
}
|
|
|
|
if parsedTo < parsedFrom {
|
|
return i, nil, formatErr("invalid test definition, end timestamp (%s) is before start timestamp (%s)", to, from)
|
|
}
|
|
|
|
parsedStep, err := model.ParseDuration(step)
|
|
if err != nil {
|
|
return i, nil, formatErr("invalid step definition %q: %s", step, err)
|
|
}
|
|
|
|
cmd = newRangeEvalCmd(expr, testStartTime.Add(time.Duration(parsedFrom)), testStartTime.Add(time.Duration(parsedTo)), time.Duration(parsedStep), i+1)
|
|
}
|
|
|
|
switch mod {
|
|
case "ordered":
|
|
// Ordered results are not supported for range queries, but the regex for range query commands does not allow
|
|
// asserting an ordered result, so we don't need to do any error checking here.
|
|
cmd.ordered = true
|
|
case "fail":
|
|
cmd.fail = true
|
|
case "warn":
|
|
cmd.warn = true
|
|
}
|
|
|
|
for j := 1; i+1 < len(lines); j++ {
|
|
i++
|
|
defLine := lines[i]
|
|
if len(defLine) == 0 {
|
|
i--
|
|
break
|
|
}
|
|
|
|
if cmd.fail && strings.HasPrefix(defLine, "expected_fail_message") {
|
|
cmd.expectedFailMessage = strings.TrimSpace(strings.TrimPrefix(defLine, "expected_fail_message"))
|
|
break
|
|
}
|
|
|
|
if cmd.fail && strings.HasPrefix(defLine, "expected_fail_regexp") {
|
|
pattern := strings.TrimSpace(strings.TrimPrefix(defLine, "expected_fail_regexp"))
|
|
cmd.expectedFailRegexp, err = regexp.Compile(pattern)
|
|
if err != nil {
|
|
return i, nil, formatErr("invalid regexp '%s' for expected_fail_regexp: %w", pattern, err)
|
|
}
|
|
break
|
|
}
|
|
|
|
if f, err := parseNumber(defLine); err == nil {
|
|
cmd.expect(0, parser.SequenceValue{Value: f})
|
|
break
|
|
}
|
|
metric, vals, err := parseSeries(defLine, i)
|
|
if err != nil {
|
|
return i, nil, err
|
|
}
|
|
|
|
// Currently, we are not expecting any matrices.
|
|
if len(vals) > 1 && isInstant {
|
|
return i, nil, formatErr("expecting multiple values in instant evaluation not allowed")
|
|
}
|
|
cmd.expectMetric(j, metric, vals...)
|
|
}
|
|
return i, cmd, nil
|
|
}
|
|
|
|
// getLines splits input on newlines and returns the lines with surrounding
// whitespace trimmed. A line whose first non-blank character is '#' is a
// comment and is replaced by an empty string, so line indexes are preserved.
func getLines(input string) []string {
	out := strings.Split(input, "\n")
	for idx := range out {
		trimmed := strings.TrimSpace(out[idx])
		if strings.HasPrefix(trimmed, "#") {
			trimmed = ""
		}
		out[idx] = trimmed
	}
	return out
}
|
|
|
|
// parse the given command sequence and appends it to the test.
|
|
func (t *test) parse(input string) error {
|
|
lines := getLines(input)
|
|
var err error
|
|
// Scan for steps line by line.
|
|
for i := 0; i < len(lines); i++ {
|
|
l := lines[i]
|
|
if len(l) == 0 {
|
|
continue
|
|
}
|
|
var cmd testCommand
|
|
|
|
switch c := strings.ToLower(patSpace.Split(l, 2)[0]); {
|
|
case c == "clear":
|
|
cmd = &clearCmd{}
|
|
case strings.HasPrefix(c, "load"):
|
|
i, cmd, err = parseLoad(lines, i)
|
|
case strings.HasPrefix(c, "eval"):
|
|
i, cmd, err = t.parseEval(lines, i)
|
|
default:
|
|
return raise(i, "invalid command %q", l)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.cmds = append(t.cmds, cmd)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// testCommand is an interface that ensures that only the package internal
|
|
// types can be a valid command for a test.
|
|
type testCommand interface {
|
|
testCmd()
|
|
}
|
|
|
|
func (*clearCmd) testCmd() {}
|
|
func (*loadCmd) testCmd() {}
|
|
func (*evalCmd) testCmd() {}
|
|
|
|
// loadCmd is a command that loads sequences of sample values for specific
|
|
// metrics into the storage.
|
|
type loadCmd struct {
|
|
gap time.Duration
|
|
metrics map[uint64]labels.Labels
|
|
defs map[uint64][]promql.Sample
|
|
exemplars map[uint64][]exemplar.Exemplar
|
|
withNHCB bool
|
|
}
|
|
|
|
func newLoadCmd(gap time.Duration, withNHCB bool) *loadCmd {
|
|
return &loadCmd{
|
|
gap: gap,
|
|
metrics: map[uint64]labels.Labels{},
|
|
defs: map[uint64][]promql.Sample{},
|
|
exemplars: map[uint64][]exemplar.Exemplar{},
|
|
withNHCB: withNHCB,
|
|
}
|
|
}
|
|
|
|
func (cmd loadCmd) String() string {
|
|
return "load"
|
|
}
|
|
|
|
// set a sequence of sample values for the given metric.
|
|
func (cmd *loadCmd) set(m labels.Labels, vals ...parser.SequenceValue) {
|
|
h := m.Hash()
|
|
|
|
samples := make([]promql.Sample, 0, len(vals))
|
|
ts := testStartTime
|
|
for _, v := range vals {
|
|
if !v.Omitted {
|
|
samples = append(samples, promql.Sample{
|
|
T: ts.UnixNano() / int64(time.Millisecond/time.Nanosecond),
|
|
F: v.Value,
|
|
H: v.Histogram,
|
|
})
|
|
}
|
|
ts = ts.Add(cmd.gap)
|
|
}
|
|
cmd.defs[h] = samples
|
|
cmd.metrics[h] = m
|
|
}
|
|
|
|
// append the defined time series to the storage.
|
|
func (cmd *loadCmd) append(a storage.Appender) error {
|
|
for h, smpls := range cmd.defs {
|
|
m := cmd.metrics[h]
|
|
|
|
for _, s := range smpls {
|
|
if err := appendSample(a, s, m); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
if cmd.withNHCB {
|
|
return cmd.appendCustomHistogram(a)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getHistogramMetricBase(m labels.Labels, suffix string) (labels.Labels, uint64) {
|
|
mName := m.Get(labels.MetricName)
|
|
baseM := labels.NewBuilder(m).
|
|
Set(labels.MetricName, strings.TrimSuffix(mName, suffix)).
|
|
Del(labels.BucketLabel).
|
|
Labels()
|
|
hash := baseM.Hash()
|
|
return baseM, hash
|
|
}
|
|
|
|
type tempHistogramWrapper struct {
|
|
metric labels.Labels
|
|
upperBounds []float64
|
|
histogramByTs map[int64]tempHistogram
|
|
}
|
|
|
|
func newTempHistogramWrapper() tempHistogramWrapper {
|
|
return tempHistogramWrapper{
|
|
upperBounds: []float64{},
|
|
histogramByTs: map[int64]tempHistogram{},
|
|
}
|
|
}
|
|
|
|
// tempHistogram holds the values of one classic histogram at one timestamp.
type tempHistogram struct {
	// bucketCounts maps a bucket's upper bound to the value of its series.
	bucketCounts map[float64]float64
	// count and sum hold the values of the _count and _sum series.
	count float64
	sum   float64
}

// newTempHistogram returns a tempHistogram with an initialised bucket map and
// zero count and sum.
func newTempHistogram() tempHistogram {
	var h tempHistogram
	h.bucketCounts = map[float64]float64{}
	return h
}
|
|
|
|
func processClassicHistogramSeries(m labels.Labels, suffix string, histogramMap map[uint64]tempHistogramWrapper, smpls []promql.Sample, updateHistogramWrapper func(*tempHistogramWrapper), updateHistogram func(*tempHistogram, float64)) {
|
|
m2, m2hash := getHistogramMetricBase(m, suffix)
|
|
histogramWrapper, exists := histogramMap[m2hash]
|
|
if !exists {
|
|
histogramWrapper = newTempHistogramWrapper()
|
|
}
|
|
histogramWrapper.metric = m2
|
|
if updateHistogramWrapper != nil {
|
|
updateHistogramWrapper(&histogramWrapper)
|
|
}
|
|
for _, s := range smpls {
|
|
if s.H != nil {
|
|
continue
|
|
}
|
|
histogram, exists := histogramWrapper.histogramByTs[s.T]
|
|
if !exists {
|
|
histogram = newTempHistogram()
|
|
}
|
|
updateHistogram(&histogram, s.F)
|
|
histogramWrapper.histogramByTs[s.T] = histogram
|
|
}
|
|
histogramMap[m2hash] = histogramWrapper
|
|
}
|
|
|
|
func processUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) {
|
|
sort.Float64s(upperBounds0)
|
|
upperBounds := make([]float64, 0, len(upperBounds0))
|
|
prevLE := math.Inf(-1)
|
|
for _, le := range upperBounds0 {
|
|
if le != prevLE { // deduplicate
|
|
upperBounds = append(upperBounds, le)
|
|
prevLE = le
|
|
}
|
|
}
|
|
var customBounds []float64
|
|
if upperBounds[len(upperBounds)-1] == math.Inf(1) {
|
|
customBounds = upperBounds[:len(upperBounds)-1]
|
|
} else {
|
|
customBounds = upperBounds
|
|
}
|
|
return upperBounds, &histogram.FloatHistogram{
|
|
Count: 0,
|
|
Sum: 0,
|
|
Schema: histogram.CustomBucketsSchema,
|
|
PositiveSpans: []histogram.Span{
|
|
{Offset: 0, Length: uint32(len(upperBounds))},
|
|
},
|
|
PositiveBuckets: make([]float64, len(upperBounds)),
|
|
CustomValues: customBounds,
|
|
}
|
|
}
|
|
|
|
// If classic histograms are defined, convert them into native histograms with custom
|
|
// bounds and append the defined time series to the storage.
|
|
func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
|
|
histogramMap := map[uint64]tempHistogramWrapper{}
|
|
|
|
// Go through all the time series to collate classic histogram data
|
|
// and organise them by timestamp.
|
|
for hash, smpls := range cmd.defs {
|
|
m := cmd.metrics[hash]
|
|
mName := m.Get(labels.MetricName)
|
|
switch {
|
|
case strings.HasSuffix(mName, "_bucket") && m.Has(labels.BucketLabel):
|
|
le, err := strconv.ParseFloat(m.Get(labels.BucketLabel), 64)
|
|
if err != nil || math.IsNaN(le) {
|
|
continue
|
|
}
|
|
processClassicHistogramSeries(m, "_bucket", histogramMap, smpls, func(histogramWrapper *tempHistogramWrapper) {
|
|
histogramWrapper.upperBounds = append(histogramWrapper.upperBounds, le)
|
|
}, func(histogram *tempHistogram, f float64) {
|
|
histogram.bucketCounts[le] = f
|
|
})
|
|
case strings.HasSuffix(mName, "_count"):
|
|
processClassicHistogramSeries(m, "_count", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) {
|
|
histogram.count = f
|
|
})
|
|
case strings.HasSuffix(mName, "_sum"):
|
|
processClassicHistogramSeries(m, "_sum", histogramMap, smpls, nil, func(histogram *tempHistogram, f float64) {
|
|
histogram.sum = f
|
|
})
|
|
}
|
|
}
|
|
|
|
// Convert the collated classic histogram data into native histograms
|
|
// with custom bounds and append them to the storage.
|
|
for _, histogramWrapper := range histogramMap {
|
|
upperBounds, fhBase := processUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds)
|
|
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
|
|
for t, histogram := range histogramWrapper.histogramByTs {
|
|
fh := fhBase.Copy()
|
|
var prevCount, total float64
|
|
for i, le := range upperBounds {
|
|
currCount, exists := histogram.bucketCounts[le]
|
|
if !exists {
|
|
currCount = 0
|
|
}
|
|
count := currCount - prevCount
|
|
fh.PositiveBuckets[i] = count
|
|
total += count
|
|
prevCount = currCount
|
|
}
|
|
fh.Sum = histogram.sum
|
|
if histogram.count != 0 {
|
|
total = histogram.count
|
|
}
|
|
fh.Count = total
|
|
s := promql.Sample{T: t, H: fh.Compact(0)}
|
|
if err := s.H.Validate(); err != nil {
|
|
return err
|
|
}
|
|
samples = append(samples, s)
|
|
}
|
|
sort.Slice(samples, func(i, j int) bool { return samples[i].T < samples[j].T })
|
|
for _, s := range samples {
|
|
if err := appendSample(a, s, histogramWrapper.metric); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func appendSample(a storage.Appender, s promql.Sample, m labels.Labels) error {
|
|
if s.H != nil {
|
|
if _, err := a.AppendHistogram(0, m, s.T, nil, s.H); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if _, err := a.Append(0, m, s.T, s.F); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// evalCmd is a command that evaluates an expression for the given time (range)
|
|
// and expects a specific result.
|
|
type evalCmd struct {
|
|
expr string
|
|
start time.Time
|
|
end time.Time
|
|
step time.Duration
|
|
line int
|
|
|
|
isRange bool // if false, instant query
|
|
fail, warn, ordered bool
|
|
expectedFailMessage string
|
|
expectedFailRegexp *regexp.Regexp
|
|
|
|
metrics map[uint64]labels.Labels
|
|
expected map[uint64]entry
|
|
}
|
|
|
|
type entry struct {
|
|
pos int
|
|
vals []parser.SequenceValue
|
|
}
|
|
|
|
func (e entry) String() string {
|
|
return fmt.Sprintf("%d: %s", e.pos, e.vals)
|
|
}
|
|
|
|
func newInstantEvalCmd(expr string, start time.Time, line int) *evalCmd {
|
|
return &evalCmd{
|
|
expr: expr,
|
|
start: start,
|
|
line: line,
|
|
|
|
metrics: map[uint64]labels.Labels{},
|
|
expected: map[uint64]entry{},
|
|
}
|
|
}
|
|
|
|
func newRangeEvalCmd(expr string, start, end time.Time, step time.Duration, line int) *evalCmd {
|
|
return &evalCmd{
|
|
expr: expr,
|
|
start: start,
|
|
end: end,
|
|
step: step,
|
|
line: line,
|
|
isRange: true,
|
|
|
|
metrics: map[uint64]labels.Labels{},
|
|
expected: map[uint64]entry{},
|
|
}
|
|
}
|
|
|
|
func (ev *evalCmd) String() string {
|
|
return "eval"
|
|
}
|
|
|
|
// expect adds a sequence of values to the set of expected
|
|
// results for the query.
|
|
func (ev *evalCmd) expect(pos int, vals ...parser.SequenceValue) {
|
|
ev.expected[0] = entry{pos: pos, vals: vals}
|
|
}
|
|
|
|
// expectMetric adds a new metric with a sequence of values to the set of expected
|
|
// results for the query.
|
|
func (ev *evalCmd) expectMetric(pos int, m labels.Labels, vals ...parser.SequenceValue) {
|
|
h := m.Hash()
|
|
ev.metrics[h] = m
|
|
ev.expected[h] = entry{pos: pos, vals: vals}
|
|
}
|
|
|
|
// compareResult compares the result value with the defined expectation.
|
|
func (ev *evalCmd) compareResult(result parser.Value) error {
|
|
switch val := result.(type) {
|
|
case promql.Matrix:
|
|
if ev.ordered {
|
|
return fmt.Errorf("expected ordered result, but query returned a matrix")
|
|
}
|
|
|
|
if err := assertMatrixSorted(val); err != nil {
|
|
return err
|
|
}
|
|
|
|
seen := map[uint64]bool{}
|
|
for _, s := range val {
|
|
hash := s.Metric.Hash()
|
|
if _, ok := ev.metrics[hash]; !ok {
|
|
return fmt.Errorf("unexpected metric %s in result, has %s", s.Metric, formatSeriesResult(s))
|
|
}
|
|
seen[hash] = true
|
|
exp := ev.expected[hash]
|
|
|
|
var expectedFloats []promql.FPoint
|
|
var expectedHistograms []promql.HPoint
|
|
|
|
for i, e := range exp.vals {
|
|
ts := ev.start.Add(time.Duration(i) * ev.step)
|
|
|
|
if ts.After(ev.end) {
|
|
return fmt.Errorf("expected %v points for %s, but query time range cannot return this many points", len(exp.vals), ev.metrics[hash])
|
|
}
|
|
|
|
t := ts.UnixNano() / int64(time.Millisecond/time.Nanosecond)
|
|
|
|
if e.Histogram != nil {
|
|
expectedHistograms = append(expectedHistograms, promql.HPoint{T: t, H: e.Histogram})
|
|
} else if !e.Omitted {
|
|
expectedFloats = append(expectedFloats, promql.FPoint{T: t, F: e.Value})
|
|
}
|
|
}
|
|
|
|
if len(expectedFloats) != len(s.Floats) || len(expectedHistograms) != len(s.Histograms) {
|
|
return fmt.Errorf("expected %v float points and %v histogram points for %s, but got %s", len(expectedFloats), len(expectedHistograms), ev.metrics[hash], formatSeriesResult(s))
|
|
}
|
|
|
|
for i, expected := range expectedFloats {
|
|
actual := s.Floats[i]
|
|
|
|
if expected.T != actual.T {
|
|
return fmt.Errorf("expected float value at index %v for %s to have timestamp %v, but it had timestamp %v (result has %s)", i, ev.metrics[hash], expected.T, actual.T, formatSeriesResult(s))
|
|
}
|
|
|
|
if !almost.Equal(actual.F, expected.F, defaultEpsilon) {
|
|
return fmt.Errorf("expected float value at index %v (t=%v) for %s to be %v, but got %v (result has %s)", i, actual.T, ev.metrics[hash], expected.F, actual.F, formatSeriesResult(s))
|
|
}
|
|
}
|
|
|
|
for i, expected := range expectedHistograms {
|
|
actual := s.Histograms[i]
|
|
|
|
if expected.T != actual.T {
|
|
return fmt.Errorf("expected histogram value at index %v for %s to have timestamp %v, but it had timestamp %v (result has %s)", i, ev.metrics[hash], expected.T, actual.T, formatSeriesResult(s))
|
|
}
|
|
|
|
if !actual.H.Equals(expected.H.Compact(0)) {
|
|
return fmt.Errorf("expected histogram value at index %v (t=%v) for %s to be %v, but got %v (result has %s)", i, actual.T, ev.metrics[hash], expected.H, actual.H, formatSeriesResult(s))
|
|
}
|
|
}
|
|
}
|
|
|
|
for hash := range ev.expected {
|
|
if !seen[hash] {
|
|
return fmt.Errorf("expected metric %s not found", ev.metrics[hash])
|
|
}
|
|
}
|
|
|
|
case promql.Vector:
|
|
seen := map[uint64]bool{}
|
|
for pos, v := range val {
|
|
fp := v.Metric.Hash()
|
|
if _, ok := ev.metrics[fp]; !ok {
|
|
if v.H != nil {
|
|
return fmt.Errorf("unexpected metric %s in result, has value %v", v.Metric, v.H)
|
|
}
|
|
|
|
return fmt.Errorf("unexpected metric %s in result, has value %v", v.Metric, v.F)
|
|
}
|
|
exp := ev.expected[fp]
|
|
if ev.ordered && exp.pos != pos+1 {
|
|
return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1)
|
|
}
|
|
exp0 := exp.vals[0]
|
|
expH := exp0.Histogram
|
|
if expH == nil && v.H != nil {
|
|
return fmt.Errorf("expected float value %v for %s but got histogram %s", exp0, v.Metric, HistogramTestExpression(v.H))
|
|
}
|
|
if expH != nil && v.H == nil {
|
|
return fmt.Errorf("expected histogram %s for %s but got float value %v", HistogramTestExpression(expH), v.Metric, v.F)
|
|
}
|
|
if expH != nil && !expH.Compact(0).Equals(v.H) {
|
|
return fmt.Errorf("expected %v for %s but got %s", HistogramTestExpression(expH), v.Metric, HistogramTestExpression(v.H))
|
|
}
|
|
if !almost.Equal(exp0.Value, v.F, defaultEpsilon) {
|
|
return fmt.Errorf("expected %v for %s but got %v", exp0.Value, v.Metric, v.F)
|
|
}
|
|
|
|
seen[fp] = true
|
|
}
|
|
for fp, expVals := range ev.expected {
|
|
if !seen[fp] {
|
|
return fmt.Errorf("expected metric %s with %v not found", ev.metrics[fp], expVals)
|
|
}
|
|
}
|
|
|
|
case promql.Scalar:
|
|
if len(ev.expected) != 1 {
|
|
return fmt.Errorf("expected vector result, but got scalar %s", val.String())
|
|
}
|
|
exp0 := ev.expected[0].vals[0]
|
|
if exp0.Histogram != nil {
|
|
return fmt.Errorf("expected Histogram %v but got scalar %s", exp0.Histogram.TestExpression(), val.String())
|
|
}
|
|
if !almost.Equal(exp0.Value, val.V, defaultEpsilon) {
|
|
return fmt.Errorf("expected Scalar %v but got %v", val.V, exp0.Value)
|
|
}
|
|
|
|
default:
|
|
panic(fmt.Errorf("promql.Test.compareResult: unexpected result type %T", result))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ev *evalCmd) checkExpectedFailure(actual error) error {
|
|
if ev.expectedFailMessage != "" {
|
|
if ev.expectedFailMessage != actual.Error() {
|
|
return fmt.Errorf("expected error %q evaluating query %q (line %d), but got: %s", ev.expectedFailMessage, ev.expr, ev.line, actual.Error())
|
|
}
|
|
}
|
|
|
|
if ev.expectedFailRegexp != nil {
|
|
if !ev.expectedFailRegexp.MatchString(actual.Error()) {
|
|
return fmt.Errorf("expected error matching pattern %q evaluating query %q (line %d), but got: %s", ev.expectedFailRegexp.String(), ev.expr, ev.line, actual.Error())
|
|
}
|
|
}
|
|
|
|
// We're not expecting a particular error, or we got the error we expected.
|
|
// This test passes.
|
|
return nil
|
|
}
|
|
|
|
func formatSeriesResult(s promql.Series) string {
|
|
floatPlural := "s"
|
|
histogramPlural := "s"
|
|
|
|
if len(s.Floats) == 1 {
|
|
floatPlural = ""
|
|
}
|
|
|
|
if len(s.Histograms) == 1 {
|
|
histogramPlural = ""
|
|
}
|
|
|
|
return fmt.Sprintf("%v float point%s %v and %v histogram point%s %v", len(s.Floats), floatPlural, s.Floats, len(s.Histograms), histogramPlural, s.Histograms)
|
|
}
|
|
|
|
// HistogramTestExpression returns TestExpression() for the given histogram or "" if the histogram is nil.
|
|
func HistogramTestExpression(h *histogram.FloatHistogram) string {
|
|
if h != nil {
|
|
return h.TestExpression()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// clearCmd is a command that wipes the test's storage state.
type clearCmd struct{}

// String returns the command's keyword.
func (cmd clearCmd) String() string {
	const keyword = "clear"
	return keyword
}
|
|
|
|
// atModifierTestCase is one instant query to run: the expression and the time
// to evaluate it at.
type atModifierTestCase struct {
	expr     string
	evalTime time.Time
}
|
|
|
|
func atModifierTestCases(exprStr string, evalTime time.Time) ([]atModifierTestCase, error) {
|
|
expr, err := parser.ParseExpr(exprStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ts := timestamp.FromTime(evalTime)
|
|
|
|
containsNonStepInvariant := false
|
|
// Setting the @ timestamp for all selectors to be evalTime.
|
|
// If there is a subquery, then the selectors inside it don't get the @ timestamp.
|
|
// If any selector already has the @ timestamp set, then it is untouched.
|
|
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
|
|
if hasAtModifier(path) {
|
|
// There is a subquery with timestamp in the path,
|
|
// hence don't change any timestamps further.
|
|
return nil
|
|
}
|
|
switch n := node.(type) {
|
|
case *parser.VectorSelector:
|
|
if n.Timestamp == nil {
|
|
n.Timestamp = makeInt64Pointer(ts)
|
|
}
|
|
|
|
case *parser.MatrixSelector:
|
|
if vs := n.VectorSelector.(*parser.VectorSelector); vs.Timestamp == nil {
|
|
vs.Timestamp = makeInt64Pointer(ts)
|
|
}
|
|
|
|
case *parser.SubqueryExpr:
|
|
if n.Timestamp == nil {
|
|
n.Timestamp = makeInt64Pointer(ts)
|
|
}
|
|
|
|
case *parser.Call:
|
|
_, ok := promql.AtModifierUnsafeFunctions[n.Func.Name]
|
|
containsNonStepInvariant = containsNonStepInvariant || ok
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if containsNonStepInvariant {
|
|
// Expression contains a function whose result can vary with evaluation
|
|
// time, even though its arguments are step invariant: skip it.
|
|
return nil, nil
|
|
}
|
|
|
|
newExpr := expr.String() // With all the @ evalTime set.
|
|
additionalEvalTimes := []int64{-10 * ts, 0, ts / 5, ts, 10 * ts}
|
|
if ts == 0 {
|
|
additionalEvalTimes = []int64{-1000, -ts, 1000}
|
|
}
|
|
testCases := make([]atModifierTestCase, 0, len(additionalEvalTimes))
|
|
for _, et := range additionalEvalTimes {
|
|
testCases = append(testCases, atModifierTestCase{
|
|
expr: newExpr,
|
|
evalTime: timestamp.Time(et),
|
|
})
|
|
}
|
|
|
|
return testCases, nil
|
|
}
|
|
|
|
func hasAtModifier(path []parser.Node) bool {
|
|
for _, node := range path {
|
|
if n, ok := node.(*parser.SubqueryExpr); ok {
|
|
if n.Timestamp != nil {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// exec processes a single step of the test.
|
|
func (t *test) exec(tc testCommand, engine promql.QueryEngine) error {
|
|
switch cmd := tc.(type) {
|
|
case *clearCmd:
|
|
t.clear()
|
|
|
|
case *loadCmd:
|
|
app := t.storage.Appender(t.context)
|
|
if err := cmd.append(app); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
|
|
if err := app.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
case *evalCmd:
|
|
return t.execEval(cmd, engine)
|
|
|
|
default:
|
|
panic("promql.Test.exec: unknown test command type")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *test) execEval(cmd *evalCmd, engine promql.QueryEngine) error {
|
|
if cmd.isRange {
|
|
return t.execRangeEval(cmd, engine)
|
|
}
|
|
|
|
return t.execInstantEval(cmd, engine)
|
|
}
|
|
|
|
func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
|
|
q, err := engine.NewRangeQuery(t.context, t.storage, nil, cmd.expr, cmd.start, cmd.end, cmd.step)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err)
|
|
}
|
|
res := q.Exec(t.context)
|
|
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
|
if !cmd.warn && countWarnings > 0 {
|
|
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", cmd.expr, cmd.line, res.Warnings)
|
|
}
|
|
if cmd.warn && countWarnings == 0 {
|
|
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
|
|
}
|
|
if res.Err != nil {
|
|
if cmd.fail {
|
|
return cmd.checkExpectedFailure(res.Err)
|
|
}
|
|
|
|
return fmt.Errorf("error evaluating query %q (line %d): %w", cmd.expr, cmd.line, res.Err)
|
|
}
|
|
if res.Err == nil && cmd.fail {
|
|
return fmt.Errorf("expected error evaluating query %q (line %d) but got none", cmd.expr, cmd.line)
|
|
}
|
|
defer q.Close()
|
|
|
|
if err := cmd.compareResult(res.Value); err != nil {
|
|
return fmt.Errorf("error in %s %s (line %d): %w", cmd, cmd.expr, cmd.line, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *test) execInstantEval(cmd *evalCmd, engine promql.QueryEngine) error {
|
|
queries, err := atModifierTestCases(cmd.expr, cmd.start)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
|
|
for _, iq := range queries {
|
|
if err := t.runInstantQuery(iq, cmd, engine); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promql.QueryEngine) error {
|
|
q, err := engine.NewInstantQuery(t.context, t.storage, nil, iq.expr, iq.evalTime)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating instant query for %q (line %d): %w", cmd.expr, cmd.line, err)
|
|
}
|
|
defer q.Close()
|
|
res := q.Exec(t.context)
|
|
countWarnings, _ := res.Warnings.CountWarningsAndInfo()
|
|
if !cmd.warn && countWarnings > 0 {
|
|
return fmt.Errorf("unexpected warnings evaluating query %q (line %d): %v", iq.expr, cmd.line, res.Warnings)
|
|
}
|
|
if cmd.warn && countWarnings == 0 {
|
|
return fmt.Errorf("expected warnings evaluating query %q (line %d) but got none", iq.expr, cmd.line)
|
|
}
|
|
if res.Err != nil {
|
|
if cmd.fail {
|
|
if err := cmd.checkExpectedFailure(res.Err); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
return fmt.Errorf("error evaluating query %q (line %d): %w", iq.expr, cmd.line, res.Err)
|
|
}
|
|
if res.Err == nil && cmd.fail {
|
|
return fmt.Errorf("expected error evaluating query %q (line %d) but got none", iq.expr, cmd.line)
|
|
}
|
|
err = cmd.compareResult(res.Value)
|
|
if err != nil {
|
|
return fmt.Errorf("error in %s %s (line %d): %w", cmd, iq.expr, cmd.line, err)
|
|
}
|
|
|
|
// Check query returns same result in range mode,
|
|
// by checking against the middle step.
|
|
q, err = engine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err)
|
|
}
|
|
rangeRes := q.Exec(t.context)
|
|
if rangeRes.Err != nil {
|
|
return fmt.Errorf("error evaluating query %q (line %d) in range mode: %w", iq.expr, cmd.line, rangeRes.Err)
|
|
}
|
|
defer q.Close()
|
|
if cmd.ordered {
|
|
// Range queries are always sorted by labels, so skip this test case that expects results in a particular order.
|
|
return nil
|
|
}
|
|
mat := rangeRes.Value.(promql.Matrix)
|
|
if err := assertMatrixSorted(mat); err != nil {
|
|
return err
|
|
}
|
|
|
|
vec := make(promql.Vector, 0, len(mat))
|
|
for _, series := range mat {
|
|
// We expect either Floats or Histograms.
|
|
for _, point := range series.Floats {
|
|
if point.T == timeMilliseconds(iq.evalTime) {
|
|
vec = append(vec, promql.Sample{Metric: series.Metric, T: point.T, F: point.F})
|
|
break
|
|
}
|
|
}
|
|
for _, point := range series.Histograms {
|
|
if point.T == timeMilliseconds(iq.evalTime) {
|
|
vec = append(vec, promql.Sample{Metric: series.Metric, T: point.T, H: point.H})
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if _, ok := res.Value.(promql.Scalar); ok {
|
|
err = cmd.compareResult(promql.Scalar{V: vec[0].F})
|
|
} else {
|
|
err = cmd.compareResult(vec)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error in %s %s (line %d) range mode: %w", cmd, iq.expr, cmd.line, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func assertMatrixSorted(m promql.Matrix) error {
|
|
if len(m) <= 1 {
|
|
return nil
|
|
}
|
|
|
|
for i, s := range m[:len(m)-1] {
|
|
nextIndex := i + 1
|
|
nextMetric := m[nextIndex].Metric
|
|
|
|
if labels.Compare(s.Metric, nextMetric) > 0 {
|
|
return fmt.Errorf("matrix results should always be sorted by labels, but matrix is not sorted: series at index %v with labels %s sorts before series at index %v with labels %s", nextIndex, nextMetric, i, s.Metric)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// clear the current test storage of all inserted samples.
|
|
func (t *test) clear() {
|
|
if t.storage != nil {
|
|
err := t.storage.Close()
|
|
require.NoError(t.T, err, "Unexpected error while closing test storage.")
|
|
}
|
|
if t.cancelCtx != nil {
|
|
t.cancelCtx()
|
|
}
|
|
t.storage = teststorage.New(t)
|
|
t.context, t.cancelCtx = context.WithCancel(context.Background())
|
|
}
|
|
|
|
// parseNumber converts s to a float64. The string is first tried as an
// integer with automatic base detection (strconv.ParseInt with base 0); if
// that fails it is parsed as a floating point number. When both attempts
// fail, the floating point parse error is returned wrapped, together with a
// zero value.
func parseNumber(s string) (float64, error) {
	if i, intErr := strconv.ParseInt(s, 0, 64); intErr == nil {
		return float64(i), nil
	}

	f, floatErr := strconv.ParseFloat(s, 64)
	if floatErr != nil {
		return 0, fmt.Errorf("error parsing number: %w", floatErr)
	}
	return f, nil
}
|
|
|
|
// LazyLoader lazily loads samples into storage.
|
|
// This is specifically implemented for unit testing of rules.
|
|
type LazyLoader struct {
|
|
loadCmd *loadCmd
|
|
|
|
storage storage.Storage
|
|
SubqueryInterval time.Duration
|
|
|
|
queryEngine *promql.Engine
|
|
context context.Context
|
|
cancelCtx context.CancelFunc
|
|
|
|
opts LazyLoaderOpts
|
|
}
|
|
|
|
// LazyLoaderOpts are options for the lazy loader. They are forwarded to the
// query engine the loader creates.
//
// Both options must be set to true for regular PromQL (as of Prometheus
// v2.33). They can still be disabled here for legacy and other uses.
type LazyLoaderOpts struct {
	// EnableAtModifier is passed to the engine option of the same name.
	EnableAtModifier bool
	// EnableNegativeOffset is passed to the engine option of the same name.
	EnableNegativeOffset bool
}
|
|
|
|
// NewLazyLoader returns an initialized empty LazyLoader.
|
|
func NewLazyLoader(input string, opts LazyLoaderOpts) (*LazyLoader, error) {
|
|
ll := &LazyLoader{
|
|
opts: opts,
|
|
}
|
|
err := ll.parse(input)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = ll.clear()
|
|
return ll, err
|
|
}
|
|
|
|
// parse the given load command.
|
|
func (ll *LazyLoader) parse(input string) error {
|
|
lines := getLines(input)
|
|
// Accepts only 'load' command.
|
|
for i := 0; i < len(lines); i++ {
|
|
l := lines[i]
|
|
if len(l) == 0 {
|
|
continue
|
|
}
|
|
if strings.HasPrefix(strings.ToLower(patSpace.Split(l, 2)[0]), "load") {
|
|
_, cmd, err := parseLoad(lines, i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ll.loadCmd = cmd
|
|
return nil
|
|
}
|
|
|
|
return raise(i, "invalid command %q", l)
|
|
}
|
|
return errors.New("no \"load\" command found")
|
|
}
|
|
|
|
// clear the current test storage of all inserted samples.
|
|
func (ll *LazyLoader) clear() error {
|
|
if ll.storage != nil {
|
|
if err := ll.storage.Close(); err != nil {
|
|
return fmt.Errorf("closing test storage: %w", err)
|
|
}
|
|
}
|
|
if ll.cancelCtx != nil {
|
|
ll.cancelCtx()
|
|
}
|
|
var err error
|
|
ll.storage, err = teststorage.NewWithError()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
opts := promql.EngineOpts{
|
|
Logger: nil,
|
|
Reg: nil,
|
|
MaxSamples: 10000,
|
|
Timeout: 100 * time.Second,
|
|
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(ll.SubqueryInterval) },
|
|
EnableAtModifier: ll.opts.EnableAtModifier,
|
|
EnableNegativeOffset: ll.opts.EnableNegativeOffset,
|
|
}
|
|
|
|
ll.queryEngine = promql.NewEngine(opts)
|
|
ll.context, ll.cancelCtx = context.WithCancel(context.Background())
|
|
return nil
|
|
}
|
|
|
|
// appendTill appends the defined time series to the storage till the given timestamp (in milliseconds).
|
|
func (ll *LazyLoader) appendTill(ts int64) error {
|
|
app := ll.storage.Appender(ll.Context())
|
|
for h, smpls := range ll.loadCmd.defs {
|
|
m := ll.loadCmd.metrics[h]
|
|
for i, s := range smpls {
|
|
if s.T > ts {
|
|
// Removing the already added samples.
|
|
ll.loadCmd.defs[h] = smpls[i:]
|
|
break
|
|
}
|
|
if err := appendSample(app, s, m); err != nil {
|
|
return err
|
|
}
|
|
if i == len(smpls)-1 {
|
|
ll.loadCmd.defs[h] = nil
|
|
}
|
|
}
|
|
}
|
|
return app.Commit()
|
|
}
|
|
|
|
// WithSamplesTill loads the samples till given timestamp and executes the given function.
|
|
func (ll *LazyLoader) WithSamplesTill(ts time.Time, fn func(error)) {
|
|
tsMilli := ts.Sub(time.Unix(0, 0).UTC()) / time.Millisecond
|
|
fn(ll.appendTill(int64(tsMilli)))
|
|
}
|
|
|
|
// QueryEngine returns the LazyLoader's query engine.
|
|
func (ll *LazyLoader) QueryEngine() *promql.Engine {
|
|
return ll.queryEngine
|
|
}
|
|
|
|
// Queryable allows querying the LazyLoader's data.
|
|
// Note: only the samples till the max timestamp used
|
|
// in `WithSamplesTill` can be queried.
|
|
func (ll *LazyLoader) Queryable() storage.Queryable {
|
|
return ll.storage
|
|
}
|
|
|
|
// Context returns the LazyLoader's context.
|
|
func (ll *LazyLoader) Context() context.Context {
|
|
return ll.context
|
|
}
|
|
|
|
// Storage returns the LazyLoader's storage.
|
|
func (ll *LazyLoader) Storage() storage.Storage {
|
|
return ll.storage
|
|
}
|
|
|
|
// Close closes resources associated with the LazyLoader.
|
|
func (ll *LazyLoader) Close() error {
|
|
ll.cancelCtx()
|
|
return ll.storage.Close()
|
|
}
|
|
|
|
// makeInt64Pointer returns a pointer to a fresh int64 holding val.
func makeInt64Pointer(val int64) *int64 {
	// val is this call's own copy, so its address is unique per call.
	return &val
}
|
|
|
|
// timeMilliseconds converts t to milliseconds since the Unix epoch,
// truncating any sub-millisecond remainder towards zero.
func timeMilliseconds(t time.Time) int64 {
	const nanosPerMilli = int64(time.Millisecond / time.Nanosecond)
	return t.UnixNano() / nanosPerMilli
}
|
|
|
|
// durationMilliseconds returns d as a whole number of milliseconds,
// truncating any sub-millisecond remainder towards zero.
func durationMilliseconds(d time.Duration) int64 {
	return d.Milliseconds()
}
|