Merge branch 'main' into HEAD

This commit is contained in:
Jan Fajerski 2024-09-04 18:50:00 +02:00
commit fe4289b502
45 changed files with 1003 additions and 386 deletions

56
.github/stale.yml vendored
View file

@ -1,56 +0,0 @@
# Configuration for probot-stale - https://github.com/probot/stale
# Number of days of inactivity before an Issue or Pull Request becomes stale
daysUntilStale: 60
# Number of days of inactivity before an Issue or Pull Request with the stale label is closed.
# Set to false to disable. If disabled, issues still need to be closed manually, but will remain marked as stale.
daysUntilClose: false
# Only issues or pull requests with all of these labels are check if stale. Defaults to `[]` (disabled)
onlyLabels: []
# Issues or Pull Requests with these labels will never be considered stale. Set to `[]` to disable
exemptLabels:
- keepalive
# Set to true to ignore issues in a project (defaults to false)
exemptProjects: false
# Set to true to ignore issues in a milestone (defaults to false)
exemptMilestones: false
# Set to true to ignore issues with an assignee (defaults to false)
exemptAssignees: false
# Label to use when marking as stale
staleLabel: stale
# Comment to post when marking as stale. Set to `false` to disable
markComment: false
# Comment to post when removing the stale label.
# unmarkComment: >
# Your comment here.
# Comment to post when closing a stale Issue or Pull Request.
# closeComment: >
# Your comment here.
# Limit the number of actions per hour, from 1-30. Default is 30
limitPerRun: 30
# Limit to only `issues` or `pulls`
only: pulls
# Optionally, specify configuration settings that are specific to just 'issues' or 'pulls':
# pulls:
# daysUntilStale: 30
# markComment: >
# This pull request has been automatically marked as stale because it has not had
# recent activity. It will be closed if no further activity occurs. Thank you
# for your contributions.
# issues:
# exemptLabels:
# - confirmed

View file

@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: bufbuild/buf-setup-action@aceb106d2419c4cff48863df90161d92decb8591 # v1.35.1
- uses: bufbuild/buf-setup-action@54abbed4fe8d8d45173eca4798b0c39a53a7b658 # v1.39.0
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
- uses: bufbuild/buf-lint-action@06f9dd823d873146471cfaaf108a993fe00e5325 # v1.1.1

View file

@ -13,7 +13,7 @@ jobs:
if: github.repository_owner == 'prometheus'
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: bufbuild/buf-setup-action@aceb106d2419c4cff48863df90161d92decb8591 # v1.35.1
- uses: bufbuild/buf-setup-action@54abbed4fe8d8d45173eca4798b0c39a53a7b658 # v1.39.0
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
- uses: bufbuild/buf-lint-action@06f9dd823d873146471cfaaf108a993fe00e5325 # v1.1.1

View file

@ -14,7 +14,7 @@ jobs:
image: quay.io/prometheus/golang-builder:1.22-base
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/setup_environment
- run: make GOOPTS=--tags=stringlabels GO_ONLY=1 SKIP_GOLANGCI_LINT=1
- run: go test --tags=stringlabels ./tsdb/ -test.tsdb-isolation=false
@ -28,7 +28,7 @@ jobs:
image: quay.io/prometheus/golang-builder:1.22-base
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/setup_environment
- run: go test --tags=dedupelabels ./...
- run: GOARCH=386 go test ./cmd/prometheus
@ -58,7 +58,7 @@ jobs:
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/setup_environment
with:
enable_go: false
@ -117,7 +117,7 @@ jobs:
thread: [ 0, 1, 2 ]
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/build
with:
promu_opts: "-p linux/amd64 -p windows/amd64 -p linux/arm64 -p darwin/amd64 -p darwin/arm64 -p linux/386"
@ -142,7 +142,7 @@ jobs:
# should also be updated.
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/build
with:
parallelism: 12
@ -204,7 +204,7 @@ jobs:
if: github.event_name == 'push' && github.event.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/publish_main
with:
docker_hub_login: ${{ secrets.docker_hub_login }}
@ -221,7 +221,7 @@ jobs:
(github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v3.'))
steps:
- uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- uses: ./.github/promci/actions/publish_release
with:
docker_hub_login: ${{ secrets.docker_hub_login }}
@ -236,9 +236,9 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- uses: prometheus/promci@3cb0c3871f223bd5ce1226995bd52ffb314798b6 # v0.1.0
- uses: prometheus/promci@45166329da36d74895901808f1c8c97efafc7f84 # v0.3.0
- name: Install nodejs
uses: actions/setup-node@60edb5dd545a775178f52524783378180af0d1f8 # v4.0.2
uses: actions/setup-node@1e60f620b9541d16bece96c5465dc8ee9832be0b # v4.0.3
with:
node-version-file: "web/ui/.nvmrc"
registry-url: "https://registry.npmjs.org"

View file

@ -27,12 +27,12 @@ jobs:
uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
- name: Initialize CodeQL
uses: github/codeql-action/init@afb54ba388a7dca6ecae48f608c4ff05ff4cc77a # v3.25.15
uses: github/codeql-action/init@4dd16135b69a43b6c8efb853346f8437d92d3c93 # v3.26.6
with:
languages: ${{ matrix.language }}
- name: Autobuild
uses: github/codeql-action/autobuild@afb54ba388a7dca6ecae48f608c4ff05ff4cc77a # v3.25.15
uses: github/codeql-action/autobuild@4dd16135b69a43b6c8efb853346f8437d92d3c93 # v3.26.6
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@afb54ba388a7dca6ecae48f608c4ff05ff4cc77a # v3.25.15
uses: github/codeql-action/analyze@4dd16135b69a43b6c8efb853346f8437d92d3c93 # v3.26.6

View file

@ -45,6 +45,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@afb54ba388a7dca6ecae48f608c4ff05ff4cc77a # tag=v3.25.15
uses: github/codeql-action/upload-sarif@4dd16135b69a43b6c8efb853346f8437d92d3c93 # tag=v3.26.6
with:
sarif_file: results.sarif

31
.github/workflows/stale.yml vendored Normal file
View file

@ -0,0 +1,31 @@
name: Stale Check
on:
workflow_dispatch: {}
schedule:
- cron: '16 22 * * *'
permissions:
issues: write
pull-requests: write
jobs:
stale:
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
runs-on: ubuntu-latest
steps:
- uses: actions/stale@28ca1036281a5e5922ead5184a1bbf96e5fc984e # v9.0.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
# opt out of defaults to avoid marking issues as stale and closing them
# https://github.com/actions/stale#days-before-close
# https://github.com/actions/stale#days-before-stale
days-before-stale: -1
days-before-close: -1
# Setting it to empty string to skip comments.
# https://github.com/actions/stale#stale-pr-message
# https://github.com/actions/stale#stale-issue-message
stale-pr-message: ''
stale-issue-message: ''
operations-per-run: 30
# override days-before-stale, for only marking the pull requests as stale
days-before-pr-stale: 60
stale-pr-label: stale
exempt-pr-labels: keepalive

View file

@ -960,12 +960,18 @@ func main() {
listeners, err := webHandler.Listeners()
if err != nil {
level.Error(logger).Log("msg", "Unable to start web listeners", "err", err)
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
os.Exit(1)
}
err = toolkit_web.Validate(*webConfig)
if err != nil {
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
os.Exit(1)
}
@ -987,6 +993,9 @@ func main() {
case <-cancel:
reloadReady.Close()
}
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
return nil
},
func(err error) {

View file

@ -394,8 +394,16 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
m.targets[poolKey] = make(map[string]*targetgroup.Group)
}
for _, tg := range tgs {
if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
// Some Discoverers send nil target group so need to check for it to avoid panics.
if tg == nil {
continue
}
if len(tg.Targets) > 0 {
m.targets[poolKey][tg.Source] = tg
} else {
// The target group is empty, drop the corresponding entry to avoid leaks.
// In case the group yielded targets before, allGroups() will take care of making consumers drop them.
delete(m.targets[poolKey], tg.Source)
}
}
}

View file

