Merge branch 'main' into merge-2.51.0

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-03-19 11:56:14 +01:00 committed by GitHub
commit 0630e49c0d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 539 additions and 2107 deletions

View file

@ -45,7 +45,8 @@ jobs:
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- run: make build
- run: make test GO_ONLY=1
# Don't run NPM build; don't run race-detector.
- run: make test GO_ONLY=1 test-flags=""
test_ui:
name: UI tests

View file

@ -0,0 +1,55 @@
---
name: Push README to Docker Hub
on:
push:
paths:
- "README.md"
- ".github/workflows/container_description.yml"
branches: [ main, master ]
permissions:
contents: read
jobs:
PushDockerHubReadme:
runs-on: ubuntu-latest
name: Push README to Docker Hub
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
steps:
- name: git checkout
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Set docker hub repo name
run: echo "DOCKER_REPO_NAME=$(make common-docker-repo-name)" >> $GITHUB_ENV
- name: Push README to Dockerhub
uses: christian-korneck/update-container-description-action@d36005551adeaba9698d8d67a296bd16fa91f8e8 # v1
env:
DOCKER_USER: ${{ secrets.DOCKER_HUB_LOGIN }}
DOCKER_PASS: ${{ secrets.DOCKER_HUB_PASSWORD }}
with:
destination_container_repo: ${{ env.DOCKER_REPO_NAME }}
provider: dockerhub
short_description: ${{ env.DOCKER_REPO_NAME }}
readme_file: 'README.md'
PushQuayIoReadme:
runs-on: ubuntu-latest
name: Push README to quay.io
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
steps:
- name: git checkout
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Set quay.io org name
run: echo "DOCKER_REPO=$(echo quay.io/${GITHUB_REPOSITORY_OWNER} | tr -d '-')" >> $GITHUB_ENV
- name: Set quay.io repo name
run: echo "DOCKER_REPO_NAME=$(make common-docker-repo-name)" >> $GITHUB_ENV
- name: Push README to quay.io
uses: christian-korneck/update-container-description-action@d36005551adeaba9698d8d67a296bd16fa91f8e8 # v1
env:
DOCKER_APIKEY: ${{ secrets.QUAY_IO_API_TOKEN }}
with:
destination_container_repo: ${{ env.DOCKER_REPO_NAME }}
provider: quay
readme_file: 'README.md'

View file

@ -1,5 +1,9 @@
# Changelog
## unreleased
* [CHANGE] TSDB: Fix the predicate checking for blocks which are beyond the retention period to include the ones right at the retention boundary. #9633
## 2.51.0 / 2024-03-18
This version is built with Go 1.22.1.

View file

@ -1,6 +1,10 @@
# Maintainers
Julien Pivotto (<roidelapluie@prometheus.io> / @roidelapluie) and Levi Harrison (<levi@leviharrison.dev> / @LeviHarrison) are the main/default maintainers, some parts of the codebase have other maintainers:
General maintainers:
* Bryan Boreham (bjboreham@gmail.com / @bboreham)
* Levi Harrison (levi@leviharrison.dev / @LeviHarrison)
* Ayoub Mrini (ayoubmrini424@gmail.com / @machine424)
* Julien Pivotto (roidelapluie@prometheus.io / @roidelapluie)
* `cmd`
* `promtool`: David Leadbeater (<dgl@dgl.cx> / @dgl)
@ -19,7 +23,6 @@ George Krajcsovits (<gyorgy.krajcsovits@grafana.com> / @krajorama)
* `module`: Augustin Husson (<husson.augustin@gmail.com> @nexucis)
* `Makefile` and related build configuration: Simon Pasquier (<pasquier.simon@gmail.com> / @simonpasquier), Ben Kochie (<superq@gmail.com> / @SuperQ)
For the sake of brevity, not all subtrees are explicitly listed. Due to the
size of this repository, the natural changes in focus of maintainers over time,
and nuances of where particular features live, this list will always be

View file

@ -208,6 +208,10 @@ common-tarball: promu
@echo ">> building release tarball"
$(PROMU) tarball --prefix $(PREFIX) $(BIN_DIR)
.PHONY: common-docker-repo-name
common-docker-repo-name:
@echo "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME)"
.PHONY: common-docker $(BUILD_DOCKER_ARCHS)
common-docker: $(BUILD_DOCKER_ARCHS)
$(BUILD_DOCKER_ARCHS): common-docker-%:

View file

@ -960,8 +960,8 @@ func main() {
func() error {
// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
select {
case <-term:
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
case sig := <-term:
level.Warn(logger).Log("msg", "Received an OS signal, exiting gracefully...", "signal", sig.String())
reloadReady.Close()
case <-webHandler.Quit():
level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")

View file

@ -1840,7 +1840,7 @@ var expectedErrors = []struct {
},
{
filename: "azure_authentication_method.bad.yml",
errMsg: "unknown authentication_type \"invalid\". Supported types are \"OAuth\" or \"ManagedIdentity\"",
errMsg: "unknown authentication_type \"invalid\". Supported types are \"OAuth\", \"ManagedIdentity\" or \"SDK\"",
},
{
filename: "azure_bearertoken_basicauth.bad.yml",

View file

@ -65,6 +65,7 @@ const (
azureLabelMachineSize = azureLabel + "machine_size"
authMethodOAuth = "OAuth"
authMethodSDK = "SDK"
authMethodManagedIdentity = "ManagedIdentity"
)
@ -164,8 +165,8 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
}
}
if c.AuthenticationMethod != authMethodOAuth && c.AuthenticationMethod != authMethodManagedIdentity {
return fmt.Errorf("unknown authentication_type %q. Supported types are %q or %q", c.AuthenticationMethod, authMethodOAuth, authMethodManagedIdentity)
if c.AuthenticationMethod != authMethodOAuth && c.AuthenticationMethod != authMethodManagedIdentity && c.AuthenticationMethod != authMethodSDK {
return fmt.Errorf("unknown authentication_type %q. Supported types are %q, %q or %q", c.AuthenticationMethod, authMethodOAuth, authMethodManagedIdentity, authMethodSDK)
}
return c.HTTPClientConfig.Validate()
@ -294,6 +295,16 @@ func newCredential(cfg SDConfig, policyClientOptions policy.ClientOptions) (azco
return nil, err
}
credential = azcore.TokenCredential(secretCredential)
case authMethodSDK:
options := &azidentity.DefaultAzureCredentialOptions{ClientOptions: policyClientOptions}
if len(cfg.TenantID) != 0 {
options.TenantID = cfg.TenantID
}
sdkCredential, err := azidentity.NewDefaultAzureCredential(options)
if err != nil {
return nil, err
}
credential = azcore.TokenCredential(sdkCredential)
}
return credential, nil
}

View file

@ -600,8 +600,10 @@ See below for the configuration options for Azure discovery:
# The Azure environment.
[ environment: <string> | default = AzurePublicCloud ]
# The authentication method, either OAuth or ManagedIdentity.
# The authentication method, either OAuth, ManagedIdentity or SDK.
# See https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
# SDK authentication method uses environment variables by default.
# See https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication
[ authentication_method: <string> | default = OAuth]
# The subscription ID. Always required.
subscription_id: <string>
@ -3619,6 +3621,11 @@ azuread:
[ client_secret: <string> ]
[ tenant_id: <string> ] ]
# Azure SDK auth.
# See https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication
[ sdk:
[ tenant_id: <string> ] ]
# Configures the remote write request's TLS settings.
tls_config:
[ <tls_config> ]

