Merge pull request #9908 from prometheus/beorn7/merge2

Merge main into sparsehistogram, now for real
This commit is contained in:
Björn Rabenstein 2021-11-30 18:21:15 +01:00 committed by GitHub
commit 5b8ad3599a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 3521 additions and 4788 deletions

View file

@ -36,6 +36,7 @@ jobs:
GOOPTS: "-p 2" GOOPTS: "-p 2"
GOMAXPROCS: "2" GOMAXPROCS: "2"
GO111MODULE: "on" GO111MODULE: "on"
- run: go test ./tsdb/ -test.tsdb-isolation=false
- prometheus/check_proto: - prometheus/check_proto:
version: "3.15.8" version: "3.15.8"
- prometheus/store_artifact: - prometheus/store_artifact:
@ -93,6 +94,7 @@ jobs:
steps: steps:
- checkout - checkout
- run: go test ./tsdb/... - run: go test ./tsdb/...
- run: go test ./tsdb/ -test.tsdb-isolation=false
test_mixins: test_mixins:
executor: golang executor: golang

View file

@ -1,7 +1,7 @@
Julien Pivotto (<roidelapluie@prometheus.io> / @roidelapluie) and Levi Harrison (<levi@leviharrison.dev> / @LeviHarrison) are the main/default maintainers, some parts of the codebase have other maintainers: Julien Pivotto (<roidelapluie@prometheus.io> / @roidelapluie) and Levi Harrison (<levi@leviharrison.dev> / @LeviHarrison) are the main/default maintainers, some parts of the codebase have other maintainers:
* `cmd` * `cmd`
* `promtool`: David Leadbeater (<dgl@dgl.cx> / @dgl), Jessica Grebenschikov (<jessica.greben1@gmail.com> / @jessicagreben) * `promtool`: David Leadbeater (<dgl@dgl.cx> / @dgl), Jessica Grebenschikov (<jessicagreben@prometheus.io> / @jessicagreben)
* `discovery` * `discovery`
* `k8s`: Frederic Branczyk (<fbranczyk@gmail.com> / @brancz) * `k8s`: Frederic Branczyk (<fbranczyk@gmail.com> / @brancz)
* `documentation` * `documentation`

View file

@ -288,12 +288,7 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
} }
func TestTimeMetrics(t *testing.T) { func TestTimeMetrics(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "time_metrics_e2e") tmpDir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tmpDir))
}()
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil, nil) db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil, nil)

View file

@ -248,11 +248,7 @@ func (p *queryLogTest) run(t *testing.T) {
p.setQueryLog(t, "") p.setQueryLog(t, "")
} }
dir, err := ioutil.TempDir("", "query_log_test") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
params := append([]string{ params := append([]string{
"-test.main", "-test.main",

View file

@ -15,9 +15,7 @@ package main
import ( import (
"context" "context"
"io/ioutil"
"math" "math"
"os"
"sort" "sort"
"testing" "testing"
"time" "time"
@ -688,13 +686,9 @@ after_eof 1 2
t.Run(test.Description, func(t *testing.T) { t.Run(test.Description, func(t *testing.T) {
t.Logf("Test:%s", test.Description) t.Logf("Test:%s", test.Description)
outputDir, err := ioutil.TempDir("", "myDir") outputDir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(outputDir))
}()
err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration) err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration)
if !test.IsOk { if !test.IsOk {
require.Error(t, err, test.Description) require.Error(t, err, test.Description)

View file

@ -17,7 +17,6 @@ import (
"context" "context"
"io/ioutil" "io/ioutil"
"math" "math"
"os"
"path/filepath" "path/filepath"
"testing" "testing"
"time" "time"
@ -73,11 +72,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
} }
for _, tt := range testCases { for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "backfilldata") tmpDir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tmpDir))
}()
ctx := context.Background() ctx := context.Background()
// Execute the test more than once to simulate running the rule importer twice with the same data. // Execute the test more than once to simulate running the rule importer twice with the same data.
@ -220,11 +215,7 @@ func createMultiRuleTestFiles(path string) error {
// TestBackfillLabels confirms that the labels in the rule file override the labels from the metrics // TestBackfillLabels confirms that the labels in the rule file override the labels from the metrics
// received from Prometheus Query API, including the __name__ label. // received from Prometheus Query API, including the __name__ label.
func TestBackfillLabels(t *testing.T) { func TestBackfillLabels(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "backfilldata") tmpDir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tmpDir))
}()
ctx := context.Background() ctx := context.Background()
start := time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC) start := time.Date(2009, time.November, 10, 6, 34, 0, 0, time.UTC)

View file

@ -1023,8 +1023,8 @@ address defaults to the `host_ip` attribute of the hypervisor.
The following meta labels are available on targets during [relabeling](#relabel_config): The following meta labels are available on targets during [relabeling](#relabel_config):
* `__meta_openstack_hypervisor_host_ip`: the hypervisor node's IP address. * `__meta_openstack_hypervisor_host_ip`: the hypervisor node's IP address.
* `__meta_openstack_hypervisor_hostname`: the hypervisor node's name.
* `__meta_openstack_hypervisor_id`: the hypervisor node's ID. * `__meta_openstack_hypervisor_id`: the hypervisor node's ID.
* `__meta_openstack_hypervisor_name`: the hypervisor node's name.
* `__meta_openstack_hypervisor_state`: the hypervisor node's state. * `__meta_openstack_hypervisor_state`: the hypervisor node's state.
* `__meta_openstack_hypervisor_status`: the hypervisor node's status. * `__meta_openstack_hypervisor_status`: the hypervisor node's status.
* `__meta_openstack_hypervisor_type`: the hypervisor node's type. * `__meta_openstack_hypervisor_type`: the hypervisor node's type.

View file

@ -185,10 +185,10 @@ scrape_configs:
regex: __meta_kubernetes_service_label_(.+) regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace] - source_labels: [__meta_kubernetes_namespace]
action: replace action: replace
target_label: kubernetes_namespace target_label: namespace
- source_labels: [__meta_kubernetes_service_name] - source_labels: [__meta_kubernetes_service_name]
action: replace action: replace
target_label: kubernetes_name target_label: service
# Example scrape config for probing services via the Blackbox Exporter. # Example scrape config for probing services via the Blackbox Exporter.
# #
@ -217,9 +217,9 @@ scrape_configs:
- action: labelmap - action: labelmap
regex: __meta_kubernetes_service_label_(.+) regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace] - source_labels: [__meta_kubernetes_namespace]
target_label: kubernetes_namespace target_label: namespace
- source_labels: [__meta_kubernetes_service_name] - source_labels: [__meta_kubernetes_service_name]
target_label: kubernetes_name target_label: service
# Example scrape config for probing ingresses via the Blackbox Exporter. # Example scrape config for probing ingresses via the Blackbox Exporter.
# #
@ -255,9 +255,9 @@ scrape_configs:
- action: labelmap - action: labelmap
regex: __meta_kubernetes_ingress_label_(.+) regex: __meta_kubernetes_ingress_label_(.+)
- source_labels: [__meta_kubernetes_namespace] - source_labels: [__meta_kubernetes_namespace]
target_label: kubernetes_namespace target_label: namespace
- source_labels: [__meta_kubernetes_ingress_name] - source_labels: [__meta_kubernetes_ingress_name]
target_label: kubernetes_name target_label: ingress
# Example scrape config for pods # Example scrape config for pods
# #
@ -294,7 +294,7 @@ scrape_configs:
regex: __meta_kubernetes_pod_label_(.+) regex: __meta_kubernetes_pod_label_(.+)
- source_labels: [__meta_kubernetes_namespace] - source_labels: [__meta_kubernetes_namespace]
action: replace action: replace
target_label: kubernetes_namespace target_label: namespace
- source_labels: [__meta_kubernetes_pod_name] - source_labels: [__meta_kubernetes_pod_name]
action: replace action: replace
target_label: kubernetes_pod_name target_label: pod

View file

@ -391,7 +391,7 @@
and and
( (
count by (%(prometheusHAGroupLabels)s) ( count by (%(prometheusHAGroupLabels)s) (
changes(process_start_time_seconds{%(prometheusSelector)s}[30m]) > 1 changes(process_start_time_seconds{%(prometheusSelector)s}[1h]) > 1
) )
/ /
count by (%(prometheusHAGroupLabels)s) ( count by (%(prometheusHAGroupLabels)s) (
@ -418,7 +418,7 @@
}, },
annotations: { annotations: {
summary: 'More than half of the Prometheus instances within the same HA group are crashlooping.', summary: 'More than half of the Prometheus instances within the same HA group are crashlooping.',
description: '{{ $value | humanizePercentage }} of Prometheus instances within the %(prometheusHAGroupName)s HA group have had at least 5 total restarts or 2 unclean restarts in the last 30m.' % $._config, description: '{{ $value | humanizePercentage }} of Prometheus instances within the %(prometheusHAGroupName)s HA group have had at least 5 total restarts in the last 30m or 2 unclean restarts in the last 1h.' % $._config,
}, },
}, },
], ],

View file