@ -1051,8 +1051,8 @@ func TestDiscovererConfigs(t *testing.T) {
}
// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after
// removing all targets from the static_configs sends an update with empty targetGroups.
// This is required to signal the receiver that this target set has no current targets.
// removing all targets from the static_configs cleans the corresponding targetGroups entries to avoid leaks and sends an empty update.
// The update is required to signal the consumers that the previous targets should be dropped.
func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -1085,16 +1085,14 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
discoveryManager.ApplyConfig(c)
syncedTargets = <-discoveryManager.SyncCh()
require.Len(t, discoveryManager.targets, 1)
p = pk("static", "prometheus", 1)
targetGroups, ok := discoveryManager.targets[p]
require.True(t, ok, "'%v' should be present in target groups", p)
group, ok := targetGroups[""]
require.True(t, ok, "missing '' key in target groups %v", targetGroups)
require.Empty(t, group.Targets, "Invalid number of targets.")
require.Len(t, syncedTargets, 1)
require.Len(t, syncedTargets["prometheus"], 1)
require.Nil(t, syncedTargets["prometheus"][0].Labels)
require.True(t, ok, "'%v' should be present in targets", p)
// Otherwise the targetGroups will leak, see https://github.com/prometheus/prometheus/issues/12436.
require.Empty(t, targetGroups, 0, "'%v' should no longer have any associated target groups", p)
require.Len(t, syncedTargets, 1, "an update with no targetGroups should still be sent.")
require.Empty(t, syncedTargets["prometheus"], 0)
}
func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {

View file

@ -307,6 +307,17 @@ tls_config:
[ proxy_connect_header:
[ <string>: [<secret>, ...] ] ]
# Custom HTTP headers to be sent along with each request.
# Headers that are set by Prometheus itself can't be overwritten.
http_headers:
# Header name.
[ <string>:
# Header values.
[ values: [<string>, ...] ]
# Headers values. Hidden in configuration page.
[ secrets: [<secret>, ...] ]
# Files to read header values from.
[ files: [<string>, ...] ] ]
# List of Azure service discovery configurations.
azure_sd_configs:

View file

@ -8,8 +8,8 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/influxdata/influxdb v1.11.5
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/common v0.55.0
github.com/prometheus/client_golang v1.20.0
github.com/prometheus/common v0.57.0
github.com/prometheus/prometheus v0.53.1
github.com/stretchr/testify v1.9.0
)
@ -35,7 +35,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@ -55,10 +55,10 @@ require (
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect

View file

@ -187,8 +187,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -253,8 +253,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_golang v1.20.0 h1:jBzTZ7B099Rg24tny+qngoynol8LtVYlA2bqx3vEloI=
github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -264,8 +264,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVhoNcY=
github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
@ -323,8 +323,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -344,8 +344,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
@ -373,11 +373,11 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=

12
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/docker/docker v27.1.1+incompatible
github.com/edsrzf/mmap-go v1.1.0
github.com/envoyproxy/go-control-plane v0.12.0
github.com/envoyproxy/protoc-gen-validate v1.0.4
github.com/envoyproxy/protoc-gen-validate v1.1.0
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/log v0.2.1
@ -36,10 +36,10 @@ require (
github.com/gophercloud/gophercloud v1.14.0
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/consul/api v1.29.2
github.com/hashicorp/consul/api v1.29.4
github.com/hashicorp/nomad/api v0.0.0-20240717122358-3d93bd3778f3
github.com/hetznercloud/hcloud-go/v2 v2.12.0
github.com/ionos-cloud/sdk-go/v6 v6.2.0
github.com/hetznercloud/hcloud-go/v2 v2.13.1
github.com/ionos-cloud/sdk-go/v6 v6.2.1
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.9
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
@ -52,7 +52,7 @@ require (
github.com/oklog/ulid v1.3.1
github.com/ovh/go-ovh v1.6.0
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.20.0
github.com/prometheus/client_golang v1.20.2
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.56.0
github.com/prometheus/common/assets v0.2.0
@ -75,7 +75,7 @@ require (
go.uber.org/automaxprocs v1.5.3
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
golang.org/x/oauth2 v0.21.0
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
golang.org/x/text v0.16.0

24
go.sum
View file

@ -171,8 +171,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI=
github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM=
@ -353,8 +353,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/api v1.29.2 h1:aYyRn8EdE2mSfG14S1+L9Qkjtz8RzmaWh6AcNGRNwPw=
github.com/hashicorp/consul/api v1.29.2/go.mod h1:0YObcaLNDSbtlgzIRtmRXI1ZkeuK0trCBxwZQ4MYnIk=
github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSgtXutuE=
github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg=
github.com/hashicorp/consul/proto-public v0.6.2 h1:+DA/3g/IiKlJZb88NBn0ZgXrxJp2NlvCZdEyl+qxvL0=
github.com/hashicorp/consul/proto-public v0.6.2/go.mod h1:cXXbOg74KBNGajC+o8RlA502Esf0R9prcoJgiOX/2Tg=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
@ -414,8 +414,8 @@ github.com/hashicorp/nomad/api v0.0.0-20240717122358-3d93bd3778f3/go.mod h1:svtx
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4=
github.com/hetznercloud/hcloud-go/v2 v2.12.0 h1:nOgfNTo0gyXZJJdM8mo/XH5MO/e80wAEpldRzdWayhY=
github.com/hetznercloud/hcloud-go/v2 v2.12.0/go.mod h1:dhix40Br3fDiBhwaSG/zgaYOFFddpfBm/6R1Zz0IiF0=
github.com/hetznercloud/hcloud-go/v2 v2.13.1 h1:jq0GP4QaYE5d8xR/Zw17s9qoaESRJMXfGmtD1a/qckQ=
github.com/hetznercloud/hcloud-go/v2 v2.13.1/go.mod h1:dhix40Br3fDiBhwaSG/zgaYOFFddpfBm/6R1Zz0IiF0=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@ -423,8 +423,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/ionos-cloud/sdk-go/v6 v6.2.0 h1:qX7gachC0wJSmFfVRnd+DHmz9AStvVraKcwQ/JokIB4=
github.com/ionos-cloud/sdk-go/v6 v6.2.0/go.mod h1:EzEgRIDxBELvfoa/uBN0kOQaqovLjUWEB7iW4/Q+t4k=
github.com/ionos-cloud/sdk-go/v6 v6.2.1 h1:mxxN+frNVmbFrmmFfXnBC3g2USYJrl6mc1LW2iNYbFY=
github.com/ionos-cloud/sdk-go/v6 v6.2.1/go.mod h1:SXrO9OGyWjd2rZhAhEpdYN6VUAODzzqRdqA9BCviQtI=
github.com/jarcoal/httpmock v1.3.1 h1:iUx3whfZWVf3jT01hQTO/Eo5sAYtB2/rqaUuOtpInww=
github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@ -608,8 +608,8 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.20.0 h1:jBzTZ7B099Rg24tny+qngoynol8LtVYlA2bqx3vEloI=
github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -865,8 +865,8 @@ golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View file

@ -409,6 +409,7 @@ func (p *ProtobufParser) Next() (Entry, error) {
switch p.state {
case EntryInvalid:
p.metricPos = 0
p.exemplarPos = 0
p.fieldPos = -2
n, err := readDelimited(p.in[p.inPos:], p.mf)
p.inPos += n
@ -485,6 +486,7 @@ func (p *ProtobufParser) Next() (Entry, error) {
p.metricPos++
p.fieldPos = -2
p.fieldsDone = false
p.exemplarPos = 0
// If this is a metric family containing native
// histograms, we have to switch back to native
// histograms after parsing a classic histogram.

View file

@ -695,6 +695,70 @@ metric: <
timestamp_ms: 1234568
>
`,
`name: "test_histogram_with_native_histogram_exemplars2"
help: "Another histogram with native histogram exemplars."
type: HISTOGRAM
metric: <
histogram: <
sample_count: 175
sample_sum: 0.0008280461746287094
bucket: <
cumulative_count: 2
upper_bound: -0.0004899999999999998
>
bucket: <
cumulative_count: 4
upper_bound: -0.0003899999999999998
>
bucket: <
cumulative_count: 16
upper_bound: -0.0002899999999999998
>
schema: 3
zero_threshold: 2.938735877055719e-39
zero_count: 2
negative_span: <
offset: -162
length: 1
>
negative_span: <
offset: 23
length: 4
>
negative_delta: 1
negative_delta: 3
negative_delta: -2
negative_delta: -1
negative_delta: 1
positive_span: <
offset: -161
length: 1
>
positive_span: <
offset: 8
length: 3
>
positive_delta: 1
positive_delta: 2
positive_delta: -1
positive_delta: -1
exemplars: <
label: <
name: "dummyID"
value: "59780"
>
value: -0.00039
timestamp: <
seconds: 1625851155
nanos: 146848499
>
>
>
timestamp_ms: 1234568
>
`,
}
@ -1276,6 +1340,41 @@ func TestProtobufParse(t *testing.T) {
{Labels: labels.FromStrings("dummyID", "59772"), Value: -0.00052, HasTs: true, Ts: 1625851160156},
},
},
{
m: "test_histogram_with_native_histogram_exemplars2",
help: "Another histogram with native histogram exemplars.",
},
{
m: "test_histogram_with_native_histogram_exemplars2",
typ: model.MetricTypeHistogram,
},
{
m: "test_histogram_with_native_histogram_exemplars2",
t: 1234568,
shs: &histogram.Histogram{
Count: 175,
ZeroCount: 2,
Sum: 0.0008280461746287094,
ZeroThreshold: 2.938735877055719e-39,
Schema: 3,
PositiveSpans: []histogram.Span{
{Offset: -161, Length: 1},
{Offset: 8, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: -162, Length: 1},
{Offset: 23, Length: 4},
},
PositiveBuckets: []int64{1, 2, -1, -1},
NegativeBuckets: []int64{1, 3, -2, -1, 1},
},
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2",
),
e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59780"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
},
},
},
},
{
@ -1995,15 +2094,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "without_quantiles_sum",
),
},
{ // 78
{ // 81
m: "empty_histogram",
help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram.",
},
{ // 79
{ // 82
m: "empty_histogram",
typ: model.MetricTypeHistogram,
},
{ // 80
{ // 83
m: "empty_histogram",
shs: &histogram.Histogram{
CounterResetHint: histogram.UnknownCounterReset,
@ -2014,15 +2113,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "empty_histogram",
),
},
{ // 81
{ // 84
m: "test_counter_with_createdtimestamp",
help: "A counter with a created timestamp.",
},
{ // 82
{ // 85
m: "test_counter_with_createdtimestamp",
typ: model.MetricTypeCounter,
},
{ // 83
{ // 86
m: "test_counter_with_createdtimestamp",
v: 42,
ct: 1000,
@ -2030,15 +2129,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_counter_with_createdtimestamp",
),
},
{ // 84
{ // 87
m: "test_summary_with_createdtimestamp",
help: "A summary with a created timestamp.",
},
{ // 85
{ // 88
m: "test_summary_with_createdtimestamp",
typ: model.MetricTypeSummary,
},
{ // 86
{ // 89
m: "test_summary_with_createdtimestamp_count",
v: 42,
ct: 1000,
@ -2046,7 +2145,7 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_summary_with_createdtimestamp_count",
),
},
{ // 87
{ // 90
m: "test_summary_with_createdtimestamp_sum",
v: 1.234,
ct: 1000,
@ -2054,15 +2153,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_summary_with_createdtimestamp_sum",
),
},
{ // 88
{ // 91
m: "test_histogram_with_createdtimestamp",
help: "A histogram with a created timestamp.",
},
{ // 89
{ // 92
m: "test_histogram_with_createdtimestamp",
typ: model.MetricTypeHistogram,
},
{ // 90
{ // 93
m: "test_histogram_with_createdtimestamp",
ct: 1000,
shs: &histogram.Histogram{
@ -2074,15 +2173,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_histogram_with_createdtimestamp",
),
},
{ // 91
{ // 94
m: "test_gaugehistogram_with_createdtimestamp",
help: "A gauge histogram with a created timestamp.",
},
{ // 92
{ // 95
m: "test_gaugehistogram_with_createdtimestamp",
typ: model.MetricTypeGaugeHistogram,
},
{ // 93
{ // 96
m: "test_gaugehistogram_with_createdtimestamp",
ct: 1000,
shs: &histogram.Histogram{
@ -2094,15 +2193,15 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_gaugehistogram_with_createdtimestamp",
),
},
{ // 94
{ // 97
m: "test_histogram_with_native_histogram_exemplars",
help: "A histogram with native histogram exemplars.",
},
{ // 95
{ // 98
m: "test_histogram_with_native_histogram_exemplars",
typ: model.MetricTypeHistogram,
},
{ // 96
{ // 99
m: "test_histogram_with_native_histogram_exemplars",
t: 1234568,
shs: &histogram.Histogram{
@ -2130,7 +2229,7 @@ func TestProtobufParse(t *testing.T) {
{Labels: labels.FromStrings("dummyID", "59772"), Value: -0.00052, HasTs: true, Ts: 1625851160156},
},
},
{ // 97
{ // 100
m: "test_histogram_with_native_histogram_exemplars_count",
t: 1234568,
v: 175,
@ -2138,7 +2237,7 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_histogram_with_native_histogram_exemplars_count",
),
},
{ // 98
{ // 101
m: "test_histogram_with_native_histogram_exemplars_sum",
t: 1234568,
v: 0.0008280461746287094,
@ -2146,7 +2245,7 @@ func TestProtobufParse(t *testing.T) {
"__name__", "test_histogram_with_native_histogram_exemplars_sum",
),
},
{ // 99
{ // 102
m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0004899999999999998",
t: 1234568,
v: 2,
@ -2155,7 +2254,7 @@ func TestProtobufParse(t *testing.T) {
"le", "-0.0004899999999999998",
),
},
{ // 100
{ // 103
m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0003899999999999998",
t: 1234568,
v: 4,
@ -2167,7 +2266,7 @@ func TestProtobufParse(t *testing.T) {
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
},
},
{ // 101
{ // 104
m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0002899999999999998",
t: 1234568,
v: 16,
@ -2179,7 +2278,7 @@ func TestProtobufParse(t *testing.T) {
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
},
},
{ // 102
{ // 105
m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff+Inf",
t: 1234568,
v: 175,
@ -2188,6 +2287,93 @@ func TestProtobufParse(t *testing.T) {
"le", "+Inf",
),
},
{ // 106
m: "test_histogram_with_native_histogram_exemplars2",
help: "Another histogram with native histogram exemplars.",
},
{ // 107
m: "test_histogram_with_native_histogram_exemplars2",
typ: model.MetricTypeHistogram,
},
{ // 108
m: "test_histogram_with_native_histogram_exemplars2",
t: 1234568,
shs: &histogram.Histogram{
Count: 175,
ZeroCount: 2,
Sum: 0.0008280461746287094,
ZeroThreshold: 2.938735877055719e-39,
Schema: 3,
PositiveSpans: []histogram.Span{
{Offset: -161, Length: 1},
{Offset: 8, Length: 3},
},
NegativeSpans: []histogram.Span{
{Offset: -162, Length: 1},
{Offset: 23, Length: 4},
},
PositiveBuckets: []int64{1, 2, -1, -1},
NegativeBuckets: []int64{1, 3, -2, -1, 1},
},
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2",
),
e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59780"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
},
},
{ // 109
m: "test_histogram_with_native_histogram_exemplars2_count",
t: 1234568,
v: 175,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_count",
),
},
{ // 110
m: "test_histogram_with_native_histogram_exemplars2_sum",
t: 1234568,
v: 0.0008280461746287094,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_sum",
),
},
{ // 111
m: "test_histogram_with_native_histogram_exemplars2_bucket\xffle\xff-0.0004899999999999998",
t: 1234568,
v: 2,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_bucket",
"le", "-0.0004899999999999998",
),
},
{ // 112
m: "test_histogram_with_native_histogram_exemplars2_bucket\xffle\xff-0.0003899999999999998",
t: 1234568,
v: 4,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_bucket",
"le", "-0.0003899999999999998",
),
},
{ // 113
m: "test_histogram_with_native_histogram_exemplars2_bucket\xffle\xff-0.0002899999999999998",
t: 1234568,
v: 16,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_bucket",
"le", "-0.0002899999999999998",
),
},
{ // 114
m: "test_histogram_with_native_histogram_exemplars2_bucket\xffle\xff+Inf",
t: 1234568,
v: 175,
lset: labels.FromStrings(
"__name__", "test_histogram_with_native_histogram_exemplars2_bucket",
"le", "+Inf",
),
},
},
},
}

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/teststorage"
@ -274,7 +275,7 @@ func BenchmarkRangeQuery(b *testing.B) {
MaxSamples: 50000000,
Timeout: 100 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(b, opts)
const interval = 10000 // 10s interval.
// A day of data plus 10k steps.
@ -365,7 +366,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
ng := promql.NewEngine(opts)
ng := promqltest.NewTestEngineWithOpts(b, opts)
for i := 0; i < b.N; i++ {
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
if err != nil {

View file

@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math"
"reflect"
"runtime"
@ -271,6 +272,8 @@ func contextErr(err error, env string) error {
//
// 2) Enforcement of the maximum number of concurrent queries.
type QueryTracker interface {
io.Closer
// GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker.
GetMaxConcurrent() int
@ -430,6 +433,14 @@ func NewEngine(opts EngineOpts) *Engine {
}
}
// Close closes ng.
func (ng *Engine) Close() error {
if ng.activeQueryTracker != nil {
return ng.activeQueryTracker.Close()
}
return nil
}
// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) {
ng.queryLoggerLock.Lock()
@ -713,7 +724,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
startTimestamp: start,
endTimestamp: start,
interval: 1,
ctx: ctxInnerEval,
maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger,
lookbackDelta: s.LookbackDelta,
@ -723,7 +733,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
}
query.sampleStats.InitStepTracking(start, start, 1)
val, warnings, err := evaluator.Eval(s.Expr)
val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr)
evalSpanTimer.Finish()
@ -772,7 +782,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
startTimestamp: timeMilliseconds(s.Start),
endTimestamp: timeMilliseconds(s.End),
interval: durationMilliseconds(s.Interval),
ctx: ctxInnerEval,
maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger,
lookbackDelta: s.LookbackDelta,
@ -781,7 +790,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
enableDelayedNameRemoval: ng.enableDelayedNameRemoval,
}
query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
val, warnings, err := evaluator.Eval(s.Expr)
val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr)
evalSpanTimer.Finish()
@ -999,6 +1008,8 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
if e.Series != nil {
return nil, nil
}
span := trace.SpanFromContext(ctx)
span.AddEvent("expand start", trace.WithAttributes(attribute.String("selector", e.String())))
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
if e.SkipHistogramBuckets {
for i := range series {
@ -1006,6 +1017,7 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
}
}
e.Series = series
span.AddEvent("expand end", trace.WithAttributes(attribute.Int("num_series", len(series))))
return ws, err
}
return nil, nil
@ -1035,8 +1047,6 @@ func (e errWithWarnings) Error() string { return e.err.Error() }
// querier and reports errors. On timeout or cancellation of its context it
// terminates.
type evaluator struct {
ctx context.Context
startTimestamp int64 // Start time in milliseconds.
endTimestamp int64 // End time in milliseconds.
interval int64 // Interval in milliseconds.
@ -1085,10 +1095,10 @@ func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp
}
}
func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) {
func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) {
defer ev.recover(expr, &ws, &err)
v, ws = ev.eval(expr)
v, ws = ev.eval(ctx, expr)
if ev.enableDelayedNameRemoval {
ev.cleanupMetricLabels(v)
}
@ -1139,7 +1149,7 @@ func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) {
// function call results.
// The prepSeries function (if provided) can be used to prepare the helper
// for each series, then passed to each call funcCall.
func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) {
func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, len(exprs))
origMatrixes := make([]Matrix, len(exprs))
@ -1150,7 +1160,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Functions will take string arguments from the expressions, not the values.
if e != nil && e.Type() != parser.ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(e)
val, ws := ev.eval(ctx, e)
warnings.Merge(ws)
matrixes[i] = val.(Matrix)
@ -1202,7 +1212,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
// Reset number of samples in memory after each timestamp.
@ -1312,7 +1322,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
return mat, warnings
}
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := slices.Clone(inputMatrix)
defer func() {
@ -1392,7 +1402,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
// Reset number of samples in memory after each timestamp.
@ -1443,11 +1453,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
samplesStats := ev.samplesStats
// Avoid double counting samples when running a subquery, those samples will be counted in later stage.
ev.samplesStats = ev.samplesStats.NewChild()
val, ws := ev.eval(subq)
val, ws := ev.eval(ctx, subq)
// But do incorporate the peak from the subquery
samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
ev.samplesStats = samplesStats
@ -1474,18 +1484,20 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele
}
// eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotations) {
func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, annotations.Annotations) {
// This is the top-level evaluation method.
// Thus, we check for timeout/cancellation here.
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
// Create a new span to help investigate inner evaluation performances.
ctxWithSpan, span := otel.Tracer("").Start(ev.ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String())
ev.ctx = ctxWithSpan
ctx, span := otel.Tracer("").Start(ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String())
defer span.End()
if ss, ok := expr.(interface{ ShortString() string }); ok {
span.SetAttributes(attribute.String("operation", ss.ShortString()))
}
switch e := expr.(type) {
case *parser.AggregateExpr:
@ -1506,7 +1518,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sortedGrouping = append(sortedGrouping, valueLabel.Val)
slices.Sort(sortedGrouping)
}
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh)
}, e.Expr)
}
@ -1516,16 +1528,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// param is the number k for topk/bottomk, or q for quantile.
var fParam float64
if param != nil {
val, ws := ev.eval(param)
val, ws := ev.eval(ctx, param)
warnings.Merge(ws)
fParam = val.(Matrix)[0].Floats[0].F
}
// Now fetch the data to be aggregated.
val, ws := ev.eval(e.Expr)
val, ws := ev.eval(ctx, e.Expr)
warnings.Merge(ws)
inputMatrix := val.(Matrix)
result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam)
result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam)
warnings.Merge(ws)
ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
@ -1543,7 +1555,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
unwrapParenExpr(&arg)
vs, ok := arg.(*parser.VectorSelector)
if ok {
return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e)
return ev.rangeEvalTimestampFunctionOverVectorSelector(ctx, vs, call, e)
}
}
@ -1567,7 +1579,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
matrixArgIndex = i
matrixArg = true
// Replacing parser.SubqueryExpr with parser.MatrixSelector.
val, totalSamples, ws := ev.evalSubquery(subq)
val, totalSamples, ws := ev.evalSubquery(ctx, subq)
e.Args[i] = val
warnings.Merge(ws)
defer func() {
@ -1582,14 +1594,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// Special handling for functions that work on series not samples.
switch e.Func.Name {
case "label_replace":
return ev.evalLabelReplace(e.Args)
return ev.evalLabelReplace(ctx, e.Args)
case "label_join":
return ev.evalLabelJoin(e.Args)
return ev.evalLabelJoin(ctx, e.Args)
}
if !matrixArg {
// Does not have a matrix argument.
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, annos := call(v, e.Args, enh)
return vec, warnings.Merge(annos)
}, e.Args...)
@ -1601,7 +1613,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
otherInArgs := make([]Vector, len(e.Args))
for i, e := range e.Args {
if i != matrixArgIndex {
val, ws := ev.eval(e)
val, ws := ev.eval(ctx, e)
otherArgs[i] = val.(Matrix)
otherInArgs[i] = Vector{Sample{}}
inArgs[i] = otherInArgs[i]
@ -1615,7 +1627,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sel := arg.(*parser.MatrixSelector)
selVS := sel.VectorSelector.(*parser.VectorSelector)
ws, err := checkAndExpandSeriesSet(ev.ctx, sel)
ws, err := checkAndExpandSeriesSet(ctx, sel)
warnings.Merge(ws)
if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings})
@ -1645,7 +1657,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
dropName := e.Func.Name != "last_over_time"
for i, s := range selVS.Series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
ev.currentSamples -= len(floats) + totalHPointSize(histograms)
@ -1791,10 +1803,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
return mat, warnings
case *parser.ParenExpr:
return ev.eval(e.Expr)
return ev.eval(ctx, e.Expr)
case *parser.UnaryExpr:
val, ws := ev.eval(e.Expr)
val, ws := ev.eval(ctx, e.Expr)
mat := val.(Matrix)
if e.Op == parser.SUB {
for i := range mat {
@ -1815,7 +1827,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
case *parser.BinaryExpr:
switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F)
return append(enh.Out, Sample{F: val}), nil
}, e.LHS, e.RHS)
@ -1828,47 +1840,49 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
}
switch e.Op {
case parser.LAND:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS)
case parser.LOR:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS)
case parser.LUNLESS:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS)
default:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh)
return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS)
}
case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh)
return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS)
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh)
return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS)
}
case *parser.NumberLiteral:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
span.SetAttributes(attribute.Float64("value", e.Val))
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil
})
case *parser.StringLiteral:
span.SetAttributes(attribute.String("value", e.Val))
return String{V: e.Val, T: ev.startTimestamp}, nil
case *parser.VectorSelector:
ws, err := checkAndExpandSeriesSet(ev.ctx, e)
ws, err := checkAndExpandSeriesSet(ctx, e)
if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
@ -1877,7 +1891,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range e.Series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
chkIter = s.Iterator(chkIter)
@ -1928,14 +1942,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
if ev.startTimestamp != ev.endTimestamp {
panic(errors.New("cannot do range evaluation of matrix selector"))
}
return ev.matrixSelector(e)
return ev.matrixSelector(ctx, e)
case *parser.SubqueryExpr:
offsetMillis := durationMilliseconds(e.Offset)
rangeMillis := durationMilliseconds(e.Range)
newEv := &evaluator{
endTimestamp: ev.endTimestamp - offsetMillis,
ctx: ev.ctx,
currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples,
logger: ev.logger,
@ -1965,7 +1978,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
setOffsetForAtModifier(newEv.startTimestamp, e.Expr)
}
res, ws := newEv.eval(e.Expr)
res, ws := newEv.eval(ctx, e.Expr)
ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
@ -1973,14 +1986,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
case *parser.StepInvariantExpr:
switch ce := e.Expr.(type) {
case *parser.StringLiteral, *parser.NumberLiteral:
return ev.eval(ce)
return ev.eval(ctx, ce)
}
newEv := &evaluator{
startTimestamp: ev.startTimestamp,
endTimestamp: ev.startTimestamp, // Always a single evaluation.
interval: ev.interval,
ctx: ev.ctx,
currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples,
logger: ev.logger,
@ -1989,7 +2001,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
enableDelayedNameRemoval: ev.enableDelayedNameRemoval,
}
res, ws := newEv.eval(e.Expr)
res, ws := newEv.eval(ctx, e.Expr)
ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
@ -2065,8 +2077,8 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) {
return getFPointSlice(numSteps)
}
func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(ctx context.Context, vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
ws, err := checkAndExpandSeriesSet(ctx, vs)
if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
@ -2077,7 +2089,7 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)-1)
}
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
if vs.Timestamp != nil {
// This is a special case for "timestamp()" when the @ modifier is used, to ensure that
// we return a point for each time step in this case.
@ -2213,7 +2225,7 @@ func putMatrixSelectorHPointSlice(p []HPoint) {
}
// matrixSelector evaluates a *parser.MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) {
func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSelector) (Matrix, annotations.Annotations) {
var (
vs = node.VectorSelector.(*parser.VectorSelector)
@ -2224,7 +2236,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
it = storage.NewBuffer(durationMilliseconds(node.Range))
)
ws, err := checkAndExpandSeriesSet(ev.ctx, node)
ws, err := checkAndExpandSeriesSet(ctx, node)
if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
@ -2232,7 +2244,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
var chkIter chunkenc.Iterator
series := vs.Series
for i, s := range series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err)
}
chkIter = s.Iterator(chkIter)

View file

@ -17,7 +17,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"
"strconv"
"sync"
@ -55,14 +54,7 @@ func TestMain(m *testing.M) {
func TestQueryConcurrency(t *testing.T) {
maxConcurrency := 10
dir, err := os.MkdirTemp("", "test_concurrency")
require.NoError(t, err)
defer os.RemoveAll(dir)
queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil)
t.Cleanup(func() {
require.NoError(t, queryTracker.Close())
})
queryTracker := promql.NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil)
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
@ -70,15 +62,17 @@ func TestQueryConcurrency(t *testing.T) {
Timeout: 100 * time.Second,
ActiveQueryTracker: queryTracker,
}
engine := promqltest.NewTestEngineWithOpts(t, opts)
engine := promql.NewEngine(opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
t.Cleanup(cancelCtx)
block := make(chan struct{})
processing := make(chan struct{})
done := make(chan int)
defer close(done)
t.Cleanup(func() {
close(done)
})
f := func(context.Context) error {
select {
@ -163,7 +157,7 @@ func TestQueryTimeout(t *testing.T) {
MaxSamples: 10,
Timeout: 5 * time.Millisecond,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -188,7 +182,7 @@ func TestQueryCancel(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -262,7 +256,7 @@ func TestQueryError(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
errStorage := promql.ErrStorage{errors.New("storage error")}
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return &errQuerier{err: errStorage}, nil
@ -596,7 +590,7 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
},
} {
t.Run(tc.query, func(t *testing.T) {
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
hintsRecorder := &noopHintRecordingQueryable{}
var (
@ -627,7 +621,7 @@ func TestEngineShutdown(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
block := make(chan struct{})
@ -763,7 +757,7 @@ load 10s
t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) {
var err error
var qry promql.Query
engine := newTestEngine()
engine := newTestEngine(t)
if c.Interval == 0 {
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start)
} else {
@ -1302,7 +1296,7 @@ load 10s
for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
opts := promql.NewPrometheusQueryOpts(true, 0)
engine := promqltest.NewTestEngine(true, 0, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, true, 0, promqltest.DefaultMaxSamplesPerQuery)
runQuery := func(expErr error) *stats.Statistics {
var err error
@ -1329,7 +1323,7 @@ load 10s
if c.SkipMaxCheck {
return
}
engine = promqltest.NewTestEngine(true, 0, stats.Samples.PeakSamples-1)
engine = promqltest.NewTestEngine(t, true, 0, stats.Samples.PeakSamples-1)
runQuery(promql.ErrTooManySamples(env))
})
}
@ -1482,7 +1476,7 @@ load 10s
for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
testFunc := func(expError error) {
var err error
var qry promql.Query
@ -1503,18 +1497,18 @@ load 10s
}
// Within limit.
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples)
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples)
testFunc(nil)
// Exceeding limit.
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples-1)
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples-1)
testFunc(promql.ErrTooManySamples(env))
})
}
}
func TestAtModifier(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := promqltest.LoadedStorage(t, `
load 10s
metric{job="1"} 0+1x1000
@ -1997,7 +1991,7 @@ func TestSubquerySelector(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := promqltest.LoadedStorage(t, tst.loadString)
t.Cleanup(func() { storage.Close() })
@ -2046,7 +2040,7 @@ func TestQueryLogger_basic(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
queryExec := func() {
ctx, cancelCtx := context.WithCancel(context.Background())
@ -2097,7 +2091,7 @@ func TestQueryLogger_fields(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
@ -2126,7 +2120,7 @@ func TestQueryLogger_error(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
@ -3009,7 +3003,7 @@ func TestEngineOptsValidation(t *testing.T) {
}
for _, c := range cases {
eng := promql.NewEngine(c.opts)
eng := promqltest.NewTestEngineWithOpts(t, c.opts)
_, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0))
_, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
if c.fail {
@ -3023,7 +3017,7 @@ func TestEngineOptsValidation(t *testing.T) {
}
func TestInstantQueryWithRangeVectorSelector(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
baseT := timestamp.Time(0)
storage := promqltest.LoadedStorage(t, `
@ -3487,7 +3481,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) {
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
@ -3606,7 +3600,7 @@ metric 0 1 2
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
engine := promqltest.NewTestEngine(false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
storage := promqltest.LoadedStorage(t, load)
t.Cleanup(func() { storage.Close() })
@ -3643,7 +3637,7 @@ histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:
`
storage := promqltest.LoadedStorage(t, load)
t.Cleanup(func() { storage.Close() })
engine := promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) {
res := qry.Exec(context.Background())

View file

@ -14,6 +14,7 @@
package promql
import (
"context"
"errors"
"fmt"
"math"
@ -1463,7 +1464,7 @@ func funcChanges(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelp
}
// label_replace function operates only on series; does not look at timestamps or values.
func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, annotations.Annotations) {
func (ev *evaluator) evalLabelReplace(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
var (
dst = stringFromArg(args[1])
repl = stringFromArg(args[2])
@ -1479,7 +1480,7 @@ func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, an
panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst))
}
val, ws := ev.eval(args[0])
val, ws := ev.eval(ctx, args[0])
matrix := val.(Matrix)
lb := labels.NewBuilder(labels.EmptyLabels())
@ -1520,7 +1521,7 @@ func funcVector(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelpe
}
// label_join function operates only on series; does not look at timestamps or values.
func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annotations.Annotations) {
func (ev *evaluator) evalLabelJoin(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
var (
dst = stringFromArg(args[1])
sep = stringFromArg(args[2])
@ -1537,7 +1538,7 @@ func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annot
panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst))
}
val, ws := ev.eval(args[0])
val, ws := ev.eval(ctx, args[0])
matrix := val.(Matrix)
srcVals := make([]string, len(srcLabels))
lb := labels.NewBuilder(labels.EmptyLabels())

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/util/teststorage"
)
@ -39,7 +40,7 @@ func TestDeriv(t *testing.T) {
MaxSamples: 10000,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
a := storage.Appender(context.Background())

View file

@ -72,6 +72,11 @@ func (node *AggregateExpr) String() string {
return aggrString
}
func (node *AggregateExpr) ShortString() string {
aggrString := node.getAggOpStr()
return aggrString
}
func (node *AggregateExpr) getAggOpStr() string {
aggrString := node.Op.String()
@ -95,14 +100,20 @@ func joinLabels(ss []string) string {
return strings.Join(ss, ", ")
}
func (node *BinaryExpr) String() string {
returnBool := ""
func (node *BinaryExpr) returnBool() string {
if node.ReturnBool {
returnBool = " bool"
return " bool"
}
return ""
}
func (node *BinaryExpr) String() string {
matching := node.getMatchingStr()
return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, returnBool, matching, node.RHS)
return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, node.returnBool(), matching, node.RHS)
}
func (node *BinaryExpr) ShortString() string {
return fmt.Sprintf("%s%s%s", node.Op, node.returnBool(), node.getMatchingStr())
}
func (node *BinaryExpr) getMatchingStr() string {
@ -130,9 +141,13 @@ func (node *Call) String() string {
return fmt.Sprintf("%s(%s)", node.Func.Name, node.Args)
}
func (node *MatrixSelector) String() string {
func (node *Call) ShortString() string {
return node.Func.Name
}
func (node *MatrixSelector) atOffset() (string, string) {
// Copy the Vector selector before changing the offset
vecSelector := *node.VectorSelector.(*VectorSelector)
vecSelector := node.VectorSelector.(*VectorSelector)
offset := ""
switch {
case vecSelector.OriginalOffset > time.Duration(0):
@ -149,7 +164,13 @@ func (node *MatrixSelector) String() string {
case vecSelector.StartOrEnd == END:
at = " @ end()"
}
return at, offset
}
func (node *MatrixSelector) String() string {
at, offset := node.atOffset()
// Copy the Vector selector before changing the offset
vecSelector := *node.VectorSelector.(*VectorSelector)
// Do not print the @ and offset twice.
offsetVal, atVal, preproc := vecSelector.OriginalOffset, vecSelector.Timestamp, vecSelector.StartOrEnd
vecSelector.OriginalOffset = 0
@ -163,10 +184,19 @@ func (node *MatrixSelector) String() string {
return str
}
func (node *MatrixSelector) ShortString() string {
at, offset := node.atOffset()
return fmt.Sprintf("[%s]%s%s", model.Duration(node.Range), at, offset)
}
func (node *SubqueryExpr) String() string {
return fmt.Sprintf("%s%s", node.Expr.String(), node.getSubqueryTimeSuffix())
}
func (node *SubqueryExpr) ShortString() string {
return node.getSubqueryTimeSuffix()
}
// getSubqueryTimeSuffix returns the '[<range>:<step>] @ <timestamp> offset <offset>' suffix of the subquery.
func (node *SubqueryExpr) getSubqueryTimeSuffix() string {
step := ""
@ -208,6 +238,10 @@ func (node *UnaryExpr) String() string {
return fmt.Sprintf("%s%s", node.Op, node.Expr)
}
func (node *UnaryExpr) ShortString() string {
return node.Op.String()
}
func (node *VectorSelector) String() string {
var labelStrings []string
if len(node.LabelMatchers) > 1 {

View file

@ -28,12 +28,12 @@ import (
"github.com/prometheus/prometheus/util/teststorage"
)
func newTestEngine() *promql.Engine {
return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
func newTestEngine(t *testing.T) *promql.Engine {
return promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
}
func TestEvaluations(t *testing.T) {
promqltest.RunBuiltinTests(t, newTestEngine())
promqltest.RunBuiltinTests(t, newTestEngine(t))
}
// Run a lot of queries at the same time, to check for race conditions.
@ -48,7 +48,7 @@ func TestConcurrentRangeQueries(t *testing.T) {
}
// Enable experimental functions testing
parser.EnableExperimentalFunctions = true
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
const interval = 10000 // 10s interval.
// A day of data plus 10k steps.

View file

@ -79,8 +79,9 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
return test.storage
}
func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
return promql.NewEngine(promql.EngineOpts{
// NewTestEngine creates a promql.Engine with enablePerStepStats, lookbackDelta and maxSamples, and returns it.
func NewTestEngine(tb testing.TB, enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
return NewTestEngineWithOpts(tb, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: maxSamples,
@ -94,6 +95,16 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp
})
}
// NewTestEngineWithOpts creates a promql.Engine with opts and returns it.
func NewTestEngineWithOpts(tb testing.TB, opts promql.EngineOpts) *promql.Engine {
tb.Helper()
ng := promql.NewEngine(opts)
tb.Cleanup(func() {
require.NoError(tb, ng.Close())
})
return ng
}
// RunBuiltinTests runs an acceptance test suite against the provided engine.
func RunBuiltinTests(t TBRun, engine promql.QueryEngine) {
t.Cleanup(func() { parser.EnableExperimentalFunctions = false })
@ -651,8 +662,9 @@ type evalCmd struct {
expectedFailMessage string
expectedFailRegexp *regexp.Regexp
metrics map[uint64]labels.Labels
expected map[uint64]entry
metrics map[uint64]labels.Labels
expectScalar bool
expected map[uint64]entry
}
type entry struct {
@ -696,12 +708,15 @@ func (ev *evalCmd) String() string {
// expect adds a sequence of values to the set of expected
// results for the query.
func (ev *evalCmd) expect(pos int, vals ...parser.SequenceValue) {
ev.expectScalar = true
ev.expected[0] = entry{pos: pos, vals: vals}
}
// expectMetric adds a new metric with a sequence of values to the set of expected
// results for the query.
func (ev *evalCmd) expectMetric(pos int, m labels.Labels, vals ...parser.SequenceValue) {
ev.expectScalar = false
h := m.Hash()
ev.metrics[h] = m
ev.expected[h] = entry{pos: pos, vals: vals}
@ -715,6 +730,10 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
return fmt.Errorf("expected ordered result, but query returned a matrix")
}
if ev.expectScalar {
return fmt.Errorf("expected scalar result, but got matrix %s", val.String())
}
if err := assertMatrixSorted(val); err != nil {
return err
}
@ -783,6 +802,10 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
}
case promql.Vector:
if ev.expectScalar {
return fmt.Errorf("expected scalar result, but got vector %s", val.String())
}
seen := map[uint64]bool{}
for pos, v := range val {
fp := v.Metric.Hash()
@ -821,15 +844,15 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
}
case promql.Scalar:
if len(ev.expected) != 1 {
return fmt.Errorf("expected vector result, but got scalar %s", val.String())
if !ev.expectScalar {
return fmt.Errorf("expected vector or matrix result, but got %s", val.String())
}
exp0 := ev.expected[0].vals[0]
if exp0.Histogram != nil {
return fmt.Errorf("expected Histogram %v but got scalar %s", exp0.Histogram.TestExpression(), val.String())
return fmt.Errorf("expected histogram %v but got %s", exp0.Histogram.TestExpression(), val.String())
}
if !almost.Equal(exp0.Value, val.V, defaultEpsilon) {
return fmt.Errorf("expected Scalar %v but got %v", val.V, exp0.Value)
return fmt.Errorf("expected scalar %v but got %v", exp0.Value, val.V)
}
default:
@ -1424,7 +1447,11 @@ func (ll *LazyLoader) Storage() storage.Storage {
// Close closes resources associated with the LazyLoader.
func (ll *LazyLoader) Close() error {
ll.cancelCtx()
return ll.storage.Close()
err := ll.queryEngine.Close()
if sErr := ll.storage.Close(); sErr != nil {
return errors.Join(sErr, err)
}
return err
}
func makeInt64Pointer(val int64) *int64 {

View file

@ -554,11 +554,48 @@ eval range from 0 to 5m step 5m testmetric
`,
expectedError: `error in eval testmetric (line 5): expected float value at index 0 for {__name__="testmetric"} to have timestamp 300000, but it had timestamp 0 (result has 1 float point [3 @[0]] and 1 histogram point [{count:0, sum:0} @[300000]])`,
},
"instant query with expected scalar result": {
input: `
eval instant at 1m 3
3
`,
},
"instant query with unexpected scalar result": {
input: `
eval instant at 1m 3
2
`,
expectedError: `error in eval 3 (line 2): expected scalar 2 but got 3`,
},
"instant query that returns a scalar but expects a vector": {
input: `
eval instant at 1m 3
{} 3
`,
expectedError: `error in eval 3 (line 2): expected vector or matrix result, but got scalar: 3 @[60000]`,
},
"instant query that returns a vector but expects a scalar": {
input: `
eval instant at 1m vector(3)
3
`,
expectedError: `error in eval vector(3) (line 2): expected scalar result, but got vector {} => 3 @[60000]`,
},
"range query that returns a matrix but expects a scalar": {
input: `
eval range from 0 to 1m step 30s vector(3)
3
`,
expectedError: `error in eval vector(3) (line 2): expected scalar result, but got matrix {} =>
3 @[0]
3 @[30000]
3 @[60000]`,
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery))
err := runTest(t, testCase.input, NewTestEngine(t, false, 0, DefaultMaxSamplesPerQuery))
if testCase.expectedError == "" {
require.NoError(t, err)

View file

@ -36,16 +36,19 @@ import (
"github.com/prometheus/prometheus/util/testutil"
)
var testEngine = promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
func testEngine(tb testing.TB) *promql.Engine {
tb.Helper()
return promqltest.NewTestEngineWithOpts(tb, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
}
func TestAlertingRuleState(t *testing.T) {
tests := []struct {
@ -225,12 +228,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
},
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
for i, result := range results {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -247,7 +252,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@ -309,13 +314,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
},
}
ng := testEngine(t)
evalTime := time.Unix(0, 0)
result[0].T = timestamp.FromTime(evalTime)
result[1].T = timestamp.FromTime(evalTime)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -329,7 +336,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
}
res, err = ruleWithExternalLabels.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -406,9 +413,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
result[0].T = timestamp.FromTime(evalTime)
result[1].T = timestamp.FromTime(evalTime)
ng := testEngine(t)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -422,7 +431,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
}
res, err = ruleWithExternalURL.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -475,9 +484,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
evalTime := time.Unix(0, 0)
result[0].T = timestamp.FromTime(evalTime)
ng := testEngine(t)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -520,6 +531,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
)
evalTime := time.Unix(0, 0)
ng := testEngine(t)
startQueryCh := make(chan struct{})
getDoneCh := make(chan struct{})
slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
@ -533,7 +546,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
require.Fail(t, "unexpected blocking when template expanding.")
}
}
return EngineQueryFunc(testEngine, storage)(ctx, q, ts)
return EngineQueryFunc(ng, storage)(ctx, q, ts)
}
go func() {
<-startQueryCh
@ -578,7 +591,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -642,9 +655,9 @@ func TestAlertingRuleLimit(t *testing.T) {
)
evalTime := time.Unix(0, 0)
ng := testEngine(t)
for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@ -866,12 +879,13 @@ func TestKeepFiringFor(t *testing.T) {
},
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
for i, result := range results {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -888,7 +902,7 @@ func TestKeepFiringFor(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@ -923,9 +937,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
F: 1,
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Len(t, res, 2)
@ -940,7 +955,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
}
evalTime := baseTime.Add(time.Minute)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}

View file

@ -158,12 +158,13 @@ func TestAlertingRule(t *testing.T) {
},
}
ng := testEngine(t)
for i, test := range tests {
t.Logf("case %d", i)
evalTime := baseTime.Add(test.time)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -299,6 +300,7 @@ func TestForStateAddSamples(t *testing.T) {
},
}
ng := testEngine(t)
var forState float64
for i, test := range tests {
t.Logf("case %d", i)
@ -311,7 +313,7 @@ func TestForStateAddSamples(t *testing.T) {
forState = float64(value.StaleNaN)
}
res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples.
@ -366,8 +368,9 @@ func TestForStateRestore(t *testing.T) {
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
require.NoError(t, err)
ng := testEngine(t)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),
@ -538,7 +541,7 @@ func TestStaleness(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, st),
Appendable: st,
@ -772,7 +775,7 @@ func TestUpdate(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: st,
Queryable: st,
@ -910,7 +913,7 @@ func TestNotify(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
var lastNotified []*Alert
notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) {
lastNotified = alerts
@ -985,7 +988,7 @@ func TestMetricsUpdate(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1059,7 +1062,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1136,7 +1139,7 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1238,7 +1241,7 @@ func TestRuleHealthUpdates(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, st),
Appendable: st,
@ -1335,9 +1338,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
},
}
ng := testEngine(t)
testFunc := func(tst testInput) {
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),
@ -1421,8 +1425,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
}
require.NoError(t, app.Commit())
ng := testEngine(t)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),

View file

@ -123,10 +123,11 @@ func TestRuleEval(t *testing.T) {
storage := setUpRuleEvalTest(t)
t.Cleanup(func() { storage.Close() })
ng := testEngine(t)
for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
testutil.RequireEqual(t, scenario.expected, result)
})
@ -137,6 +138,7 @@ func BenchmarkRuleEval(b *testing.B) {
storage := setUpRuleEvalTest(b)
b.Cleanup(func() { storage.Close() })
ng := testEngine(b)
for _, scenario := range ruleEvalTestScenarios {
b.Run(scenario.name, func(b *testing.B) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
@ -144,7 +146,7 @@ func BenchmarkRuleEval(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
_, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
if err != nil {
require.NoError(b, err)
}
@ -165,7 +167,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -212,10 +214,11 @@ func TestRecordingRuleLimit(t *testing.T) {
labels.FromStrings("test", "test"),
)
ng := testEngine(t)
evalTime := time.Unix(0, 0)
for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":

View file

@ -1178,7 +1178,7 @@ scrape_configs:
)
}
// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers, when aone of them should no,
// TestOnlyStaleTargetsAreDropped makes sure that when a job has multiple providers, when one of them should no
// longer discover targets, only the stale targets of that provier are dropped.
func TestOnlyStaleTargetsAreDropped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View file

@ -1655,12 +1655,14 @@ loop:
if seriesAlreadyScraped && parsedTimestamp == nil {
err = storage.ErrDuplicateSampleForTimestamp
} else {
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
if sl.enableCTZeroIngestion {
if ctMs := p.CreatedTimestamp(); ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}
}

View file

@ -37,7 +37,7 @@ if [ -z "${GITHUB_TOKEN}" ]; then
fi
# List of files that should be synced.
SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml .github/workflows/container_description.yml"
SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml .github/workflows/container_description.yml .github/workflows/stale.yml"
# Go to the root of the repo
cd "$(git rev-parse --show-cdup)" || exit 1

37
storage/interface_test.go Normal file
View file

@ -0,0 +1,37 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
func TestMockSeries(t *testing.T) {
s := storage.MockSeries([]int64{1, 2, 3}, []float64{1, 2, 3}, []string{"__name__", "foo"})
it := s.Iterator(nil)
ts := []int64{}
vs := []float64{}
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
ts = append(ts, t)
vs = append(vs, v)
}
require.Equal(t, []int64{1, 2, 3}, ts)
require.Equal(t, []float64{1, 2, 3}, vs)
}

View file

@ -1118,7 +1118,7 @@ func (a *appender) logSeries() error {
return nil
}
// mintTs returns the minimum timestamp that a sample can have
// minValidTime returns the minimum timestamp that a sample can have
// and is needed for preventing underflow.
func (a *appender) minValidTime(lastTs int64) int64 {
if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {

View file

@ -213,7 +213,7 @@ func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
return &mockSeriesIterator{
timeStamps: timestamps,
values: values,
currIndex: 0,
currIndex: -1,
}
}

View file

@ -974,6 +974,7 @@ func (it *floatHistogramIterator) Reset(b []byte) {
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
} else {
it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0]
}
@ -1069,7 +1070,7 @@ func (it *floatHistogramIterator) Next() ValueType {
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
// so we don't need a separate single delta logic for the 2nd sample.
// Recycle bucket slices that have not been returned yet. Otherwise, copy them.
// Recycle bucket and span slices that have not been returned yet. Otherwise, copy them.
// We can always recycle the slices for leading and trailing bits as they are
// never returned to the caller.
if it.atFloatHistogramCalled {
@ -1088,6 +1089,20 @@ func (it *floatHistogramIterator) Next() ValueType {
} else {
it.nBuckets = nil
}
if len(it.pSpans) > 0 {
newSpans := make([]histogram.Span, len(it.pSpans))
copy(newSpans, it.pSpans)
it.pSpans = newSpans
} else {
it.pSpans = nil
}
if len(it.nSpans) > 0 {
newSpans := make([]histogram.Span, len(it.nSpans))
copy(newSpans, it.nSpans)
it.nSpans = newSpans
} else {
it.nSpans = nil
}
}
tDod, err := readVarbitInt(&it.br)

View file

@ -1306,3 +1306,54 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
require.EqualError(t, err, "float histogram counter reset")
})
}
func TestFloatHistogramUniqueSpansAfterNext(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.FloatHistogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []float64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewFloatHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValFloatHistogram, it.Next())
_, rh1 := it.AtFloatHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValFloatHistogram, it.Next())
_, rh2 := it.AtFloatHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}

View file

@ -1073,6 +1073,7 @@ func (it *histogramIterator) Reset(b []byte) {
if it.atHistogramCalled {
it.atHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
} else {
it.pBuckets = it.pBuckets[:0]
it.nBuckets = it.nBuckets[:0]
@ -1185,8 +1186,25 @@ func (it *histogramIterator) Next() ValueType {
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
// so we don't need a separate single delta logic for the 2nd sample.
// Recycle bucket slices that have not been returned yet. Otherwise,
// Recycle bucket and span slices that have not been returned yet. Otherwise, copy them.
// copy them.
if it.atFloatHistogramCalled || it.atHistogramCalled {
if len(it.pSpans) > 0 {
newSpans := make([]histogram.Span, len(it.pSpans))
copy(newSpans, it.pSpans)
it.pSpans = newSpans
} else {
it.pSpans = nil
}
if len(it.nSpans) > 0 {
newSpans := make([]histogram.Span, len(it.nSpans))
copy(newSpans, it.nSpans)
it.nSpans = newSpans
} else {
it.nSpans = nil
}
}
if it.atHistogramCalled {
it.atHistogramCalled = false
if len(it.pBuckets) > 0 {
@ -1204,6 +1222,7 @@ func (it *histogramIterator) Next() ValueType {
it.nBuckets = nil
}
}
// FloatBuckets are set from scratch, so simply create empty ones.
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false

View file

@ -1487,6 +1487,108 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
})
}
func TestHistogramUniqueSpansAfterNext(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.Histogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
_, rh1 := it.AtHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValHistogram, it.Next())
_, rh2 := it.AtHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}
func TestHistogramUniqueSpansAfterNextWithAtFloatHistogram(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.Histogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
_, rh1 := it.AtFloatHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValHistogram, it.Next())
_, rh2 := it.AtFloatHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}
func BenchmarkAppendable(b *testing.B) {
// Create a histogram with a bunch of spans and buckets.
const (

View file

@ -408,7 +408,7 @@ func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
wg.Wait()
}
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
// TestHeadReadWriter_TruncateAfterFailedIterateChunks tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := createChunkDiskMapper(t, "")

View file

@ -33,7 +33,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
@ -438,6 +437,32 @@ func BenchmarkLoadWLs(b *testing.B) {
}
}
// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
func BenchmarkLoadRealWLs(b *testing.B) {
dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
if dir == "" {
b.SkipNow()
}
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
// Load the WAL.
for i := 0; i < b.N; i++ {
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(b, err)
require.NoError(b, h.Init(0))
}
}
// TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples,
// this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series.
// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
@ -5919,16 +5944,16 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
for _, tc := range []struct {
name string
appendableSamples []appendableSamples
expectedSamples []model.Sample
expectedSamples []chunks.Sample
}{
{
name: "In order ct+normal sample",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
},
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
},
},
{
@ -5937,10 +5962,10 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 1},
},
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 10},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
sample{t: 101, f: 10},
},
},
{
@ -5949,11 +5974,11 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
{ts: 100, val: 10, ct: 1},
{ts: 102, val: 10, ct: 101},
},
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 0},
{Timestamp: 102, Value: 10},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
sample{t: 101, f: 0},
sample{t: 102, f: 10},
},
},
{
@ -5962,41 +5987,33 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 100},
},
expectedSamples: []model.Sample{
{Timestamp: 1, Value: 0},
{Timestamp: 100, Value: 10},
{Timestamp: 101, Value: 10},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
sample{t: 101, f: 10},
},
},
} {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
_, err := a.AppendCTZeroSample(0, lbls, sample.ts, sample.ct)
require.NoError(t, err)
_, err = a.Append(0, lbls, sample.ts, sample.val)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, h.Close())
}()
a := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
_, err := a.AppendCTZeroSample(0, lbls, sample.ts, sample.ct)
require.NoError(t, err)
_, err = a.Append(0, lbls, sample.ts, sample.val)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator(nil)
for _, sample := range tc.expectedSamples {
require.Equal(t, chunkenc.ValFloat, it.Next())
timestamp, value := it.At()
require.Equal(t, sample.Timestamp, model.Time(timestamp))
require.Equal(t, sample.Value, model.SampleValue(value))
}
require.Equal(t, chunkenc.ValNone, it.Next())
q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
result := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, tc.expectedSamples, result[`{foo="bar"}`])
})
}
}

View file

@ -22,10 +22,10 @@ import (
var minNormal = math.Float64frombits(0x0010000000000000) // The smallest positive normal value of type float64.
// Equal returns true if a and b differ by less than their sum
// multiplied by epsilon.
// multiplied by epsilon, or if both are StaleNaN, or if both are any other NaN.
func Equal(a, b, epsilon float64) bool {
// StaleNaN is a special value that is used as staleness maker, so
// the two values are equal when both are exactly equals to stale NaN.
// StaleNaN is a special value that is used as staleness maker, and
// we don't want it to compare equal to any other NaN.
if value.IsStaleNaN(a) || value.IsStaleNaN(b) {
return value.IsStaleNaN(a) && value.IsStaleNaN(b)
}

View file

@ -0,0 +1,50 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package almost
import (
"fmt"
"math"
"testing"
"github.com/prometheus/prometheus/model/value"
)
func TestEqual(t *testing.T) {
staleNaN := math.Float64frombits(value.StaleNaN)
tests := []struct {
a float64
b float64
epsilon float64
want bool
}{
{0.0, 0.0, 0.0, true},
{0.0, 0.1, 0.0, false},
{1.0, 1.1, 0.1, true},
{-1.0, -1.1, 0.1, true},
{math.MaxFloat64, math.MaxFloat64 / 10, 0.1, false},
{1.0, math.NaN(), 0.1, false},
{math.NaN(), math.NaN(), 0.1, true},
{math.NaN(), staleNaN, 0.1, false},
{staleNaN, math.NaN(), 0.1, false},
{staleNaN, staleNaN, 0.1, true},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%v,%v,%v", tt.a, tt.b, tt.epsilon), func(t *testing.T) {
if got := Equal(tt.a, tt.b, tt.epsilon); got != tt.want {
t.Errorf("Equal(%v,%v,%v) = %v, want %v", tt.a, tt.b, tt.epsilon, got, tt.want)
}
})
}
}

View file

@ -59,16 +59,19 @@ import (
"github.com/prometheus/prometheus/util/teststorage"
)
var testEngine = promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
func testEngine(t *testing.T) *promql.Engine {
t.Helper()
return promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
}
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
// It is used to inject specific metadata as part of a test case.
@ -306,8 +309,7 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
MaxSamples: 10,
Timeout: 100 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(m.testing, engineOpts)
opts := &rules.ManagerOptions{
QueryFunc: rules.EngineQueryFunc(engine, storage),
Appendable: storage,
@ -431,9 +433,10 @@ func TestEndpoints(t *testing.T) {
now := time.Now()
ng := testEngine(t)
t.Run("local", func(t *testing.T) {
algr := rulesRetrieverMock{}
algr.testing = t
algr := rulesRetrieverMock{testing: t}
algr.CreateAlertingRules()
algr.CreateRuleGroups()
@ -445,7 +448,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
@ -496,8 +499,7 @@ func TestEndpoints(t *testing.T) {
})
require.NoError(t, err)
algr := rulesRetrieverMock{}
algr.testing = t
algr := rulesRetrieverMock{testing: t}
algr.CreateAlertingRules()
algr.CreateRuleGroups()
@ -509,7 +511,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: remote,
QueryEngine: testEngine,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
@ -651,7 +653,7 @@ func TestQueryExemplars(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: testEngine(t),
ExemplarQueryable: storage.ExemplarQueryable(),
}
@ -870,7 +872,7 @@ func TestStats(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: testEngine(t),
now: func() time.Time {
return time.Unix(123, 0)
},

View file

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
@ -86,7 +87,7 @@ func TestApiStatusCodes(t *testing.T) {
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
} {
t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) {
r := createPrometheusAPI(q)
r := createPrometheusAPI(t, q)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil)
@ -100,8 +101,10 @@ func TestApiStatusCodes(t *testing.T) {
}
}
func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
engine := promql.NewEngine(promql.EngineOpts{
func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router {
t.Helper()
engine := promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{
Logger: log.NewNopLogger(),
Reg: nil,
ActiveQueryTracker: nil,