View file

@ -9,7 +9,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/influxdata/influxdb v1.11.5
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/common v0.49.0
github.com/prometheus/common v0.50.0
github.com/prometheus/prometheus v0.50.1
github.com/stretchr/testify v1.9.0
)
@ -58,17 +58,17 @@ require (
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.28.6 // indirect

View file

@ -269,8 +269,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.49.0 h1:ToNTdK4zSnPVJmh698mGFkDor9wBI/iGaJy5dbH1EgI=
github.com/prometheus/common v0.49.0/go.mod h1:Kxm+EULxRbUkjGU6WFsQqo3ORzB4tyKvlWFOE9mB2sE=
github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ=
github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
@ -332,8 +332,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -356,12 +356,12 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -389,12 +389,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@ -436,8 +436,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View file

@ -39,7 +39,8 @@ type Label struct {
}
func (ls Labels) String() string {
var b bytes.Buffer
var bytea [1024]byte // On stack to avoid memory allocation while building the output.
b := bytes.NewBuffer(bytea[:0])
b.WriteByte('{')
i := 0
@ -50,7 +51,7 @@ func (ls Labels) String() string {
}
b.WriteString(l.Name)
b.WriteByte('=')
b.WriteString(strconv.Quote(l.Value))
b.Write(strconv.AppendQuote(b.AvailableBuffer(), l.Value))
i++
})
b.WriteByte('}')

View file