@ -312,9 +312,9 @@ local template = grafana.template;
) )
.addTemplate( .addTemplate(
template.new( template.new(
'instance', 'cluster',
'$datasource', '$datasource',
'label_values(prometheus_build_info, instance)' % $._config, 'label_values(kube_pod_container_info{image=~".*prometheus.*"}, cluster)' % $._config,
refresh='time', refresh='time',
current={ current={
selected: true, selected: true,
@ -326,9 +326,9 @@ local template = grafana.template;
) )
.addTemplate( .addTemplate(
template.new( template.new(
'cluster', 'instance',
'$datasource', '$datasource',
'label_values(kube_pod_container_info{image=~".*prometheus.*"}, cluster)' % $._config, 'label_values(prometheus_build_info{cluster=~"$cluster"}, instance)' % $._config,
refresh='time', refresh='time',
current={ current={
selected: true, selected: true,

10
go.mod
View file

@ -9,13 +9,13 @@ require (
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/aws/aws-sdk-go v1.42.6 github.com/aws/aws-sdk-go v1.42.10
github.com/cespare/xxhash/v2 v2.1.2 github.com/cespare/xxhash/v2 v2.1.2
github.com/containerd/containerd v1.5.7 // indirect github.com/containerd/containerd v1.5.7 // indirect
github.com/dennwc/varint v1.0.0 github.com/dennwc/varint v1.0.0
github.com/dgryski/go-sip13 v0.0.0-20200911182023-62edffca9245 github.com/dgryski/go-sip13 v0.0.0-20200911182023-62edffca9245
github.com/digitalocean/godo v1.71.0 github.com/digitalocean/godo v1.71.0
github.com/docker/docker v20.10.10+incompatible github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-connections v0.4.0 // indirect
github.com/edsrzf/mmap-go v1.0.0 github.com/edsrzf/mmap-go v1.0.0
github.com/envoyproxy/go-control-plane v0.10.1 github.com/envoyproxy/go-control-plane v0.10.1
@ -71,9 +71,9 @@ require (
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.22.3 k8s.io/api v0.22.4
k8s.io/apimachinery v0.22.3 k8s.io/apimachinery v0.22.4
k8s.io/client-go v0.22.3 k8s.io/client-go v0.22.4
k8s.io/klog v1.0.0 k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.20.0 k8s.io/klog/v2 v2.20.0
) )

24
go.sum
View file

@ -187,8 +187,8 @@ github.com/aws/aws-sdk-go v1.30.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.42.6 h1:CiJmv8Fdc7wLZhfWy1ZA9TNoOQrFtUC0mhpgyJTaKOs= github.com/aws/aws-sdk-go v1.42.10 h1:PW9G/hnsuKttbFtOcgNKD0vQrp4yfNrtACA+X0p9mjM=
github.com/aws/aws-sdk-go v1.42.6/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go v1.42.10/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/immutable v0.2.1/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps= github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps=
@ -381,8 +381,8 @@ github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TT
github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v20.10.10+incompatible h1:GKkP0T7U4ks6X3lmmHKC2QDprnpRJor2Z5a8m62R9ZM= github.com/docker/docker v20.10.11+incompatible h1:OqzI/g/W54LczvhnccGqniFoQghHx3pklbLuhfXpqGo=
github.com/docker/docker v20.10.10+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v20.10.11+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-events v0.0.0-20170721190031-9461782956ad/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA= github.com/docker/go-events v0.0.0-20170721190031-9461782956ad/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA=
@ -2006,14 +2006,14 @@ k8s.io/api v0.17.5/go.mod h1:0zV5/ungglgy2Rlm3QK8fbxkXVs+BSJWpJP/+8gUVLY=
k8s.io/api v0.20.1/go.mod h1:KqwcCVogGxQY3nBlRpwt+wpAMF/KjaCc7RpywacvqUo= k8s.io/api v0.20.1/go.mod h1:KqwcCVogGxQY3nBlRpwt+wpAMF/KjaCc7RpywacvqUo=
k8s.io/api v0.20.4/go.mod h1:++lNL1AJMkDymriNniQsWRkMDzRaX2Y/POTUi8yvqYQ= k8s.io/api v0.20.4/go.mod h1:++lNL1AJMkDymriNniQsWRkMDzRaX2Y/POTUi8yvqYQ=
k8s.io/api v0.20.6/go.mod h1:X9e8Qag6JV/bL5G6bU8sdVRltWKmdHsFUGS3eVndqE8= k8s.io/api v0.20.6/go.mod h1:X9e8Qag6JV/bL5G6bU8sdVRltWKmdHsFUGS3eVndqE8=
k8s.io/api v0.22.3 h1:wOoES2GoSkUsdped2RB4zYypPqWtvprGoKCENTOOjP4= k8s.io/api v0.22.4 h1:UvyHW0ezB2oIgHAxlYoo6UJQObYXU7awuNarwoHEOjw=
k8s.io/api v0.22.3/go.mod h1:azgiXFiXqiWyLCfI62/eYBOu19rj2LKmIhFPP4+33fs= k8s.io/api v0.22.4/go.mod h1:Rgs+9gIGYC5laXQSZZ9JqT5NevNgoGiOdVWi1BAB3qk=
k8s.io/apimachinery v0.17.5/go.mod h1:ioIo1G/a+uONV7Tv+ZmCbMG1/a3kVw5YcDdncd8ugQ0= k8s.io/apimachinery v0.17.5/go.mod h1:ioIo1G/a+uONV7Tv+ZmCbMG1/a3kVw5YcDdncd8ugQ0=
k8s.io/apimachinery v0.20.1/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/apimachinery v0.20.1/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
k8s.io/apimachinery v0.20.4/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/apimachinery v0.20.4/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU=
k8s.io/apimachinery v0.20.6/go.mod h1:ejZXtW1Ra6V1O5H8xPBGz+T3+4gfkTCeExAHKU57MAc= k8s.io/apimachinery v0.20.6/go.mod h1:ejZXtW1Ra6V1O5H8xPBGz+T3+4gfkTCeExAHKU57MAc=
k8s.io/apimachinery v0.22.3 h1:mrvBG5CZnEfwgpVqWcrRKvdsYECTrhAR6cApAgdsflk= k8s.io/apimachinery v0.22.4 h1:9uwcvPpukBw/Ri0EUmWz+49cnFtaoiyEhQTK+xOe7Ck=
k8s.io/apimachinery v0.22.3/go.mod h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0= k8s.io/apimachinery v0.22.4/go.mod h1:yU6oA6Gnax9RrxGzVvPFFJ+mpnW6PBSqp0sx0I0HHW0=
k8s.io/apiserver v0.20.1/go.mod h1:ro5QHeQkgMS7ZGpvf4tSMx6bBOgPfE+f52KwvXfScaU= k8s.io/apiserver v0.20.1/go.mod h1:ro5QHeQkgMS7ZGpvf4tSMx6bBOgPfE+f52KwvXfScaU=
k8s.io/apiserver v0.20.4/go.mod h1:Mc80thBKOyy7tbvFtB4kJv1kbdD0eIH8k8vianJcbFM= k8s.io/apiserver v0.20.4/go.mod h1:Mc80thBKOyy7tbvFtB4kJv1kbdD0eIH8k8vianJcbFM=
k8s.io/apiserver v0.20.6/go.mod h1:QIJXNt6i6JB+0YQRNcS0hdRHJlMhflFmsBDeSgT1r8Q= k8s.io/apiserver v0.20.6/go.mod h1:QIJXNt6i6JB+0YQRNcS0hdRHJlMhflFmsBDeSgT1r8Q=
@ -2021,8 +2021,8 @@ k8s.io/client-go v0.17.5/go.mod h1:S8uZpBpjJJdEH/fEyxcqg7Rn0P5jH+ilkgBHjriSmNo=
k8s.io/client-go v0.20.1/go.mod h1:/zcHdt1TeWSd5HoUe6elJmHSQ6uLLgp4bIJHVEuy+/Y= k8s.io/client-go v0.20.1/go.mod h1:/zcHdt1TeWSd5HoUe6elJmHSQ6uLLgp4bIJHVEuy+/Y=
k8s.io/client-go v0.20.4/go.mod h1:LiMv25ND1gLUdBeYxBIwKpkSC5IsozMMmOOeSJboP+k= k8s.io/client-go v0.20.4/go.mod h1:LiMv25ND1gLUdBeYxBIwKpkSC5IsozMMmOOeSJboP+k=
k8s.io/client-go v0.20.6/go.mod h1:nNQMnOvEUEsOzRRFIIkdmYOjAZrC8bgq0ExboWSU1I0= k8s.io/client-go v0.20.6/go.mod h1:nNQMnOvEUEsOzRRFIIkdmYOjAZrC8bgq0ExboWSU1I0=
k8s.io/client-go v0.22.3 h1:6onkOSc+YNdwq5zXE0wFXicq64rrym+mXwHu/CPVGO4= k8s.io/client-go v0.22.4 h1:aAQ1Wk+I3bjCNk35YWUqbaueqrIonkfDPJSPDDe8Kfg=
k8s.io/client-go v0.22.3/go.mod h1:ElDjYf8gvZsKDYexmsmnMQ0DYO8W9RwBjfQ1PI53yow= k8s.io/client-go v0.22.4/go.mod h1:Yzw4e5e7h1LNHA4uqnMVrpEpUs1hJOiuBsJKIlRCHDA=
k8s.io/component-base v0.20.1/go.mod h1:guxkoJnNoh8LNrbtiQOlyp2Y2XFCZQmrcg2n/DeYNLk= k8s.io/component-base v0.20.1/go.mod h1:guxkoJnNoh8LNrbtiQOlyp2Y2XFCZQmrcg2n/DeYNLk=
k8s.io/component-base v0.20.4/go.mod h1:t4p9EdiagbVCJKrQ1RsA5/V4rFQNDfRlevJajlGwgjI= k8s.io/component-base v0.20.4/go.mod h1:t4p9EdiagbVCJKrQ1RsA5/V4rFQNDfRlevJajlGwgjI=
k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMYnrM= k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMYnrM=
@ -2034,8 +2034,8 @@ k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU= k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e h1:KLHHjkdQFomZy8+06csTWZ0m1343QqxZhR2LJ1OxCYM= k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c h1:jvamsI1tn9V0S8jicyX82qaFC0H/NKxv2e5mbqsgR80=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw= k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20200414100711-2df71ebbae66/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20200414100711-2df71ebbae66/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=

View file

@ -60,10 +60,19 @@ func TestQueryConcurrency(t *testing.T) {
block := make(chan struct{}) block := make(chan struct{})
processing := make(chan struct{}) processing := make(chan struct{})
done := make(chan int)
defer close(done)
f := func(context.Context) error { f := func(context.Context) error {
processing <- struct{}{} select {
<-block case processing <- struct{}{}:
case <-done:
}
select {
case <-block:
case <-done:
}
return nil return nil
} }
@ -372,7 +381,6 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
{Start: -45000, End: 80000, Func: "count_over_time"}, {Start: -45000, End: 80000, Func: "count_over_time"},
}, },
}, { }, {
query: "foo", start: 10000, end: 20000, query: "foo", start: 10000, end: 20000,
expected: []*storage.SelectHints{ expected: []*storage.SelectHints{
{Start: 5000, End: 20000, Step: 1000}, {Start: 5000, End: 20000, Step: 1000},

View file

@ -97,6 +97,14 @@ func (p Point) String() string {
} }
// MarshalJSON implements json.Marshaler. // MarshalJSON implements json.Marshaler.
//
// JSON marshaling is only needed for the HTTP API. Since Point is such a
// frequently marshaled type, it gets an optimized treatment directly in
// web/api/v1/api.go. Therefore, this method is unused within Prometheus. It is
// still provided here as convenience for debugging and for other users of this
// code. Also note that the different marshaling implementations might lead to
// slightly different results in terms of formatting and rounding of the
// timestamp.
func (p Point) MarshalJSON() ([]byte, error) { func (p Point) MarshalJSON() ([]byte, error) {
// TODO(beorn7): Support histogram. // TODO(beorn7): Support histogram.
v := strconv.FormatFloat(p.V, 'f', -1, 64) v := strconv.FormatFloat(p.V, 'f', -1, 64)

View file

@ -133,17 +133,8 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType {
} }
b.valueType = b.it.Next() b.valueType = b.it.Next()
switch b.valueType { if b.valueType != chunkenc.ValNone {
case chunkenc.ValNone: b.lastTime = b.AtT()
// Do nothing.
case chunkenc.ValFloat:
b.lastTime, _ = b.At()
case chunkenc.ValHistogram:
b.lastTime, _ = b.AtHistogram()
case chunkenc.ValFloatHistogram:
b.lastTime, _ = b.AtFloatHistogram()
default:
panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType))
} }
return b.valueType return b.valueType
} }
@ -163,6 +154,11 @@ func (b *BufferedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHist
return b.it.AtFloatHistogram() return b.it.AtFloatHistogram()
} }
// AtT returns the current timestamp of the iterator.
func (b *BufferedSeriesIterator) AtT() int64 {
return b.it.AtT()
}
// Err returns the last encountered error. // Err returns the last encountered error.
func (b *BufferedSeriesIterator) Err() error { func (b *BufferedSeriesIterator) Err() error {
return b.it.Err() return b.it.Err()

View file

@ -142,6 +142,11 @@ func (b *MemoizedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHist
return b.it.AtFloatHistogram() return b.it.AtFloatHistogram()
} }
// AtT returns the current timestamp of the iterator.
func (b *MemoizedSeriesIterator) AtT() int64 {
return b.it.AtT()
}
// Err returns the last encountered error. // Err returns the last encountered error.
func (b *MemoizedSeriesIterator) Err() error { func (b *MemoizedSeriesIterator) Err() error {
return b.it.Err() return b.it.Err()

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
) )
var writeRequestFixture = &prompb.WriteRequest{ var writeRequestFixture = &prompb.WriteRequest{
@ -191,6 +192,50 @@ func TestConcreteSeriesClonesLabels(t *testing.T) {
require.Equal(t, lbls, gotLabels) require.Equal(t, lbls, gotLabels)
} }
func TestConcreteSeriesIterator(t *testing.T) {
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
samples: []prompb.Sample{
{Value: 1, Timestamp: 1},
{Value: 1.5, Timestamp: 1},
{Value: 2, Timestamp: 2},
{Value: 3, Timestamp: 3},
{Value: 4, Timestamp: 4},
},
}
it := series.Iterator()
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
ts, v := it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1., v)
// Seek one further, next sample still has ts=1.
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v = it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1.5, v)
// Seek again to 1 and make sure we stay where we are.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
ts, v = it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1.5, v)
// Another seek.
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
ts, v = it.At()
require.Equal(t, int64(3), ts)
require.Equal(t, 3., v)
// And we don't go back.
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
ts, v = it.At()
require.Equal(t, int64(3), ts)
require.Equal(t, 3., v)
}
func TestFromQueryResultWithDuplicates(t *testing.T) { func TestFromQueryResultWithDuplicates(t *testing.T) {
ts1 := prompb.TimeSeries{ ts1 := prompb.TimeSeries{
Labels: []prompb.Label{ Labels: []prompb.Label{

View file

@ -670,7 +670,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
} }
} }
// Update the segment number held against the series, so we can trim older ones in SeriesReset. // UpdateSeriesSegment updates the segment number held against the series,
// so we can trim older ones in SeriesReset.
func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) { func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) {
t.seriesSegmentMtx.Lock() t.seriesSegmentMtx.Lock()
defer t.seriesSegmentMtx.Unlock() defer t.seriesSegmentMtx.Unlock()

View file

@ -75,11 +75,7 @@ func TestSampleDelivery(t *testing.T) {
// batch timeout case. // batch timeout case.
n := 3 n := 3
dir, err := ioutil.TempDir("", "TestSampleDelivery") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close() defer s.Close()
@ -154,9 +150,7 @@ func TestSampleDelivery(t *testing.T) {
func TestMetadataDelivery(t *testing.T) { func TestMetadataDelivery(t *testing.T) {
c := NewTestWriteClient() c := NewTestWriteClient()
dir, err := ioutil.TempDir("", "TestMetadataDelivery") dir := t.TempDir()
require.NoError(t, err)
defer os.RemoveAll(dir)
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -198,11 +192,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg.MaxShards = 1 cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
@ -241,11 +231,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestWriteClient() c := NewTestWriteClient()
c.expectSamples(samples, series) c.expectSamples(samples, series)
dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -265,11 +251,7 @@ func TestShutdown(t *testing.T) {
deadline := 1 * time.Second deadline := 1 * time.Second
c := NewTestBlockedWriteClient() c := NewTestBlockedWriteClient()
dir, err := ioutil.TempDir("", "TestShutdown") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -308,11 +290,7 @@ func TestSeriesReset(t *testing.T) {
numSegments := 4 numSegments := 4
numSeries := 25 numSeries := 25
dir, err := ioutil.TempDir("", "TestSeriesReset") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
@ -343,11 +321,7 @@ func TestReshard(t *testing.T) {
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1 cfg.MaxShards = 1
dir, err := ioutil.TempDir("", "TestReshard") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
@ -740,9 +714,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1 cfg.MaxShards = 1
dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery") dir := b.TempDir()
require.NoError(b, err)
defer os.RemoveAll(dir)
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false) m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
@ -865,11 +837,7 @@ func TestCalculateDesiredShards(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig
dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)

View file

@ -15,9 +15,7 @@ package remote
import ( import (
"context" "context"
"io/ioutil"
"net/url" "net/url"
"os"
"sort" "sort"
"testing" "testing"
@ -33,9 +31,7 @@ import (
) )
func TestNoDuplicateReadConfigs(t *testing.T) { func TestNoDuplicateReadConfigs(t *testing.T) {
dir, err := ioutil.TempDir("", "TestNoDuplicateReadConfigs") dir := t.TempDir()
require.NoError(t, err)
defer os.RemoveAll(dir)
cfg1 := config.RemoteReadConfig{ cfg1 := config.RemoteReadConfig{
Name: "write-1", Name: "write-1",

View file

@ -14,9 +14,7 @@
package remote package remote
import ( import (
"io/ioutil"
"net/url" "net/url"
"os"
"testing" "testing"
common_config "github.com/prometheus/common/config" common_config "github.com/prometheus/common/config"
@ -26,9 +24,7 @@ import (
) )
func TestStorageLifecycle(t *testing.T) { func TestStorageLifecycle(t *testing.T) {
dir, err := ioutil.TempDir("", "TestStorageLifecycle") dir := t.TempDir()
require.NoError(t, err)
defer os.RemoveAll(dir)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{ conf := &config.Config{
@ -60,14 +56,12 @@ func TestStorageLifecycle(t *testing.T) {
// make sure remote write has a queue. // make sure remote write has a queue.
require.Equal(t, 1, len(s.queryables)) require.Equal(t, 1, len(s.queryables))
err = s.Close() err := s.Close()
require.NoError(t, err) require.NoError(t, err)
} }
func TestUpdateRemoteReadConfigs(t *testing.T) { func TestUpdateRemoteReadConfigs(t *testing.T) {
dir, err := ioutil.TempDir("", "TestUpdateRemoteReadConfigs") dir := t.TempDir()
require.NoError(t, err)
defer os.RemoveAll(dir)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
@ -83,6 +77,6 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
require.NoError(t, s.ApplyConfig(conf)) require.NoError(t, s.ApplyConfig(conf))
require.Equal(t, 1, len(s.queryables)) require.Equal(t, 1, len(s.queryables))
err = s.Close() err := s.Close()
require.NoError(t, err) require.NoError(t, err)
} }

View file

@ -14,9 +14,7 @@
package remote package remote
import ( import (
"io/ioutil"
"net/url" "net/url"
"os"
"testing" "testing"
"time" "time"
@ -44,11 +42,7 @@ func testRemoteWriteConfig() *config.RemoteWriteConfig {
} }
func TestNoDuplicateWriteConfigs(t *testing.T) { func TestNoDuplicateWriteConfigs(t *testing.T) {
dir, err := ioutil.TempDir("", "TestNoDuplicateWriteConfigs") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg1 := config.RemoteWriteConfig{ cfg1 := config.RemoteWriteConfig{
Name: "write-1", Name: "write-1",
@ -132,11 +126,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
} }
func TestRestartOnNameChange(t *testing.T) { func TestRestartOnNameChange(t *testing.T) {
dir, err := ioutil.TempDir("", "TestRestartOnNameChange") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := testRemoteWriteConfig() cfg := testRemoteWriteConfig()
@ -166,11 +156,7 @@ func TestRestartOnNameChange(t *testing.T) {
} }
func TestUpdateWithRegisterer(t *testing.T) { func TestUpdateWithRegisterer(t *testing.T) {
dir, err := ioutil.TempDir("", "TestRestartWithRegisterer") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
c1 := &config.RemoteWriteConfig{ c1 := &config.RemoteWriteConfig{
@ -205,16 +191,12 @@ func TestUpdateWithRegisterer(t *testing.T) {
require.Equal(t, 10, queue.cfg.MaxShards) require.Equal(t, 10, queue.cfg.MaxShards)
} }
err = s.Close() err := s.Close()
require.NoError(t, err) require.NoError(t, err)
} }
func TestWriteStorageLifecycle(t *testing.T) { func TestWriteStorageLifecycle(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{ conf := &config.Config{
@ -226,16 +208,12 @@ func TestWriteStorageLifecycle(t *testing.T) {
require.NoError(t, s.ApplyConfig(conf)) require.NoError(t, s.ApplyConfig(conf))
require.Equal(t, 1, len(s.queues)) require.Equal(t, 1, len(s.queues))
err = s.Close() err := s.Close()
require.NoError(t, err) require.NoError(t, err)
} }
func TestUpdateExternalLabels(t *testing.T) { func TestUpdateExternalLabels(t *testing.T) {
dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil) s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
@ -264,11 +242,7 @@ func TestUpdateExternalLabels(t *testing.T) {
} }
func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsIdempotent") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
@ -305,11 +279,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
} }
func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsPartialUpdate") dir := t.TempDir()
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
@ -383,7 +353,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
secondClient := s.queues[hashes[1]].client() secondClient := s.queues[hashes[1]].client()
// Update c1. // Update c1.
c1.HTTPClientConfig.BearerToken = "bar" c1.HTTPClientConfig.BearerToken = "bar"
err = s.ApplyConfig(conf) err := s.ApplyConfig(conf)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, len(s.queues)) require.Equal(t, 3, len(s.queues))

62
storage/series_test.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
func TestListSeriesIterator(t *testing.T) {
it := NewListSeriesIterator(samples{
sample{0, 0, nil, nil},
sample{1, 1, nil, nil},
sample{1, 1.5, nil, nil},
sample{2, 2, nil, nil},
sample{3, 3, nil, nil},
})
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
ts, v := it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1., v)
// Seek one further, next sample still has ts=1.
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v = it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1.5, v)
// Seek again to 1 and make sure we stay where we are.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
ts, v = it.At()
require.Equal(t, int64(1), ts)
require.Equal(t, 1.5, v)
// Another seek.
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
ts, v = it.At()
require.Equal(t, int64(3), ts)
require.Equal(t, 3., v)
// And we don't go back.
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
ts, v = it.At()
require.Equal(t, int64(3), ts)
require.Equal(t, 3., v)
}

View file

@ -79,6 +79,7 @@ func DefaultOptions() *Options {
WALCompression: false, WALCompression: false,
StripeSize: DefaultStripeSize, StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
} }
} }
@ -143,7 +144,7 @@ type Options struct {
// mainly meant for external users who import TSDB. // mainly meant for external users who import TSDB.
BlocksToDelete BlocksToDeleteFunc BlocksToDelete BlocksToDeleteFunc
// Enables the in memory exemplar storage,. // Enables the in memory exemplar storage.
EnableExemplarStorage bool EnableExemplarStorage bool
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster. // Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
@ -152,6 +153,9 @@ type Options struct {
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
MaxExemplars int64 MaxExemplars int64
// Disables isolation between reads and in-flight appends.
IsolationDisabled bool
} }
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@ -705,6 +709,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
if opts.IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled
}
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head) db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -17,6 +17,7 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/binary" "encoding/binary"
"flag"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"io/ioutil" "io/ioutil"
@ -54,6 +55,11 @@ import (
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
var isolationEnabled bool
flag.BoolVar(&isolationEnabled, "test.tsdb-isolation", true, "enable isolation")
flag.Parse()
defaultIsolationDisabled = !isolationEnabled
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2")) goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2"))
} }
@ -1008,6 +1014,92 @@ func TestWALSegmentSizeOptions(t *testing.T) {
} }
} }
// https://github.com/prometheus/prometheus/issues/9846
// https://github.com/prometheus/prometheus/issues/9859
func TestWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T) {
const (
numRuns = 1
numSamplesBeforeSeriesCreation = 1000
)
// We test both with few and many samples appended after series creation. If samples are < 120 then there's no
// mmap-ed chunk, otherwise there's at least 1 mmap-ed chunk when replaying the WAL.
for _, numSamplesAfterSeriesCreation := range []int{1, 1000} {
for run := 1; run <= numRuns; run++ {
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d", numSamplesAfterSeriesCreation, run), func(t *testing.T) {
testWALReplayRaceOnSamplesLoggedBeforeSeries(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation)
})
}
}
}
func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int) {
const numSeries = 1000
db := openTestDB(t, nil, nil)
db.DisableCompactions()
for seriesRef := 1; seriesRef <= numSeries; seriesRef++ {
// Log samples before the series is logged to the WAL.
var enc record.Encoder
var samples []record.RefSample
for ts := 0; ts < numSamplesBeforeSeriesCreation; ts++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(uint64(seriesRef)),
T: int64(ts),
V: float64(ts),
})
}
err := db.Head().wal.Log(enc.Samples(samples, nil))
require.NoError(t, err)
// Add samples via appender so that they're logged after the series in the WAL.
app := db.Appender(context.Background())
lbls := labels.FromStrings("series_id", strconv.Itoa(seriesRef))
for ts := numSamplesBeforeSeriesCreation; ts < numSamplesBeforeSeriesCreation+numSamplesAfterSeriesCreation; ts++ {
_, err := app.Append(0, lbls, int64(ts), float64(ts))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
}
require.NoError(t, db.Close())
// Reopen the DB, replaying the WAL.
reopenDB, err := Open(db.Dir(), log.NewLogfmtLogger(os.Stderr), nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, reopenDB.Close())
})
// Query back chunks for all series.
q, err := reopenDB.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
actualSeries := 0
for set.Next() {
actualSeries++
actualChunks := 0
chunksIt := set.At().Iterator()
for chunksIt.Next() {
actualChunks++
}
require.NoError(t, chunksIt.Err())
// We expect 1 chunk every 120 samples after series creation.
require.Equalf(t, (numSamplesAfterSeriesCreation/120)+1, actualChunks, "series: %s", set.At().Labels().String())
}
require.NoError(t, set.Err())
require.Equal(t, numSeries, actualSeries)
}
func TestTombstoneClean(t *testing.T) { func TestTombstoneClean(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
@ -2408,6 +2500,10 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
} }
func TestDBCannotSeePartialCommits(t *testing.T) { func TestDBCannotSeePartialCommits(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer func() { defer func() {
require.NoError(t, os.RemoveAll(tmpdir)) require.NoError(t, os.RemoveAll(tmpdir))
@ -2478,6 +2574,10 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
} }
func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer func() { defer func() {
require.NoError(t, os.RemoveAll(tmpdir)) require.NoError(t, os.RemoveAll(tmpdir))

View file

@ -53,6 +53,9 @@ var (
// ErrAppenderClosed is returned if an appender has already be successfully // ErrAppenderClosed is returned if an appender has already be successfully
// rolled back or committed. // rolled back or committed.
ErrAppenderClosed = errors.New("appender closed") ErrAppenderClosed = errors.New("appender closed")
// defaultIsolationDisabled is true if isolation is disabled by default.
defaultIsolationDisabled = false
) )
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
@ -132,6 +135,8 @@ type HeadOptions struct {
SeriesCallback SeriesLifecycleCallback SeriesCallback SeriesLifecycleCallback
EnableExemplarStorage bool EnableExemplarStorage bool
EnableMemorySnapshotOnShutdown bool EnableMemorySnapshotOnShutdown bool
IsolationDisabled bool
} }
func DefaultHeadOptions() *HeadOptions { func DefaultHeadOptions() *HeadOptions {
@ -142,6 +147,7 @@ func DefaultHeadOptions() *HeadOptions {
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
StripeSize: DefaultStripeSize, StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{}, SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
} }
} }
@ -231,12 +237,13 @@ func (h *Head) resetInMemoryState() error {
return err return err
} }
h.iso = newIsolation(h.opts.IsolationDisabled)
h.exemplarMetrics = em h.exemplarMetrics = em
h.exemplars = es h.exemplars = es
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback) h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.postings = index.NewUnorderedMemPostings() h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones() h.tombstones = tombstones.NewMemTombstones()
h.iso = newIsolation()
h.deleted = map[chunks.HeadSeriesRef]int{} h.deleted = map[chunks.HeadSeriesRef]int{}
h.chunkRange.Store(h.opts.ChunkRange) h.chunkRange.Store(h.opts.ChunkRange)
h.minTime.Store(math.MaxInt64) h.minTime.Store(math.MaxInt64)
@ -1240,7 +1247,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool) return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool, h.opts.IsolationDisabled)
}) })
if err != nil { if err != nil {
return nil, false, err return nil, false, err
@ -1535,6 +1542,7 @@ type memSeries struct {
memChunkPool *sync.Pool memChunkPool *sync.Pool
// txs is nil if isolation is disabled.
txs *txRing txs *txRing
// TODO(beorn7): The only reason we track this is to create a staleness // TODO(beorn7): The only reason we track this is to create a staleness
@ -1542,15 +1550,17 @@ type memSeries struct {
isHistogramSeries bool isHistogramSeries bool
} }
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool) *memSeries { func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange int64, memChunkPool *sync.Pool, isolationDisabled bool) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
ref: id, ref: id,
chunkRange: chunkRange, chunkRange: chunkRange,
nextAt: math.MinInt64, nextAt: math.MinInt64,
txs: newTxRing(4),
memChunkPool: memChunkPool, memChunkPool: memChunkPool,
} }
if !isolationDisabled {
s.txs = newTxRing(4)
}
return s return s
} }
@ -1603,7 +1613,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring lock. // acquiring lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
s.txs.cleanupAppendIDsBelow(bound) if s.txs != nil {
s.txs.cleanupAppendIDsBelow(bound)
}
} }
func (s *memSeries) head() *memChunk { func (s *memSeries) head() *memChunk {

View file

@ -362,7 +362,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
numSamples := c.chunk.NumSamples() numSamples := c.chunk.NumSamples()
stopAfter := numSamples stopAfter := numSamples
if isoState != nil { if isoState != nil && !isoState.IsolationDisabled() {
totalSamples := 0 // Total samples in this series. totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk. previousSamples := 0 // Samples before this chunk.

View file

@ -62,6 +62,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
h, err := NewHead(nil, nil, wlog, opts, nil) h, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
@ -229,7 +230,7 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err) require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ { for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time. // Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil) s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, nil, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper) s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s.mmapCurrentHeadChunk(chunkDiskMapper) s.mmapCurrentHeadChunk(chunkDiskMapper)
} }
@ -553,7 +554,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
}, },
} }
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool) s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool, defaultIsolationDisabled)
for i := 0; i < 4000; i += 5 { for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
@ -1091,7 +1092,7 @@ func TestMemSeries_append(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close()) require.NoError(t, chunkDiskMapper.Close())
}() }()
s := newMemSeries(labels.Labels{}, 1, 500, nil) s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
// Add first two samples at the very end of a chunk range and the next two // Add first two samples at the very end of a chunk range and the next two
// on and after it. // on and after it.
@ -1549,6 +1550,10 @@ func TestAddDuplicateLabelName(t *testing.T) {
} }
func TestMemSeriesIsolation(t *testing.T) { func TestMemSeriesIsolation(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
lastValue := func(h *Head, maxAppendID uint64) int { lastValue := func(h *Head, maxAppendID uint64) int {
idx, err := h.Index() idx, err := h.Index()
@ -1720,6 +1725,10 @@ func TestMemSeriesIsolation(t *testing.T) {
} }
func TestIsolationRollback(t *testing.T) { func TestIsolationRollback(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
// Rollback after a failed append and test if the low watermark has progressed anyway. // Rollback after a failed append and test if the low watermark has progressed anyway.
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false)
defer func() { defer func() {
@ -1748,6 +1757,10 @@ func TestIsolationRollback(t *testing.T) {
} }
func TestIsolationLowWatermarkMonotonous(t *testing.T) { func TestIsolationLowWatermarkMonotonous(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
@ -1781,6 +1794,10 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
} }
func TestIsolationAppendIDZeroIsNoop(t *testing.T) { func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
h, _ := newTestHead(t, 1000, false) h, _ := newTestHead(t, 1000, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
@ -1802,6 +1819,10 @@ func TestHeadSeriesChunkRace(t *testing.T) {
} }
func TestIsolationWithoutAdd(t *testing.T) { func TestIsolationWithoutAdd(t *testing.T) {
if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled")
}
hb, _ := newTestHead(t, 1000, false) hb, _ := newTestHead(t, 1000, false)
defer func() { defer func() {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
@ -2257,7 +2278,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
require.NoError(t, chunkDiskMapper.Close()) require.NoError(t, chunkDiskMapper.Close())
}() }()
s := newMemSeries(labels.Labels{}, 1, 500, nil) s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)

View file

@ -52,18 +52,15 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
var mmapOverlappingChunks uint64 var mmapOverlappingChunks uint64
// Start workers that each process samples for a partition of the series ID space. // Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
n = runtime.GOMAXPROCS(0) n = runtime.GOMAXPROCS(0)
inputs = make([]chan []record.RefSample, n) processors = make([]walSubsetProcessor, n)
outputs = make([]chan []record.RefSample, n) exemplarsInput chan record.RefExemplar
exemplarsInput chan record.RefExemplar
histogramsInput chan record.RefHistogram
dec record.Decoder dec record.Decoder
shards = make([][]record.RefSample, n) shards = make([][]record.RefSample, n)
histogramShards = make([][]record.RefHistogram, n)
decoded = make(chan interface{}, 10) decoded = make(chan interface{}, 10)
decodeErr, seriesCreationErr error decodeErr, seriesCreationErr error
@ -99,26 +96,23 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
_, ok := err.(*wal.CorruptionErr) _, ok := err.(*wal.CorruptionErr)
if ok || seriesCreationErr != nil { if ok || seriesCreationErr != nil {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
close(inputs[i]) processors[i].closeAndDrain()
for range outputs[i] {
}
} }
close(exemplarsInput) close(exemplarsInput)
close(histogramsInput)
wg.Wait() wg.Wait()
} }
}() }()
wg.Add(n) wg.Add(n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
outputs[i] = make(chan []record.RefSample, 300) processors[i].setup()
inputs[i] = make(chan []record.RefSample, 300)
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { go func(wp *walSubsetProcessor) {
unknown := h.processWALSamples(h.minValidTime.Load(), input, output) unknown, unknownHistograms := wp.processWALSamples(h)
unknownRefs.Add(unknown) unknownRefs.Add(unknown)
unknownHistogramRefs.Add(unknownHistograms)
wg.Done() wg.Done()
}(inputs[i], outputs[i]) }(&processors[i])
} }
wg.Add(1) wg.Add(1)
@ -145,47 +139,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
} }
}(exemplarsInput) }(exemplarsInput)
wg.Add(1)
histogramsInput = make(chan record.RefHistogram, 300)
go func(input <-chan record.RefHistogram) {
defer wg.Done()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for rh := range input {
ms := h.series.getByID(rh.Ref)
if ms == nil {
unknownHistogramRefs.Inc()
continue
}
if ms.head() == nil {
// First histogram for the series. Count this in metrics.
ms.isHistogramSeries = true
}
if rh.T < h.minValidTime.Load() {
continue
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
_, chunkCreated := ms.appendHistogram(rh.T, rh.H, 0, h.chunkDiskMapper)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if rh.T > maxt {
maxt = rh.T
}
if rh.T < mint {
mint = rh.T
}
}
h.updateMinMaxTime(mint, maxt)
}(histogramsInput)
go func() { go func() {
defer close(decoded) defer close(decoded)
for r.Next() { for r.Next() {
@ -273,11 +226,20 @@ Outer:
h.lastSeriesID.Store(uint64(walSeries.Ref)) h.lastSeriesID.Store(uint64(walSeries.Ref))
} }
idx := uint64(mSeries.ref) % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
processors[idx].waitUntilIdle()
// Lock the subset so we can modify the series object
processors[idx].mx.Lock()
mmc := mmappedChunks[walSeries.Ref] mmc := mmappedChunks[walSeries.Ref]
if created { if created {
// This is the first WAL series record for this series. // This is the first WAL series record for this series.
h.setMMappedChunks(mSeries, mmc) h.resetSeriesWithMMappedChunks(mSeries, mmc)
processors[idx].mx.Unlock()
continue continue
} }
@ -287,23 +249,6 @@ Outer:
multiRef[walSeries.Ref] = mSeries.ref multiRef[walSeries.Ref] = mSeries.ref
idx := uint64(mSeries.ref) % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
inputs[idx] <- []record.RefSample{}
for len(inputs[idx]) != 0 {
time.Sleep(1 * time.Millisecond)
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
}
// Checking if the new m-mapped chunks overlap with the already existing ones. // Checking if the new m-mapped chunks overlap with the already existing ones.
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
if overlapsClosedInterval( if overlapsClosedInterval(
@ -327,12 +272,9 @@ Outer:
} }
// Replacing m-mapped chunks with the new ones (could be empty). // Replacing m-mapped chunks with the new ones (could be empty).
h.setMMappedChunks(mSeries, mmc) h.resetSeriesWithMMappedChunks(mSeries, mmc)
// Any samples replayed till now would already be compacted. Resetting the head chunk. processors[idx].mx.Unlock()
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
} }
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v) seriesPool.Put(v)
@ -348,12 +290,7 @@ Outer:
m = len(samples) m = len(samples)
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
var buf []record.RefSample shards[i] = processors[i].reuseBuf()
select {
case buf = <-outputs[i]:
default:
}
shards[i] = buf[:0]
} }
for _, sam := range samples[:m] { for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok { if r, ok := multiRef[sam.Ref]; ok {
@ -363,7 +300,7 @@ Outer:
shards[mod] = append(shards[mod], sam) shards[mod] = append(shards[mod], sam)
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
inputs[i] <- shards[i] processors[i].input <- shards[i]
} }
samples = samples[m:] samples = samples[m:]
} }
@ -391,9 +328,30 @@ Outer:
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool.Put(v) exemplarsPool.Put(v)
case []record.RefHistogram: case []record.RefHistogram:
// TODO: split into multiple slices and have multiple workers processing the histograms like we do for samples. samples := v
for _, rh := range v { // We split up the samples into chunks of 5000 samples or less.
histogramsInput <- rh // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < n; i++ {
histogramShards[i] = processors[i].reuseHistogramBuf()
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(n)
histogramShards[mod] = append(histogramShards[mod], sam)
}
for i := 0; i < n; i++ {
processors[i].input <- histogramShards[i]
}
samples = samples[m:]
} }
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. //nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
histogramsPool.Put(v) histogramsPool.Put(v)
@ -414,12 +372,9 @@ Outer:
// Signal termination to each worker and wait for it to close its output channel. // Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
close(inputs[i]) processors[i].closeAndDrain()
for range outputs[i] {
}
} }
close(exemplarsInput) close(exemplarsInput)
close(histogramsInput)
wg.Wait() wg.Wait()
if r.Err() != nil { if r.Err() != nil {
@ -435,7 +390,8 @@ Outer:
return nil return nil
} }
func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { // resetSeriesWithMMappedChunks is only used during the WAL replay.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
h.metrics.chunksCreated.Add(float64(len(mmc))) h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
@ -447,48 +403,150 @@ func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
} }
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
}
type walSubsetProcessor struct {
mx sync.Mutex // Take this lock while modifying series in the subset.
input chan interface{} // Either []record.RefSample or []record.RefHistogram.
output chan []record.RefSample
histogramsOutput chan []record.RefHistogram
}
func (wp *walSubsetProcessor) setup() {
wp.output = make(chan []record.RefSample, 300)
wp.input = make(chan interface{}, 300)
wp.histogramsOutput = make(chan []record.RefHistogram, 300)
}
func (wp *walSubsetProcessor) closeAndDrain() {
close(wp.input)
for range wp.output {
}
for range wp.histogramsOutput {
}
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
select {
case buf := <-wp.output:
return buf[:0]
default:
}
return nil
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogram {
select {
case buf := <-wp.histogramsOutput:
return buf[:0]
default:
}
return nil
} }
// processWALSamples adds the samples it receives to the head and passes // processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse. // the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded. // Samples before the minValidTime timestamp are discarded.
func (h *Head) processWALSamples( func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) {
minValidTime int64, defer close(wp.output)
input <-chan []record.RefSample, output chan<- []record.RefSample, defer close(wp.histogramsOutput)
) (unknownRefs uint64) {
defer close(output)
minValidTime := h.minValidTime.Load()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input { for v := range wp.input {
for _, s := range samples { wp.mx.Lock()
if s.T < minValidTime { switch samples := v.(type) {
continue case []record.RefSample:
for _, s := range samples {
if s.T < minValidTime {
continue
}
ms := h.series.getByID(s.Ref)
if ms == nil {
unknownRefs++
continue
}
ms.isHistogramSeries = false
if s.T <= ms.mmMaxTime {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
} }
ms := h.series.getByID(s.Ref) wp.output <- samples
if ms == nil { case []record.RefHistogram:
unknownRefs++ for _, s := range samples {
continue if s.T < minValidTime {
} continue
if s.T <= ms.mmMaxTime { }
continue ms := h.series.getByID(s.Ref)
} if ms == nil {
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { unknownHistogramRefs++
h.metrics.chunksCreated.Inc() continue
h.metrics.chunks.Inc() }
} ms.isHistogramSeries = true
if s.T > maxt { if s.T <= ms.mmMaxTime {
maxt = s.T continue
} }
if s.T < mint { if _, chunkCreated := ms.appendHistogram(s.T, s.H, 0, h.chunkDiskMapper); chunkCreated {
mint = s.T h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
} }
wp.histogramsOutput <- samples
} }
output <- samples
wp.mx.Unlock()
} }
h.updateMinMaxTime(mint, maxt) h.updateMinMaxTime(mint, maxt)
return unknownRefs return unknownRefs, unknownHistogramRefs
}
func (wp *walSubsetProcessor) waitUntilIdle() {
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
default:
}
select {
case <-wp.histogramsOutput: // Allow output side to drain to avoid deadlock.
default:
}
wp.input <- []record.RefSample{}
for len(wp.input) != 0 {
time.Sleep(1 * time.Millisecond)
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
default:
}
select {
case <-wp.histogramsOutput: // Allow output side to drain to avoid deadlock.
default:
}
}
} }
const ( const (

View file

@ -18,6 +18,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"fmt"
"hash" "hash"
"hash/crc32" "hash/crc32"
"io" "io"
@ -1782,7 +1783,13 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := encoding.Decbuf{B: b} d := encoding.Decbuf{B: b}
n := d.Be32int() n := d.Be32int()
l := d.Get() l := d.Get()
return n, newBigEndianPostings(l), d.Err() if d.Err() != nil {
return 0, nil, d.Err()
}
if len(l) != 4*n {
return 0, nil, fmt.Errorf("unexpected postings length, should be %d bytes for %d postings, got %d bytes", 4*n, n, len(l))
}
return n, newBigEndianPostings(l), nil
} }
// LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series. // LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series.

View file

@ -560,3 +560,8 @@ func TestSymbols(t *testing.T) {
} }
require.NoError(t, iter.Err()) require.NoError(t, iter.Err())
} }
func TestDecoder_Postings_WrongInput(t *testing.T) {
_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
require.Error(t, err)
}

View file

@ -39,6 +39,10 @@ func (i *isolationState) Close() {
i.prev.next = i.next i.prev.next = i.next
} }
func (i *isolationState) IsolationDisabled() bool {
return i.isolation.disabled
}
type isolationAppender struct { type isolationAppender struct {
appendID uint64 appendID uint64
prev *isolationAppender prev *isolationAppender
@ -63,9 +67,11 @@ type isolation struct {
readMtx sync.RWMutex readMtx sync.RWMutex
// All current in use isolationStates. This is a doubly-linked list. // All current in use isolationStates. This is a doubly-linked list.
readsOpen *isolationState readsOpen *isolationState
// If true, writes are not tracked while reads are still tracked.
disabled bool
} }
func newIsolation() *isolation { func newIsolation(disabled bool) *isolation {
isoState := &isolationState{} isoState := &isolationState{}
isoState.next = isoState isoState.next = isoState
isoState.prev = isoState isoState.prev = isoState
@ -78,6 +84,7 @@ func newIsolation() *isolation {
appendsOpen: map[uint64]*isolationAppender{}, appendsOpen: map[uint64]*isolationAppender{},
appendsOpenList: appender, appendsOpenList: appender,
readsOpen: isoState, readsOpen: isoState,
disabled: disabled,
appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }}, appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }},
} }
} }
@ -85,12 +92,20 @@ func newIsolation() *isolation {
// lowWatermark returns the appendID below which we no longer need to track // lowWatermark returns the appendID below which we no longer need to track
// which appends were from which appendID. // which appends were from which appendID.
func (i *isolation) lowWatermark() uint64 { func (i *isolation) lowWatermark() uint64 {
if i.disabled {
return 0
}
i.appendMtx.RLock() // Take appendMtx first. i.appendMtx.RLock() // Take appendMtx first.
defer i.appendMtx.RUnlock() defer i.appendMtx.RUnlock()
return i.lowWatermarkLocked() return i.lowWatermarkLocked()
} }
func (i *isolation) lowWatermarkLocked() uint64 { func (i *isolation) lowWatermarkLocked() uint64 {
if i.disabled {
return 0
}
i.readMtx.RLock() i.readMtx.RLock()
defer i.readMtx.RUnlock() defer i.readMtx.RUnlock()
if i.readsOpen.prev != i.readsOpen { if i.readsOpen.prev != i.readsOpen {
@ -106,6 +121,8 @@ func (i *isolation) lowWatermarkLocked() uint64 {
func (i *isolation) State(mint, maxt int64) *isolationState { func (i *isolation) State(mint, maxt int64) *isolationState {
i.appendMtx.RLock() // Take append mutex before read mutex. i.appendMtx.RLock() // Take append mutex before read mutex.
defer i.appendMtx.RUnlock() defer i.appendMtx.RUnlock()
// We need to track the reads even when isolation is disabled.
isoState := &isolationState{ isoState := &isolationState{
maxAppendID: i.appendsOpenList.appendID, maxAppendID: i.appendsOpenList.appendID,
lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId. lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
@ -124,6 +141,7 @@ func (i *isolation) State(mint, maxt int64) *isolationState {
isoState.next = i.readsOpen.next isoState.next = i.readsOpen.next
i.readsOpen.next.prev = isoState i.readsOpen.next.prev = isoState
i.readsOpen.next = isoState i.readsOpen.next = isoState
return isoState return isoState
} }
@ -146,6 +164,10 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
// ID. The first ID returned is 1. // ID. The first ID returned is 1.
// Also returns the low watermark, to keep lock/unlock operations down. // Also returns the low watermark, to keep lock/unlock operations down.
func (i *isolation) newAppendID() (uint64, uint64) { func (i *isolation) newAppendID() (uint64, uint64) {
if i.disabled {
return 0, 0
}
i.appendMtx.Lock() i.appendMtx.Lock()
defer i.appendMtx.Unlock() defer i.appendMtx.Unlock()
@ -165,6 +187,10 @@ func (i *isolation) newAppendID() (uint64, uint64) {
} }
func (i *isolation) lastAppendID() uint64 { func (i *isolation) lastAppendID() uint64 {
if i.disabled {
return 0
}
i.appendMtx.RLock() i.appendMtx.RLock()
defer i.appendMtx.RUnlock() defer i.appendMtx.RUnlock()
@ -172,6 +198,10 @@ func (i *isolation) lastAppendID() uint64 {
} }
func (i *isolation) closeAppend(appendID uint64) { func (i *isolation) closeAppend(appendID uint64) {
if i.disabled {
return
}
i.appendMtx.Lock() i.appendMtx.Lock()
defer i.appendMtx.Unlock() defer i.appendMtx.Unlock()

View file

@ -23,7 +23,7 @@ import (
func BenchmarkIsolation(b *testing.B) { func BenchmarkIsolation(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} { for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) { b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
iso := newIsolation() iso := newIsolation(false)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
start := make(chan struct{}) start := make(chan struct{})
@ -53,7 +53,7 @@ func BenchmarkIsolation(b *testing.B) {
func BenchmarkIsolationWithState(b *testing.B) { func BenchmarkIsolationWithState(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} { for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) { b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
iso := newIsolation() iso := newIsolation(false)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
start := make(chan struct{}) start := make(chan struct{})

View file

@ -1337,15 +1337,15 @@ type RuleGroup struct {
// In order to preserve rule ordering, while exposing type (alerting or recording) // In order to preserve rule ordering, while exposing type (alerting or recording)
// specific properties, both alerting and recording rules are exposed in the // specific properties, both alerting and recording rules are exposed in the
// same array. // same array.
Rules []rule `json:"rules"` Rules []Rule `json:"rules"`
Interval float64 `json:"interval"` Interval float64 `json:"interval"`
EvaluationTime float64 `json:"evaluationTime"` EvaluationTime float64 `json:"evaluationTime"`
LastEvaluation time.Time `json:"lastEvaluation"` LastEvaluation time.Time `json:"lastEvaluation"`
} }
type rule interface{} type Rule interface{}
type alertingRule struct { type AlertingRule struct {
// State can be "pending", "firing", "inactive". // State can be "pending", "firing", "inactive".
State string `json:"state"` State string `json:"state"`
Name string `json:"name"` Name string `json:"name"`
@ -1362,7 +1362,7 @@ type alertingRule struct {
Type string `json:"type"` Type string `json:"type"`
} }
type recordingRule struct { type RecordingRule struct {
Name string `json:"name"` Name string `json:"name"`
Query string `json:"query"` Query string `json:"query"`
Labels labels.Labels `json:"labels,omitempty"` Labels labels.Labels `json:"labels,omitempty"`
@ -1391,12 +1391,12 @@ func (api *API) rules(r *http.Request) apiFuncResult {
Name: grp.Name(), Name: grp.Name(),
File: grp.File(), File: grp.File(),
Interval: grp.Interval().Seconds(), Interval: grp.Interval().Seconds(),
Rules: []rule{}, Rules: []Rule{},
EvaluationTime: grp.GetEvaluationTime().Seconds(), EvaluationTime: grp.GetEvaluationTime().Seconds(),
LastEvaluation: grp.GetLastEvaluation(), LastEvaluation: grp.GetLastEvaluation(),
} }
for _, r := range grp.Rules() { for _, r := range grp.Rules() {
var enrichedRule rule var enrichedRule Rule
lastError := "" lastError := ""
if r.LastError() != nil { if r.LastError() != nil {
@ -1407,7 +1407,7 @@ func (api *API) rules(r *http.Request) apiFuncResult {
if !returnAlerts { if !returnAlerts {
break break
} }
enrichedRule = alertingRule{ enrichedRule = AlertingRule{
State: rule.State().String(), State: rule.State().String(),
Name: rule.Name(), Name: rule.Name(),
Query: rule.Query().String(), Query: rule.Query().String(),
@ -1425,7 +1425,7 @@ func (api *API) rules(r *http.Request) apiFuncResult {
if !returnRecording { if !returnRecording {
break break
} }
enrichedRule = recordingRule{ enrichedRule = RecordingRule{
Name: rule.Name(), Name: rule.Name(),
Query: rule.Query().String(), Query: rule.Query().String(),
Labels: rule.Labels(), Labels: rule.Labels(),

View file

@ -1472,8 +1472,8 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Name: "grp", Name: "grp",
File: "/path/to/file", File: "/path/to/file",
Interval: 1, Interval: 1,
Rules: []rule{ Rules: []Rule{
alertingRule{ AlertingRule{
State: "inactive", State: "inactive",
Name: "test_metric3", Name: "test_metric3",
Query: "absent(test_metric3) != 1", Query: "absent(test_metric3) != 1",
@ -1484,7 +1484,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Health: "unknown", Health: "unknown",
Type: "alerting", Type: "alerting",
}, },
alertingRule{ AlertingRule{
State: "inactive", State: "inactive",
Name: "test_metric4", Name: "test_metric4",
Query: "up == 1", Query: "up == 1",
@ -1495,7 +1495,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Health: "unknown", Health: "unknown",
Type: "alerting", Type: "alerting",
}, },
recordingRule{ RecordingRule{
Name: "recording-rule-1", Name: "recording-rule-1",
Query: "vector(1)", Query: "vector(1)",
Labels: labels.Labels{}, Labels: labels.Labels{},
@ -1518,8 +1518,8 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Name: "grp", Name: "grp",
File: "/path/to/file", File: "/path/to/file",
Interval: 1, Interval: 1,
Rules: []rule{ Rules: []Rule{
alertingRule{ AlertingRule{
State: "inactive", State: "inactive",
Name: "test_metric3", Name: "test_metric3",
Query: "absent(test_metric3) != 1", Query: "absent(test_metric3) != 1",
@ -1530,7 +1530,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Health: "unknown", Health: "unknown",
Type: "alerting", Type: "alerting",
}, },
alertingRule{ AlertingRule{
State: "inactive", State: "inactive",
Name: "test_metric4", Name: "test_metric4",
Query: "up == 1", Query: "up == 1",
@ -1557,8 +1557,8 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
Name: "grp", Name: "grp",
File: "/path/to/file", File: "/path/to/file",
Interval: 1, Interval: 1,
Rules: []rule{ Rules: []Rule{
recordingRule{ RecordingRule{
Name: "recording-rule-1", Name: "recording-rule-1",
Query: "vector(1)", Query: "vector(1)",
Labels: labels.Labels{}, Labels: labels.Labels{},

View file

@ -34,45 +34,45 @@
"lru-cache": "^6.0.0" "lru-cache": "^6.0.0"
}, },
"devDependencies": { "devDependencies": {
"@codemirror/autocomplete": "^0.19.3", "@codemirror/autocomplete": "^0.19.8",
"@codemirror/basic-setup": "^0.19.0", "@codemirror/basic-setup": "^0.19.0",
"@codemirror/highlight": "^0.19.6", "@codemirror/highlight": "^0.19.6",
"@codemirror/language": "^0.19.4", "@codemirror/language": "^0.19.5",
"@codemirror/lint": "^0.19.1", "@codemirror/lint": "^0.19.3",
"@codemirror/state": "^0.19.5", "@codemirror/state": "^0.19.6",
"@codemirror/view": "^0.19.7", "@codemirror/view": "^0.19.20",
"@lezer/common": "^0.15.8", "@lezer/common": "^0.15.8",
"@lezer/generator": "^0.15.2", "@lezer/generator": "^0.15.2",
"@types/chai": "^4.2.22", "@types/chai": "^4.2.22",
"@types/lru-cache": "^5.1.0", "@types/lru-cache": "^5.1.0",
"@types/mocha": "^9.0.0", "@types/mocha": "^9.0.0",
"@types/node": "^16.11.7", "@types/node": "^16.11.9",
"@typescript-eslint/eslint-plugin": "^5.3.1", "@typescript-eslint/eslint-plugin": "^5.3.1",
"@typescript-eslint/parser": "^5.3.1", "@typescript-eslint/parser": "^5.3.1",
"chai": "^4.2.0", "chai": "^4.2.0",
"codecov": "^3.8.1", "codecov": "^3.8.1",
"eslint": "^8.2.0", "eslint": "^8.3.0",
"eslint-config-prettier": "^8.3.0", "eslint-config-prettier": "^8.3.0",
"eslint-plugin-flowtype": "^8.0.3", "eslint-plugin-flowtype": "^8.0.3",
"eslint-plugin-import": "^2.25.3", "eslint-plugin-import": "^2.25.3",
"eslint-plugin-prettier": "^4.0.0", "eslint-plugin-prettier": "^4.0.0",
"isomorphic-fetch": "^3.0.0", "isomorphic-fetch": "^3.0.0",
"mocha": "^9.1.3", "mocha": "^8.4.0",
"nock": "^13.0.11", "nock": "^13.2.1",
"nyc": "^15.1.0", "nyc": "^15.1.0",
"prettier": "^2.4.1", "prettier": "^2.4.1",
"ts-loader": "^7.0.4", "ts-loader": "^7.0.4",
"ts-mocha": "^8.0.0", "ts-mocha": "^8.0.0",
"ts-node": "^9.0.0", "ts-node": "^10.4.0",
"typescript": "^4.2.3" "typescript": "^4.5.2"
}, },
"peerDependencies": { "peerDependencies": {
"@codemirror/autocomplete": "^0.19.3", "@codemirror/autocomplete": "^0.19.8",
"@codemirror/highlight": "^0.19.6", "@codemirror/highlight": "^0.19.6",
"@codemirror/language": "^0.19.4", "@codemirror/language": "^0.19.5",
"@codemirror/lint": "^0.19.1", "@codemirror/lint": "^0.19.3",
"@codemirror/state": "^0.19.5", "@codemirror/state": "^0.19.6",
"@codemirror/view": "^0.19.7", "@codemirror/view": "^0.19.20",
"@lezer/common": "^0.15.8" "@lezer/common": "^0.15.8"
}, },
"prettier": { "prettier": {

7291
web/ui/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -3,32 +3,32 @@
"version": "0.1.0", "version": "0.1.0",
"private": true, "private": true,
"dependencies": { "dependencies": {
"@codemirror/autocomplete": "^0.19.3", "@codemirror/autocomplete": "^0.19.8",
"@codemirror/closebrackets": "^0.19.0", "@codemirror/closebrackets": "^0.19.0",
"@codemirror/commands": "^0.19.4", "@codemirror/commands": "^0.19.5",
"@codemirror/comment": "^0.19.0", "@codemirror/comment": "^0.19.0",
"@codemirror/highlight": "^0.19.6", "@codemirror/highlight": "^0.19.6",
"@codemirror/history": "^0.19.0", "@codemirror/history": "^0.19.0",
"@codemirror/language": "^0.19.4", "@codemirror/language": "^0.19.5",
"@codemirror/lint": "^0.19.1", "@codemirror/lint": "^0.19.3",
"@codemirror/matchbrackets": "^0.19.1", "@codemirror/matchbrackets": "^0.19.3",
"@codemirror/search": "^0.19.2", "@codemirror/search": "^0.19.3",
"@codemirror/state": "^0.19.5", "@codemirror/state": "^0.19.6",
"@codemirror/view": "^0.19.7", "@codemirror/view": "^0.19.20",
"@forevolve/bootstrap-dark": "^1.0.0", "@forevolve/bootstrap-dark": "^1.0.0",
"@fortawesome/fontawesome-svg-core": "^1.2.14", "@fortawesome/fontawesome-svg-core": "^1.2.14",
"@fortawesome/free-solid-svg-icons": "^5.7.1", "@fortawesome/free-solid-svg-icons": "^5.7.1",
"@fortawesome/react-fontawesome": "^0.1.4", "@fortawesome/react-fontawesome": "^0.1.16",
"@nexucis/fuzzy": "^0.3.0", "@nexucis/fuzzy": "^0.3.0",
"bootstrap": "^5.1.3", "bootstrap": "^4.6.1",
"codemirror-promql": "0.18.0", "codemirror-promql": "0.18.0",
"css.escape": "^1.5.1", "css.escape": "^1.5.1",
"downshift": "^3.4.8", "downshift": "^6.1.7",
"i": "^0.3.7", "i": "^0.3.7",
"jquery": "^3.5.1", "jquery": "^3.5.1",
"jquery.flot.tooltip": "^0.9.0", "jquery.flot.tooltip": "^0.9.0",
"moment": "^2.24.0", "moment": "^2.24.0",
"moment-timezone": "^0.5.23", "moment-timezone": "^0.5.34",
"popper.js": "^1.14.3", "popper.js": "^1.14.3",
"react": "^17.0.2", "react": "^17.0.2",
"react-copy-to-clipboard": "^5.0.4", "react-copy-to-clipboard": "^5.0.4",
@ -37,8 +37,8 @@
"react-router-dom": "^5.2.1", "react-router-dom": "^5.2.1",
"react-test-renderer": "^17.0.2", "react-test-renderer": "^17.0.2",
"reactstrap": "^8.9.0", "reactstrap": "^8.9.0",
"sanitize-html": "^2.3.3", "sanitize-html": "^2.5.3",
"sass": "1.39.0", "sass": "1.43.4",
"tempusdominus-bootstrap-4": "^5.1.2", "tempusdominus-bootstrap-4": "^5.1.2",
"tempusdominus-core": "^5.0.3" "tempusdominus-core": "^5.0.3"
}, },
@ -65,19 +65,19 @@
], ],
"devDependencies": { "devDependencies": {
"@testing-library/react-hooks": "^7.0.1", "@testing-library/react-hooks": "^7.0.1",
"@types/enzyme": "^3.10.9", "@types/enzyme": "^3.10.10",
"@types/flot": "0.0.32", "@types/flot": "0.0.32",
"@types/jest": "^27.0.1", "@types/jest": "^27.0.3",
"@types/jquery": "^3.5.8", "@types/jquery": "^3.5.9",
"@types/node": "^16.11.7", "@types/node": "^16.11.9",
"@types/react": "^17.0.19", "@types/react": "^17.0.36",
"@types/react-copy-to-clipboard": "^5.0.2", "@types/react-copy-to-clipboard": "^5.0.2",
"@types/react-dom": "^17.0.9", "@types/react-dom": "^17.0.11",
"@types/react-resize-detector": "^6.1.0", "@types/react-resize-detector": "^6.1.0",
"@types/react-router-dom": "^5.3.2", "@types/react-router-dom": "^5.3.2",
"@types/sanitize-html": "^2.5.0", "@types/sanitize-html": "^2.5.0",
"@types/sinon": "^10.0.6", "@types/sinon": "^10.0.6",
"@wojtekmaj/enzyme-adapter-react-17": "^0.6.3", "@wojtekmaj/enzyme-adapter-react-17": "^0.6.5",
"enzyme": "^3.11.0", "enzyme": "^3.11.0",
"enzyme-to-json": "^3.6.2", "enzyme-to-json": "^3.6.2",
"eslint-config-prettier": "^8.3.0", "eslint-config-prettier": "^8.3.0",
@ -88,8 +88,8 @@
"mutationobserver-shim": "^0.3.7", "mutationobserver-shim": "^0.3.7",
"prettier": "^2.4.1", "prettier": "^2.4.1",
"react-scripts": "4.0.3", "react-scripts": "4.0.3",
"sinon": "^11.1.2", "sinon": "^12.0.1",
"typescript": "^4.4.2" "typescript": "^4.5.2"
}, },
"proxy": "http://localhost:9090", "proxy": "http://localhost:9090",
"jest": { "jest": {

View file

@ -2,7 +2,7 @@ import { HighlightStyle, tags } from '@codemirror/highlight';
import { EditorView } from '@codemirror/view'; import { EditorView } from '@codemirror/view';
export const baseTheme = EditorView.theme({ export const baseTheme = EditorView.theme({
'&': { '&.cm-editor': {
'&.cm-focused': { '&.cm-focused': {
outline: 'none', outline: 'none',
outline_fallback: 'none', outline_fallback: 'none',

View file

@ -594,7 +594,7 @@ func (h *Handler) Run(ctx context.Context, listener net.Listener, webConfig stri
ReadTimeout: h.options.ReadTimeout, ReadTimeout: h.options.ReadTimeout,
} }
errCh := make(chan error) errCh := make(chan error, 1)
go func() { go func() {
errCh <- toolkit_web.Serve(listener, httpSrv, webConfig, h.logger) errCh <- toolkit_web.Serve(listener, httpSrv, webConfig, h.logger)
}() }()