@ -43,6 +43,13 @@ func TestLabels_String(t *testing.T) {
}
}
func BenchmarkString(b *testing.B) {
ls := New(benchmarkLabels...)
for i := 0; i < b.N; i++ {
_ = ls.String()
}
}
func TestLabels_MatchLabels(t *testing.T) {
labels := FromStrings(
"__name__", "ALERTS",
@ -785,8 +792,7 @@ func BenchmarkLabels_Hash(b *testing.B) {
}
}
func BenchmarkBuilder(b *testing.B) {
m := []Label{
var benchmarkLabels = []Label{
{"job", "node"},
{"instance", "123.123.1.211:9090"},
{"path", "/api/v1/namespaces/<namespace>/deployments/<name>"},
@ -796,13 +802,14 @@ func BenchmarkBuilder(b *testing.B) {
{"prometheus", "prometheus-core-1"},
{"datacenter", "eu-west-1"},
{"pod_name", "abcdef-99999-defee"},
}
}
func BenchmarkBuilder(b *testing.B) {
var l Labels
builder := NewBuilder(EmptyLabels())
for i := 0; i < b.N; i++ {
builder.Reset(EmptyLabels())
for _, l := range m {
for _, l := range benchmarkLabels {
builder.Set(l.Name, l.Value)
}
l = builder.Labels()
@ -811,18 +818,7 @@ func BenchmarkBuilder(b *testing.B) {
}
func BenchmarkLabels_Copy(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
"prometheus": "prometheus-core-1",
"datacenter": "eu-west-1",
"pod_name": "abcdef-99999-defee",
}
l := FromMap(m)
l := New(benchmarkLabels...)
for i := 0; i < b.N; i++ {
l = l.Copy()

View file

@ -115,6 +115,12 @@ func (e ErrStorage) Error() string {
return e.Err.Error()
}
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
type QueryEngine interface {
NewInstantQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, ts time.Time) (Query, error)
NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error)
}
// QueryLogger is an interface that can be used to log all the queries logged
// by the engine.
type QueryLogger interface {
@ -1196,6 +1202,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
// Don't add histogram size here because we only
// copy the pointer above, not the whole
// histogram.
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
@ -1221,7 +1230,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
@ -1540,13 +1548,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
histSamples := totalHPointSize(ss.Histograms)
if len(ss.Floats)+histSamples > 0 {
if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples {
if ev.currentSamples+len(ss.Floats)+histSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
mat = append(mat, ss)
prevSS = &mat[len(mat)-1]
ev.currentSamples += len(ss.Floats) + histSamples
} else {
ev.error(ErrTooManySamples(env))
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
@ -1709,27 +1716,29 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
step++
_, f, h, ok := ev.vectorSelectorSingle(it, e, ts)
if ok {
if ev.currentSamples < ev.maxSamples {
if h == nil {
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Floats == nil {
ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps)
}
ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
} else {
if ss.Histograms == nil {
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
point := HPoint{H: h, T: ts}
ss.Histograms = append(ss.Histograms, point)
histSize := point.size()
ev.currentSamples += histSize
ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
}
} else {
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if ss.Histograms == nil {
ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps)
}
ss.Histograms = append(ss.Histograms, point)
}
}
}
@ -1856,7 +1865,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
panic(fmt.Errorf("unhandled expression of type: %T", expr))
}
// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room.
// reuseOrGetHPointSlices reuses the space from previous slice to create new slice if the former has lots of room.
// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one.
func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) {
if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 {
@ -2168,10 +2177,10 @@ loop:
histograms = histograms[:n]
continue loop
}
if ev.currentSamples >= ev.maxSamples {
ev.currentSamples += histograms[n].size()
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples += histograms[n].size()
}
case chunkenc.ValFloat:
t, f := buf.At()
@ -2180,10 +2189,10 @@ loop:
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mintFloats {
if ev.currentSamples >= ev.maxSamples {
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples++
if floats == nil {
floats = getFPointSlice(16)
}
@ -2211,22 +2220,22 @@ loop:
histograms = histograms[:n]
break
}
if ev.currentSamples >= ev.maxSamples {
ev.currentSamples += histograms[n].size()
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples += histograms[n].size()
case chunkenc.ValFloat:
t, f := it.At()
if t == maxt && !value.IsStaleNaN(f) {
if ev.currentSamples >= ev.maxSamples {
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if floats == nil {
floats = getFPointSlice(16)
}
floats = append(floats, FPoint{T: t, F: f})
ev.currentSamples++
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)

View file

@ -755,6 +755,7 @@ load 10s
metricWith3SampleEvery10Seconds{a="1",b="1"} 1+1x100
metricWith3SampleEvery10Seconds{a="2",b="2"} 1+1x100
metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100
metricWith1HistogramEvery10Seconds {{schema:1 count:5 sum:20 buckets:[1 2 1 1]}}+{{schema:1 count:10 sum:5 buckets:[1 2 3 4]}}x100
`)
t.Cleanup(func() { storage.Close() })
@ -795,6 +796,15 @@ load 10s
21000: 1,
},
},
{
Query: "metricWith1HistogramEvery10Seconds",
Start: time.Unix(21, 0),
PeakSamples: 12,
TotalSamples: 12, // 1 histogram sample of size 12 / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 12,
},
},
{
// timestamp function has a special handling.
Query: "timestamp(metricWith1SampleEvery10Seconds)",
@ -805,6 +815,15 @@ load 10s
21000: 1,
},
},
{
Query: "timestamp(metricWith1HistogramEvery10Seconds)",
Start: time.Unix(21, 0),
PeakSamples: 13, // histogram size 12 + 1 extra because of timestamp
TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
},
},
{
Query: "metricWith1SampleEvery10Seconds",
Start: time.Unix(22, 0),
@ -877,11 +896,20 @@ load 10s
201000: 6,
},
},
{
Query: "metricWith1HistogramEvery10Seconds[60s]",
Start: time.Unix(201, 0),
PeakSamples: 72,
TotalSamples: 72, // 1 histogram (size 12) / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 72,
},
},
{
Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 10,
TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 60/5 (using 59s so we always return 6 samples
TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 20/5 (using 59s so we always return 6 samples
// as if we run a query on 00 looking back 60 seconds we will return 7 samples;
// see next test).
TotalSamplesPerStep: stats.TotalSamplesPerStep{
@ -892,12 +920,22 @@ load 10s
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 11,
TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) + 2 as
TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) * 4 + 2 as
// max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples.
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 26,
},
},
{
Query: "max_over_time(metricWith1HistogramEvery10Seconds[60s])[20s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 72,
TotalSamples: 312, // (1 histogram (size 12) / 10 seconds * 60 seconds) * 4 + 2 * 12 as
// max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples.
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 312,
},
},
{
Query: "metricWith1SampleEvery10Seconds[60s] @ 30",
Start: time.Unix(201, 0),
@ -907,6 +945,15 @@ load 10s
201000: 4,
},
},
{
Query: "metricWith1HistogramEvery10Seconds[60s] @ 30",
Start: time.Unix(201, 0),
PeakSamples: 48,
TotalSamples: 48, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 48,
},
},
{
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
Start: time.Unix(201, 0),
@ -1035,13 +1082,42 @@ load 10s
},
},
{
// timestamp function as a special handling
Query: `metricWith1HistogramEvery10Seconds`,
Start: time.Unix(204, 0),
End: time.Unix(223, 0),
Interval: 5 * time.Second,
PeakSamples: 48,
TotalSamples: 48, // 1 histogram (size 12) per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
204000: 12, // aligned to the step time, not the sample time
209000: 12,
214000: 12,
219000: 12,
},
},
{
// timestamp function has a special handling
Query: "timestamp(metricWith1SampleEvery10Seconds)",
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 5,
TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,
206000: 1,
211000: 1,
216000: 1,
},
},
{
// timestamp function has a special handling
Query: "timestamp(metricWith1HistogramEvery10Seconds)",
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 16,
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,
206000: 1,

View file

@ -43,7 +43,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector,
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable) QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {

View file

@ -954,13 +954,14 @@ func (c *scrapeCache) iterDone(flushCache bool) {
}
}
func (c *scrapeCache) get(met []byte) (*cacheEntry, bool) {
func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
e, ok := c.series[string(met)]
if !ok {
return nil, false
return nil, false, false
}
alreadyScraped := e.lastIter == c.iter
e.lastIter = c.iter
return e, true
return e, true, alreadyScraped
}
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
@ -1566,7 +1567,7 @@ loop:
if sl.cache.getDropped(met) {
continue
}
ce, ok := sl.cache.get(met)
ce, ok, seriesAlreadyScraped := sl.cache.get(met)
var (
ref storage.SeriesRef
hash uint64
@ -1575,6 +1576,7 @@ loop:
if ok {
ref = ce.ref
lset = ce.lset
hash = ce.hash
// Update metadata only if it changed in the current iteration.
updateMetadata(lset, false)
@ -1611,6 +1613,9 @@ loop:
updateMetadata(lset, true)
}
if seriesAlreadyScraped {
err = storage.ErrDuplicateSampleForTimestamp
} else {
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
@ -1629,7 +1634,15 @@ loop:
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
}
if err == nil {
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
}
}
sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@ -1650,6 +1663,8 @@ loop:
// Increment added even if there's an error so we correctly report the
// number of samples remaining after relabeling.
// We still report duplicated samples here since this number should be the exact number
// of time series exposed on a scrape after relabelling.
added++
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
@ -1744,12 +1759,9 @@ loop:
// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
switch {
case err == nil:
if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
sl.cache.trackStaleness(ce.hash, ce.lset)
}
return true, nil
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
@ -1872,7 +1884,7 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64, b *labels.Builder) error {
ce, ok := sl.cache.get(s)
ce, ok, _ := sl.cache.get(s)
var ref storage.SeriesRef
var lset labels.Labels
if ok {

View file

@ -1068,6 +1068,7 @@ func makeTestMetrics(n int) []byte {
fmt.Fprintf(&sb, "# HELP metric_a help text\n")
fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100)
}
fmt.Fprintf(&sb, "# EOF\n")
return sb.Bytes()
}
@ -2635,6 +2636,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
require.Error(t, err)
require.NoError(t, slApp.Rollback())
// We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them,
// which would cause ErrDuplicateSampleForTimestamp errors on the next append.
sl.cache.iterDone(true)
q, err := s.Querier(time.Time{}.UnixNano(), 0)
require.NoError(t, err)
@ -2971,7 +2975,7 @@ func TestReuseCacheRace(t *testing.T) {
func TestCheckAddError(t *testing.T) {
var appErrs appendErrors
sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)}
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
require.Equal(t, 1, appErrs.numOutOfOrder)
}
@ -3599,3 +3603,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
})
}
}
// When a scrape contains multiple instances for the same time series we should increment
// prometheus_target_scrapes_sample_duplicate_timestamp_total metric.
func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) {
ctx, sl := simpleTestScrapeLoop(t)
slApp := sl.appender(ctx)
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 1, seriesAdded)
slApp = sl.appender(ctx)
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{})
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, total)
require.Equal(t, 3, added)
require.Equal(t, 0, seriesAdded)
metric := dto.Metric{}
err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric)
require.NoError(t, err)
value := metric.GetCounter().GetValue()
require.Equal(t, 4.0, value)
}

View file

@ -37,7 +37,7 @@ if [ -z "${GITHUB_TOKEN}" ]; then
fi
# List of files that should be synced.
SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml"
SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml .github/workflows/container_description.yml"
# Go to the root of the repo
cd "$(git rev-parse --show-cdup)" || exit 1
@ -99,6 +99,15 @@ check_go() {
curl -sLf -o /dev/null "https://raw.githubusercontent.com/${org_repo}/${default_branch}/go.mod"
}
check_docker() {
local org_repo
local default_branch
org_repo="$1"
default_branch="$2"
curl -sLf -o /dev/null "https://raw.githubusercontent.com/${org_repo}/${default_branch}/Dockerfile"
}
process_repo() {
local org_repo
local default_branch
@ -119,6 +128,10 @@ process_repo() {
echo "${org_repo} is not Go, skipping golangci-lint.yml."
continue
fi
if [[ "${source_file}" == '.github/workflows/container_description.yml' ]] && ! check_docker "${org_repo}" "${default_branch}" ; then
echo "${org_repo} has no Dockerfile, skipping container_description.yml."
continue
fi
if [[ "${source_file}" == 'LICENSE' ]] && ! check_license "${target_file}" ; then
echo "LICENSE in ${org_repo} is not apache, skipping."
continue
@ -131,7 +144,7 @@ process_repo() {
if [[ -z "${target_file}" ]]; then
echo "${target_filename} doesn't exist in ${org_repo}"
case "${source_file}" in
CODE_OF_CONDUCT.md | SECURITY.md)
CODE_OF_CONDUCT.md | SECURITY.md | .github/workflows/container_description.yml)
echo "${source_file} missing in ${org_repo}, force updating."
needs_update+=("${source_file}")
;;

View file

@ -61,6 +61,12 @@ type OAuthConfig struct {
TenantID string `yaml:"tenant_id,omitempty"`
}
// SDKConfig is used to store azure SDK config values.
type SDKConfig struct {
// TenantID is the tenantId of the azure active directory application that is being used to authenticate.
TenantID string `yaml:"tenant_id,omitempty"`
}
// AzureADConfig is used to store the config values.
type AzureADConfig struct { //nolint:revive // exported.
// ManagedIdentity is the managed identity that is being used to authenticate.
@ -69,6 +75,9 @@ type AzureADConfig struct { //nolint:revive // exported.
// OAuth is the oauth config that is being used to authenticate.
OAuth *OAuthConfig `yaml:"oauth,omitempty"`
// OAuth is the oauth config that is being used to authenticate.
SDK *SDKConfig `yaml:"sdk,omitempty"`
// Cloud is the Azure cloud in which the service is running. Example: AzurePublic/AzureGovernment/AzureChina.
Cloud string `yaml:"cloud,omitempty"`
}
@ -102,14 +111,22 @@ func (c *AzureADConfig) Validate() error {
return fmt.Errorf("must provide a cloud in the Azure AD config")
}
if c.ManagedIdentity == nil && c.OAuth == nil {
return fmt.Errorf("must provide an Azure Managed Identity or Azure OAuth in the Azure AD config")
if c.ManagedIdentity == nil && c.OAuth == nil && c.SDK == nil {
return fmt.Errorf("must provide an Azure Managed Identity, Azure OAuth or Azure SDK in the Azure AD config")
}
if c.ManagedIdentity != nil && c.OAuth != nil {
return fmt.Errorf("cannot provide both Azure Managed Identity and Azure OAuth in the Azure AD config")
}
if c.ManagedIdentity != nil && c.SDK != nil {
return fmt.Errorf("cannot provide both Azure Managed Identity and Azure SDK in the Azure AD config")
}
if c.OAuth != nil && c.SDK != nil {
return fmt.Errorf("cannot provide both Azure OAuth and Azure SDK in the Azure AD config")
}
if c.ManagedIdentity != nil {
if c.ManagedIdentity.ClientID == "" {
return fmt.Errorf("must provide an Azure Managed Identity client_id in the Azure AD config")
@ -143,6 +160,17 @@ func (c *AzureADConfig) Validate() error {
}
}
if c.SDK != nil {
var err error
if c.SDK.TenantID != "" {
_, err = regexp.MatchString("^[0-9a-zA-Z-.]+$", c.SDK.TenantID)
if err != nil {
return fmt.Errorf("the provided Azure OAuth tenant_id is invalid")
}
}
}
return nil
}
@ -225,6 +253,16 @@ func newTokenCredential(cfg *AzureADConfig) (azcore.TokenCredential, error) {
}
}
if cfg.SDK != nil {
sdkConfig := &SDKConfig{
TenantID: cfg.SDK.TenantID,
}
cred, err = newSDKTokenCredential(clientOpts, sdkConfig)
if err != nil {
return nil, err
}
}
return cred, nil
}
@ -241,6 +279,12 @@ func newOAuthTokenCredential(clientOpts *azcore.ClientOptions, oAuthConfig *OAut
return azidentity.NewClientSecretCredential(oAuthConfig.TenantID, oAuthConfig.ClientID, oAuthConfig.ClientSecret, opts)
}
// newSDKTokenCredential returns new SDK token credential.
func newSDKTokenCredential(clientOpts *azcore.ClientOptions, sdkConfig *SDKConfig) (azcore.TokenCredential, error) {
opts := &azidentity.DefaultAzureCredentialOptions{ClientOptions: *clientOpts, TenantID: sdkConfig.TenantID}
return azidentity.NewDefaultAzureCredential(opts)
}
// newTokenProvider helps to fetch accessToken for different types of credential. This also takes care of
// refreshing the accessToken before expiry. This accessToken is attached to the Authorization header while making requests.
func newTokenProvider(cfg *AzureADConfig, cred azcore.TokenCredential) (*tokenProvider, error) {

View file

@ -39,7 +39,7 @@ const (
testTokenString = "testTokenString"
)
var testTokenExpiry = time.Now().Add(5 * time.Second)
func testTokenExpiry() time.Time { return time.Now().Add(5 * time.Second) }
type AzureAdTestSuite struct {
suite.Suite
@ -94,7 +94,7 @@ func (ad *AzureAdTestSuite) TestAzureAdRoundTripper() {
testToken := &azcore.AccessToken{
Token: testTokenString,
ExpiresOn: testTokenExpiry,
ExpiresOn: testTokenExpiry(),
}
ad.mockCredential.On("GetToken", mock.Anything, mock.Anything).Return(*testToken, nil)
@ -145,7 +145,7 @@ func TestAzureAdConfig(t *testing.T) {
// Missing managedidentiy or oauth field.
{
filename: "testdata/azuread_bad_configmissing.yaml",
err: "must provide an Azure Managed Identity or Azure OAuth in the Azure AD config",
err: "must provide an Azure Managed Identity, Azure OAuth or Azure SDK in the Azure AD config",
},
// Invalid managedidentity client id.
{
@ -162,6 +162,11 @@ func TestAzureAdConfig(t *testing.T) {
filename: "testdata/azuread_bad_twoconfig.yaml",
err: "cannot provide both Azure Managed Identity and Azure OAuth in the Azure AD config",
},
// Invalid config when both sdk and oauth is provided.
{
filename: "testdata/azuread_bad_oauthsdkconfig.yaml",
err: "cannot provide both Azure OAuth and Azure SDK in the Azure AD config",
},
// Valid config with missing optionally cloud field.
{
filename: "testdata/azuread_good_cloudmissing.yaml",
@ -174,6 +179,10 @@ func TestAzureAdConfig(t *testing.T) {
{
filename: "testdata/azuread_good_oauth.yaml",
},
// Valid SDK config.
{
filename: "testdata/azuread_good_sdk.yaml",
},
}
for _, c := range cases {
_, err := loadAzureAdConfig(c.filename)
@ -232,6 +241,16 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() {
},
err: "Cloud is not specified or is incorrect: ",
},
// Invalid tokenProvider for SDK.
{
cfg: &AzureADConfig{
Cloud: "PublicAzure",
SDK: &SDKConfig{
TenantID: dummyTenantID,
},
},
err: "Cloud is not specified or is incorrect: ",
},
// Valid tokenProvider for managedidentity.
{
cfg: &AzureADConfig{
@ -252,6 +271,15 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() {
},
},
},
// Valid tokenProvider for SDK.
{
cfg: &AzureADConfig{
Cloud: "AzurePublic",
SDK: &SDKConfig{
TenantID: dummyTenantID,
},
},
},
}
mockGetTokenCallCounter := 1
for _, c := range cases {
@ -264,11 +292,11 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() {
} else {
testToken := &azcore.AccessToken{
Token: testTokenString,
ExpiresOn: testTokenExpiry,
ExpiresOn: testTokenExpiry(),
}
s.mockCredential.On("GetToken", mock.Anything, mock.Anything).Return(*testToken, nil).Once().
On("GetToken", mock.Anything, mock.Anything).Return(getToken(), nil)
On("GetToken", mock.Anything, mock.Anything).Return(getToken(), nil).Once()
actualTokenProvider, actualErr := newTokenProvider(c.cfg, s.mockCredential)

View file

@ -0,0 +1,7 @@
cloud: AzurePublic
oauth:
client_id: 00000000-0000-0000-0000-000000000000
client_secret: Cl1ent$ecret!
tenant_id: 00000000-a12b-3cd4-e56f-000000000000
sdk:
tenant_id: 00000000-a12b-3cd4-e56f-000000000000

View file

@ -0,0 +1,3 @@
cloud: AzurePublic
sdk:
tenant_id: 00000000-a12b-3cd4-e56f-000000000000

View file

@ -3,7 +3,6 @@
This files in the `prometheus/` and `prometheusremotewrite/` are copied from the OpenTelemetry Project[^1].
This is done instead of adding a go.mod dependency because OpenTelemetry depends on `prometheus/prometheus` and a cyclic dependency will be created. This is just a temporary solution and the long-term solution is to move the required packages from OpenTelemetry into `prometheus/prometheus`.
We don't copy in `./prometheus` through this script because that package imports a collector specific featuregate package we don't want to import. The featuregate package is being removed now, and in the future we will copy this folder too.
To update the dependency is a multi-step process:
1. Vendor the latest `prometheus/prometheus`@`main` into [`opentelemetry/opentelemetry-collector-contrib`](https://github.com/open-telemetry/opentelemetry-collector-contrib)

View file

@ -3,7 +3,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
import (
"strings"

View file

@ -3,7 +3,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
import (
"strings"

View file

@ -3,7 +3,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
import "strings"

View file

@ -23,5 +23,5 @@ case $(sed --help 2>&1) in
*) set sed -i '';;
esac
"$@" -e 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go
"$@" -e 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go ./prometheus/*.go
"$@" -e '1s#^#// DO NOT EDIT. COPIED AS-IS. SEE ../README.md\n\n#g' ./prometheusremotewrite/*.go ./prometheus/*.go

View file

@ -202,34 +202,16 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
return err
}
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers)
if err := chunks.Err(); err != nil {
return err
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
}
}()
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
ws, err := StreamChunkedReadResponses(
NewChunkedWriter(w, f),
int64(i),
// The streaming API has to provide the series sorted.
querier.Select(ctx, true, hints, filteredMatchers...),
chunks,
sortedExternalLabels,
h.remoteReadMaxBytesInFrame,
h.marshalPool,
@ -254,6 +236,35 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
}
}
// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet,
// encapsulating the operation in its own function to ensure timely release of
// the querier resources.
func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet {
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return storage.ErrChunkSeriesSet(err)
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
}
}()
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
return querier.Select(ctx, true, hints, filteredMatchers...)
}
// filterExtLabelsFromMatchers change equality matchers which match external labels
// to a matcher that looks for an empty label,
// as that label should not be present in the storage.

View file

@ -209,6 +209,22 @@ func TestCorruptedChunk(t *testing.T) {
}
}
func sequenceFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var res []string
for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
}
res = append(res, filepath.Join(dir, fi.Name()))
}
return res, nil
}
func TestLabelValuesWithMatchers(t *testing.T) {
tmpdir := t.TempDir()
ctx := context.Background()

View file

@ -202,15 +202,6 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
}, nil
}
// PopulatedChunk creates a chunk populated with samples every second starting at minTime.
func PopulatedChunk(numSamples int, minTime int64) (Meta, error) {
samples := make([]Sample, numSamples)
for i := 0; i < numSamples; i++ {
samples[i] = sample{t: minTime + int64(i*1000), f: 1.0}
}
return ChunkFromSamples(samples)
}
// ChunkMetasToSamples converts a slice of chunk meta data to a slice of samples.
// Used in tests to compare the content of chunks.
func ChunkMetasToSamples(chunks []Meta) (result []Sample) {

View file

@ -15,7 +15,6 @@ package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
@ -690,7 +689,6 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
chkCRC32 := newCRC32()
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm.curFileSequence {
@ -755,20 +753,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
if err := checkCRC32(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd), sum); err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
}
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
}
}
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
@ -802,8 +793,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
cdm.fileMaxtSet = true
}()
chkCRC32 := newCRC32()
// Iterate files in ascending order.
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
for seg := range cdm.mmappedChunkFiles {
@ -838,7 +827,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
}
}
chkCRC32.Reset()
chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
startIdx := idx
@ -877,14 +865,11 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
// Check CRC.
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
return err
}
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
if err := checkCRC32(mmapFile.byteSlice.Range(startIdx, idx), sum); err != nil {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
Err: err,
}
}
idx += CRCSize

View file

@ -24,7 +24,6 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"time"
@ -779,10 +778,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
walDir := filepath.Join(dir, "wal")
wblDir := filepath.Join(dir, wlog.WblDirName)
// Migrate old WAL if one exists.
if err := MigrateWAL(l, walDir); err != nil {
return nil, fmt.Errorf("migrate WAL: %w", err)
}
for _, tmpDir := range []string{walDir, dir} {
// Remove tmp dirs.
if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
@ -1615,7 +1610,7 @@ func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struc
for i, block := range blocks {
// The difference between the first block and this block is larger than
// the retention period so any blocks after that are added as deletable.
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime >= db.opts.RetentionDuration {
for _, b := range blocks[i:] {
deletable[b.meta.ULID] = struct{}{}
}
@ -2213,39 +2208,6 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func sequenceFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var res []string
for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
}
res = append(res, filepath.Join(dir, fi.Name()))
}
return res, nil
}
func nextSequenceFile(dir string) (string, int, error) {
files, err := os.ReadDir(dir)
if err != nil {
return "", 0, err
}
i := uint64(0)
for _, f := range files {
j, err := strconv.ParseUint(f.Name(), 10, 64)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
func exponential(d, min, max time.Duration) time.Duration {
d *= 2
if d < min {

View file

@ -681,6 +681,34 @@ func TestDB_Snapshot(t *testing.T) {
require.Equal(t, 1000.0, sum)
}
func TestDB_BeyondTimeRetention(t *testing.T) {
opts := DefaultOptions()
opts.RetentionDuration = 100
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
// We have 4 blocks, 3 of which are beyond the retention duration.
metas := []BlockMeta{
{MinTime: 300, MaxTime: 500},
{MinTime: 200, MaxTime: 300},
{MinTime: 100, MaxTime: 200},
{MinTime: 0, MaxTime: 100},
}
for _, m := range metas {
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
}
// Reloading should truncate the 3 blocks which are >= the retention period.
require.NoError(t, db.reloadBlocks())
blocks := db.Blocks()
require.Len(t, blocks, 1)
require.Equal(t, metas[0].MinTime, blocks[0].Meta().MinTime)
require.Equal(t, metas[0].MaxTime, blocks[0].Meta().MaxTime)
}
// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples
// that are outside the set block time range.
// See https://github.com/prometheus/prometheus/issues/5105
@ -3598,7 +3626,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
// just to iterate through the bytes slice. We don't really care the reason why
// we read this data, we just need to read it to make sure the memory address
// of the []byte is still valid.
chkCRC32 := newCRC32()
chkCRC32 := crc32.New(crc32.MakeTable(crc32.Castagnoli))
for _, chunk := range chunks {
chkCRC32.Reset()
_, err := chkCRC32.Write(chunk.Bytes())

View file

@ -1829,7 +1829,7 @@ func NewStringListIter(s []string) StringIter {
return &stringListIter{l: s}
}
// symbolsIter implements StringIter.
// stringListIter implements StringIter.
type stringListIter struct {
l []string
cur string

File diff suppressed because it is too large Load diff

View file

@ -1,553 +0,0 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows
package tsdb
import (
"encoding/binary"
"io"
"math/rand"
"os"
"path"
"path/filepath"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/testutil"
)
func TestSegmentWAL_cut(t *testing.T) {
tmpdir := t.TempDir()
// This calls cut() implicitly the first time without a previous tail.
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
require.NoError(t, err)
require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
require.NoError(t, w.cut())
// Cutting creates a new file.
require.Len(t, w.files, 2)
require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
require.NoError(t, w.Close())
for _, of := range w.files {
f, err := os.Open(of.Name())
require.NoError(t, err)
// Verify header data.
metab := make([]byte, 8)
_, err = f.Read(metab)
require.NoError(t, err)
require.Equal(t, WALMagic, binary.BigEndian.Uint32(metab[:4]))
require.Equal(t, WALFormatDefault, metab[4])
// We cannot actually check for correct pre-allocation as it is
// optional per filesystem and handled transparently.
et, flag, b, err := newWALReader(nil, nil).entry(f)
require.NoError(t, err)
require.Equal(t, WALEntrySeries, et)
require.Equal(t, byte(walSeriesSimple), flag)
require.Equal(t, []byte("Hello World!!"), b)
}
}
func TestSegmentWAL_Truncate(t *testing.T) {
const (
numMetrics = 20000
batch = 100
)
series, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), numMetrics)
require.NoError(t, err)
dir := t.TempDir()
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w)
w.segmentSize = 10000
for i := 0; i < numMetrics; i += batch {
var rs []record.RefSeries
for j, s := range series[i : i+batch] {
rs = append(rs, record.RefSeries{Labels: s, Ref: chunks.HeadSeriesRef(i+j) + 1})
}
err := w.LogSeries(rs)
require.NoError(t, err)
}
// We mark the 2nd half of the files with a min timestamp that should discard
// them from the selection of compactable files.
for i, f := range w.files[len(w.files)/2:] {
f.maxTime = int64(1000 + i)
}
// All series in those files must be preserved regarding of the provided postings list.
boundarySeries := w.files[len(w.files)/2].minSeries
// We truncate while keeping every 2nd series.
keep := map[chunks.HeadSeriesRef]struct{}{}
for i := 1; i <= numMetrics; i += 2 {
keep[chunks.HeadSeriesRef(i)] = struct{}{}
}
keepf := func(id chunks.HeadSeriesRef) bool {
_, ok := keep[id]
return ok
}
err = w.Truncate(1000, keepf)
require.NoError(t, err)
var expected []record.RefSeries
for i := 1; i <= numMetrics; i++ {
if i%2 == 1 || chunks.HeadSeriesRef(i) >= boundarySeries {
expected = append(expected, record.RefSeries{Ref: chunks.HeadSeriesRef(i), Labels: series[i-1]})
}
}
// Call Truncate once again to see whether we can read the written file without
// creating a new WAL.
err = w.Truncate(1000, keepf)
require.NoError(t, err)
require.NoError(t, w.Close())
// The same again with a new WAL.
w, err = OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w)
var readSeries []record.RefSeries
r := w.Reader()
require.NoError(t, r.Read(func(s []record.RefSeries) {
readSeries = append(readSeries, s...)
}, nil, nil))
testutil.RequireEqual(t, expected, readSeries)
}
// Symmetrical test of reading and writing to the WAL via its main interface.
func TestSegmentWAL_Log_Restore(t *testing.T) {
const (
numMetrics = 50
iterations = 5
stepSize = 5
)
// Generate testing data. It does not make semantic sense but
// for the purpose of this test.
series, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), numMetrics)
require.NoError(t, err)
dir := t.TempDir()
var (
recordedSeries [][]record.RefSeries
recordedSamples [][]record.RefSample
recordedDeletes [][]tombstones.Stone
)
var totalSamples int
// Open WAL a bunch of times, validate all previous data can be read,
// write more data to it, close it.
for k := 0; k < numMetrics; k += numMetrics / iterations {
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
// Set smaller segment size so we can actually write several files.
w.segmentSize = 1000 * 1000
r := w.Reader()
var (
resultSeries [][]record.RefSeries
resultSamples [][]record.RefSample
resultDeletes [][]tombstones.Stone
)
serf := func(series []record.RefSeries) {
if len(series) > 0 {
clsets := make([]record.RefSeries, len(series))
copy(clsets, series)
resultSeries = append(resultSeries, clsets)
}
}
smplf := func(smpls []record.RefSample) {
if len(smpls) > 0 {
csmpls := make([]record.RefSample, len(smpls))
copy(csmpls, smpls)
resultSamples = append(resultSamples, csmpls)
}
}
delf := func(stones []tombstones.Stone) {
if len(stones) > 0 {
cst := make([]tombstones.Stone, len(stones))
copy(cst, stones)
resultDeletes = append(resultDeletes, cst)
}
}
require.NoError(t, r.Read(serf, smplf, delf))
testutil.RequireEqual(t, recordedSamples, resultSamples)
testutil.RequireEqual(t, recordedSeries, resultSeries)
testutil.RequireEqual(t, recordedDeletes, resultDeletes)
series := series[k : k+(numMetrics/iterations)]
// Insert in batches and generate different amounts of samples for each.
for i := 0; i < len(series); i += stepSize {
var samples []record.RefSample
var stones []tombstones.Stone
for j := 0; j < i*10; j++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(j % 10000),
T: int64(j * 2),
V: rand.Float64(),
})
}
for j := 0; j < i*20; j++ {
ts := rand.Int63()
stones = append(stones, tombstones.Stone{Ref: storage.SeriesRef(rand.Uint64()), Intervals: tombstones.Intervals{{Mint: ts, Maxt: ts + rand.Int63n(10000)}}})
}
lbls := series[i : i+stepSize]
series := make([]record.RefSeries, 0, len(series))
for j, l := range lbls {
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i + j),
Labels: l,
})
}
require.NoError(t, w.LogSeries(series))
require.NoError(t, w.LogSamples(samples))
require.NoError(t, w.LogDeletes(stones))
if len(lbls) > 0 {
recordedSeries = append(recordedSeries, series)
}
if len(samples) > 0 {
recordedSamples = append(recordedSamples, samples)
totalSamples += len(samples)
}
if len(stones) > 0 {
recordedDeletes = append(recordedDeletes, stones)
}
}
require.NoError(t, w.Close())
}
}
func TestWALRestoreCorrupted_invalidSegment(t *testing.T) {
dir := t.TempDir()
wal, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(wal)
_, err = wal.createSegmentFile(filepath.Join(dir, "000000"))
require.NoError(t, err)
f, err := wal.createSegmentFile(filepath.Join(dir, "000001"))
require.NoError(t, err)
f2, err := wal.createSegmentFile(filepath.Join(dir, "000002"))
require.NoError(t, err)
require.NoError(t, f2.Close())
// Make header of second segment invalid.
_, err = f.WriteAt([]byte{1, 2, 3, 4}, 0)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, wal.Close())
wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(wal)
files, err := os.ReadDir(dir)
require.NoError(t, err)
fns := []string{}
for _, f := range files {
fns = append(fns, f.Name())
}
require.Equal(t, []string{"000000"}, fns)
}
// Test reading from a WAL that has been corrupted through various means.
func TestWALRestoreCorrupted(t *testing.T) {
cases := []struct {
name string
f func(*testing.T, *SegmentWAL)
}{
{
name: "truncate_checksum",
f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666)
require.NoError(t, err)
defer f.Close()
off, err := f.Seek(0, io.SeekEnd)
require.NoError(t, err)
require.NoError(t, f.Truncate(off-1))
},
},
{
name: "truncate_body",
f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666)
require.NoError(t, err)
defer f.Close()
off, err := f.Seek(0, io.SeekEnd)
require.NoError(t, err)
require.NoError(t, f.Truncate(off-8))
},
},
{
name: "body_content",
f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666)
require.NoError(t, err)
defer f.Close()
off, err := f.Seek(0, io.SeekEnd)
require.NoError(t, err)
// Write junk before checksum starts.
_, err = f.WriteAt([]byte{1, 2, 3, 4}, off-8)
require.NoError(t, err)
},
},
{
name: "checksum",
f: func(t *testing.T, w *SegmentWAL) {
f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666)
require.NoError(t, err)
defer f.Close()
off, err := f.Seek(0, io.SeekEnd)
require.NoError(t, err)
// Write junk into checksum
_, err = f.WriteAt([]byte{1, 2, 3, 4}, off-4)
require.NoError(t, err)
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Generate testing data. It does not make semantic sense but
// for the purpose of this test.
dir := t.TempDir()
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w)
require.NoError(t, w.LogSamples([]record.RefSample{{T: 1, V: 2}}))
require.NoError(t, w.LogSamples([]record.RefSample{{T: 2, V: 3}}))
require.NoError(t, w.cut())
// Sleep 2 seconds to avoid error where cut and test "cases" function may write or
// truncate the file out of orders as "cases" are not synchronized with cut.
// Hopefully cut will complete by 2 seconds.
time.Sleep(2 * time.Second)
require.NoError(t, w.LogSamples([]record.RefSample{{T: 3, V: 4}}))
require.NoError(t, w.LogSamples([]record.RefSample{{T: 5, V: 6}}))
require.NoError(t, w.Close())
// cut() truncates and fsyncs the first segment async. If it happens after
// the corruption we apply below, the corruption will be overwritten again.
// Fire and forget a sync to avoid flakiness.
w.files[0].Sync()
// Corrupt the second entry in the first file.
// After re-opening we must be able to read the first entry
// and the rest, including the second file, must be truncated for clean further
// writes.
c.f(t, w)
logger := log.NewLogfmtLogger(os.Stderr)
w2, err := OpenSegmentWAL(dir, logger, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w2)
r := w2.Reader()
serf := func(l []record.RefSeries) {
require.Empty(t, l)
}
// Weird hack to check order of reads.
i := 0
samplef := func(s []record.RefSample) {
if i == 0 {
require.Equal(t, []record.RefSample{{T: 1, V: 2}}, s)
i++
} else {
require.Equal(t, []record.RefSample{{T: 99, V: 100}}, s)
}
}
require.NoError(t, r.Read(serf, samplef, nil))
require.NoError(t, w2.LogSamples([]record.RefSample{{T: 99, V: 100}}))
require.NoError(t, w2.Close())
// We should see the first valid entry and the new one, everything after
// is truncated.
w3, err := OpenSegmentWAL(dir, logger, 0, nil)
require.NoError(t, err)
defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w3)
r = w3.Reader()
i = 0
require.NoError(t, r.Read(serf, samplef, nil))
})
}
}
func TestMigrateWAL_Empty(t *testing.T) {
// The migration procedure must properly deal with a zero-length segment,
// which is valid in the new format.
dir := t.TempDir()
wdir := path.Join(dir, "wal")
// Initialize empty WAL.
w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone)
require.NoError(t, err)
require.NoError(t, w.Close())
require.NoError(t, MigrateWAL(nil, wdir))
}
func TestMigrateWAL_Fuzz(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
// Should pass if no WAL exists yet.
require.NoError(t, MigrateWAL(nil, wdir))
oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
require.NoError(t, err)
// Write some data.
require.NoError(t, oldWAL.LogSeries([]record.RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
}))
require.NoError(t, oldWAL.LogSamples([]record.RefSample{
{Ref: 1, T: 100, V: 200},
{Ref: 2, T: 300, V: 400},
}))
require.NoError(t, oldWAL.LogSeries([]record.RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
}))
require.NoError(t, oldWAL.LogSamples([]record.RefSample{
{Ref: 3, T: 100, V: 200},
{Ref: 4, T: 300, V: 400},
}))
require.NoError(t, oldWAL.LogDeletes([]tombstones.Stone{
{Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}},
}))
require.NoError(t, oldWAL.Close())
// Perform migration.
require.NoError(t, MigrateWAL(nil, wdir))
w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone)
require.NoError(t, err)
// We can properly write some new data after migration.
var enc record.Encoder
require.NoError(t, w.Log(enc.Samples([]record.RefSample{
{Ref: 500, T: 1, V: 1},
}, nil)))
require.NoError(t, w.Close())
// Read back all data.
sr, err := wlog.NewSegmentsReader(wdir)
require.NoError(t, err)
r := wlog.NewReader(sr)
var res []interface{}
dec := record.NewDecoder(labels.NewSymbolTable())
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
s, err := dec.Series(rec, nil)
require.NoError(t, err)
res = append(res, s)
case record.Samples:
s, err := dec.Samples(rec, nil)
require.NoError(t, err)
res = append(res, s)
case record.Tombstones:
s, err := dec.Tombstones(rec, nil)
require.NoError(t, err)
res = append(res, s)
default:
require.Fail(t, "unknown record type %d", dec.Type(rec))
}
}
require.NoError(t, r.Err())
testutil.RequireEqual(t, []interface{}{
[]record.RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
},
[]record.RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
[]record.RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
},
[]record.RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
[]tombstones.Stone{{Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}}},
[]record.RefSample{{Ref: 500, T: 1, V: 1}},
}, res)
// Migrating an already migrated WAL shouldn't do anything.
require.NoError(t, MigrateWAL(nil, wdir))
}

View file

@ -177,13 +177,6 @@ type TSDBAdminStats interface {
WALReplayStatus() (tsdb.WALReplayStatus, error)
}
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
type QueryEngine interface {
SetQueryLogger(l promql.QueryLogger)
NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
}
type QueryOpts interface {
EnablePerStepStats() bool
LookbackDelta() time.Duration
@ -193,7 +186,7 @@ type QueryOpts interface {
// them using the provided storage and query engine.
type API struct {
Queryable storage.SampleAndChunkQueryable
QueryEngine QueryEngine
QueryEngine promql.QueryEngine
ExemplarQueryable storage.ExemplarQueryable
scrapePoolsRetriever func(context.Context) ScrapePoolsRetriever
@ -226,7 +219,7 @@ type API struct {
// NewAPI returns an initialized API type.
func NewAPI(
qe QueryEngine,
qe promql.QueryEngine,
q storage.SampleAndChunkQueryable,
ap storage.Appendable,
eq storage.ExemplarQueryable,

View file

@ -3881,8 +3881,6 @@ type fakeEngine struct {
query fakeQuery
}
func (e *fakeEngine) SetQueryLogger(promql.QueryLogger) {}
func (e *fakeEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return &e.query, nil
}

View file

@ -48,29 +48,29 @@ var scenarios = map[string]struct {
}{
"empty": {
params: "",
code: 200,
code: http.StatusOK,
body: ``,
},
"match nothing": {
params: "match[]=does_not_match_anything",
code: 200,
code: http.StatusOK,
body: ``,
},
"invalid params from the beginning": {
params: "match[]=-not-a-valid-metric-name",
code: 400,
code: http.StatusBadRequest,
body: `1:1: parse error: unexpected <op:->
`,
},
"invalid params somewhere in the middle": {
params: "match[]=not-a-valid-metric-name",
code: 400,
code: http.StatusBadRequest,
body: `1:4: parse error: unexpected <op:->
`,
},
"test_metric1": {
params: "match[]=test_metric1",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -78,33 +78,33 @@ test_metric1{foo="boo",instance="i"} 1 6000000
},
"test_metric2": {
params: "match[]=test_metric2",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric2 untyped
test_metric2{foo="boo",instance="i"} 1 6000000
`,
},
"test_metric_without_labels": {
params: "match[]=test_metric_without_labels",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric_without_labels untyped
test_metric_without_labels{instance=""} 1001 6000000
`,
},
"test_stale_metric": {
params: "match[]=test_metric_stale",
code: 200,
code: http.StatusOK,
body: ``,
},
"test_old_metric": {
params: "match[]=test_metric_old",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric_old untyped
test_metric_old{instance=""} 981 5880000
`,
},
"{foo='boo'}": {
params: "match[]={foo='boo'}",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="boo",instance="i"} 1 6000000
# TYPE test_metric2 untyped
@ -113,7 +113,7 @@ test_metric2{foo="boo",instance="i"} 1 6000000
},
"two matchers": {
params: "match[]=test_metric1&match[]=test_metric2",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -123,7 +123,7 @@ test_metric2{foo="boo",instance="i"} 1 6000000
},
"two matchers with overlap": {
params: "match[]={__name__=~'test_metric1'}&match[]={foo='bar'}",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -131,7 +131,7 @@ test_metric1{foo="boo",instance="i"} 1 6000000
},
"everything": {
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -145,7 +145,7 @@ test_metric_without_labels{instance=""} 1001 6000000
},
"empty label value matches everything that doesn't have that label": {
params: "match[]={foo='',__name__=~'.%2b'}",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric_old untyped
test_metric_old{instance=""} 981 5880000
# TYPE test_metric_without_labels untyped
@ -154,7 +154,7 @@ test_metric_without_labels{instance=""} 1001 6000000
},
"empty label value for a label that doesn't exist at all, matches everything": {
params: "match[]={bar='',__name__=~'.%2b'}",
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -169,7 +169,7 @@ test_metric_without_labels{instance=""} 1001 6000000
"external labels are added if not already present": {
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
externalLabels: labels.FromStrings("foo", "baz", "zone", "ie"),
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i",zone="ie"} 10000 6000000
test_metric1{foo="boo",instance="i",zone="ie"} 1 6000000
@ -186,7 +186,7 @@ test_metric_without_labels{foo="baz",instance="",zone="ie"} 1001 6000000
// know what it does anyway.
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
externalLabels: labels.FromStrings("instance", "baz"),
code: 200,
code: http.StatusOK,
body: `# TYPE test_metric1 untyped
test_metric1{foo="bar",instance="i"} 10000 6000000
test_metric1{foo="boo",instance="i"} 1 6000000
@ -390,7 +390,6 @@ func TestFederationWithNativeHistograms(t *testing.T) {
require.Equal(t, http.StatusOK, res.Code)
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
p := textparse.NewProtobufParser(body, false, labels.NewSymbolTable())
var actVec promql.Vector
metricFamilies := 0