Merge remote-tracking branch 'prometheus/main' into arve/query-logger-munmap

Arve Knudsen 2024-05-27 17:08:33 +02:00
commit 7b56353090
30 changed files with 1076 additions and 1349 deletions

.gitpod.Dockerfile

@@ -1,15 +1,33 @@
 FROM gitpod/workspace-full
+# Set Node.js version as an environment variable.
 ENV CUSTOM_NODE_VERSION=16
-ENV CUSTOM_GO_VERSION=1.19
-ENV GOPATH=$HOME/go-packages
-ENV GOROOT=$HOME/go
-ENV PATH=$GOROOT/bin:$GOPATH/bin:$PATH
+# Install and use the specified Node.js version via nvm.
 RUN bash -c ". .nvm/nvm.sh && nvm install ${CUSTOM_NODE_VERSION} && nvm use ${CUSTOM_NODE_VERSION} && nvm alias default ${CUSTOM_NODE_VERSION}"
+# Ensure nvm uses the default Node.js version in all new shells.
 RUN echo "nvm use default &>/dev/null" >> ~/.bashrc.d/51-nvm-fix
-RUN curl -fsSL https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz | tar xzs \
-    && printf '%s\n' 'export GOPATH=/workspace/go' \
-                     'export PATH=$GOPATH/bin:$PATH' > $HOME/.bashrc.d/300-go
+# Remove any existing Go installation in $HOME path.
+RUN rm -rf $HOME/go $HOME/go-packages
+# Export go environment variables.
+RUN echo "export GOPATH=/workspace/go" >> ~/.bashrc.d/300-go && \
+    echo "export GOBIN=\$GOPATH/bin" >> ~/.bashrc.d/300-go && \
+    echo "export GOROOT=${HOME}/go" >> ~/.bashrc.d/300-go && \
+    echo "export PATH=\$GOROOT/bin:\$GOBIN:\$PATH" >> ~/.bashrc
+# Reload the environment variables to ensure go environment variables are
+# available in subsequent commands.
+RUN bash -c "source ~/.bashrc && source ~/.bashrc.d/300-go"
+# Fetch the Go version dynamically from the Prometheus go.mod file and Install Go in $HOME path.
+RUN export CUSTOM_GO_VERSION=$(curl -sSL "https://raw.githubusercontent.com/prometheus/prometheus/main/go.mod" | awk '/^go/{print $2".0"}') && \
+    curl -fsSL "https://dl.google.com/go/go${CUSTOM_GO_VERSION}.linux-amd64.tar.gz" | \
+    tar -xz -C $HOME
+# Fetch the goyacc parser version dynamically from the Prometheus Makefile
+# and install it globally in $GOBIN path.
+RUN GOYACC_VERSION=$(curl -fsSL "https://raw.githubusercontent.com/prometheus/prometheus/main/Makefile" | awk -F'=' '/GOYACC_VERSION \?=/{gsub(/ /, "", $2); print $2}') && \
+    go install "golang.org/x/tools/cmd/goyacc@${GOYACC_VERSION}"


@@ -42,7 +42,12 @@ go build ./cmd/prometheus/
 make test         # Make sure all the tests pass before you commit and push :)
 ```
-We use [`golangci-lint`](https://github.com/golangci/golangci-lint) for linting the code. If it reports an issue and you think that the warning needs to be disregarded or is a false-positive, you can add a special comment `//nolint:linter1[,linter2,...]` before the offending line. Use this sparingly though, fixing the code to comply with the linter's recommendation is in general the preferred course of action.
+To run a collection of Go linters through [`golangci-lint`](https://github.com/golangci/golangci-lint), do:
+```bash
+make lint
+```
+If it reports an issue and you think that the warning needs to be disregarded or is a false-positive, you can add a special comment `//nolint:linter1[,linter2,...]` before the offending line. Use this sparingly though, fixing the code to comply with the linter's recommendation is in general the preferred course of action. See [this section of the golangci-lint documentation](https://golangci-lint.run/usage/false-positives/#nolint-directive) for more information.
 
 All our issues are regularly tagged so that you can also filter down the issues involving the components you want to work on. For our labeling policy refer [the wiki page](https://github.com/prometheus/prometheus/wiki/Label-Names-and-Descriptions).
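For readers unfamiliar with the `//nolint` directive mentioned above, a minimal, hypothetical sketch of how it is applied in Go code; the `errcheck` linter and the file path are only examples, not part of this change:

```go
package main

import "os"

func main() {
	// Best-effort cleanup of a scratch file: the returned error is deliberately
	// ignored, so silence the errcheck linter for this one line only instead of
	// disabling it globally.
	os.Remove("/tmp/example.scratch") //nolint:errcheck
}
```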


@@ -418,7 +418,7 @@ func main() {
 	serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
 		Default("1m").SetValue(&cfg.resendDelay)
 
-	serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently.").
+	serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules that can run concurrently. When set, \"query.max-concurrency\" may need to be adjusted accordingly.").
 		Default("4").Int64Var(&cfg.maxConcurrentEvals)
 
 	a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").


@@ -88,7 +88,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 	mint = blockDuration * (mint / blockDuration)
 
-	db, err := tsdb.OpenDBReadOnly(outputDir, nil)
+	db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
 	if err != nil {
 		return err
 	}


@@ -235,12 +235,14 @@ func main() {
 	tsdbDumpCmd := tsdbCmd.Command("dump", "Dump samples from a TSDB.")
 	dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
+	dumpSandboxDirRoot := tsdbDumpCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
 	dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
 	dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
 	dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
-	tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
+	tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics text format, excluding native histograms and staleness markers, which are not representable in OpenMetrics.")
 	dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
+	dumpOpenMetricsSandboxDirRoot := tsdbDumpOpenMetricsCmd.Flag("sandbox-dir-root", "Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end.").Default(defaultDBPath).String()
 	dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
 	dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
 	dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
@@ -396,9 +398,9 @@ func main() {
 		os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
 	case tsdbDumpCmd.FullCommand():
-		os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
+		os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpSandboxDirRoot, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
 	case tsdbDumpOpenMetricsCmd.FullCommand():
-		os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
+		os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
 	// TODO(aSquare14): Work on adding support for custom block size.
 	case openMetricsImportCmd.FullCommand():
 		os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))


@@ -338,7 +338,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
 }
 
 func listBlocks(path string, humanReadable bool) error {
-	db, err := tsdb.OpenDBReadOnly(path, nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", nil)
 	if err != nil {
 		return err
 	}
@@ -393,7 +393,7 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
 }
 
 func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
-	db, err := tsdb.OpenDBReadOnly(path, nil)
+	db, err := tsdb.OpenDBReadOnly(path, "", nil)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -708,8 +708,8 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
 type SeriesSetFormatter func(series storage.SeriesSet) error
 
-func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
-	db, err := tsdb.OpenDBReadOnly(path, nil)
+func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
+	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
 	if err != nil {
 		return err
 	}


@@ -64,6 +64,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
 	err := dumpSamples(
 		context.Background(),
 		path,
+		t.TempDir(),
 		mint,
 		maxt,
 		match,


@@ -48,7 +48,7 @@ The Prometheus monitoring server
 | <code class="text-nowrap">--rules.alert.for-outage-tolerance</code> | Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
 | <code class="text-nowrap">--rules.alert.for-grace-period</code> | Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
 | <code class="text-nowrap">--rules.alert.resend-delay</code> | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
-| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. Use with server mode only. | `4` |
+| <code class="text-nowrap">--rules.max-concurrent-evals</code> | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
 | <code class="text-nowrap">--alertmanager.notification-queue-capacity</code> | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
 | <code class="text-nowrap">--query.lookback-delta</code> | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
 | <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |


@@ -566,6 +566,7 @@ Dump samples from a TSDB.
 | Flag | Description | Default |
 | --- | --- | --- |
+| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
 | <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
 | <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
 | <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
@@ -584,7 +585,7 @@ Dump samples from a TSDB.
 ##### `promtool tsdb dump-openmetrics`
 
-[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.
+[Experimental] Dump samples from a TSDB into OpenMetrics text format, excluding native histograms and staleness markers, which are not representable in OpenMetrics.
@@ -592,6 +593,7 @@ Dump samples from a TSDB.
 | Flag | Description | Default |
 | --- | --- | --- |
+| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
 | <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
 | <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
 | <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |


@@ -1467,6 +1467,7 @@ For OVHcloud's [public cloud instances](https://www.ovhcloud.com/en/public-cloud
 * `__meta_ovhcloud_dedicated_server_ipv6`: the IPv6 of the server
 * `__meta_ovhcloud_dedicated_server_link_speed`: the link speed of the server
 * `__meta_ovhcloud_dedicated_server_name`: the name of the server
+* `__meta_ovhcloud_dedicated_server_no_intervention`: whether datacenter intervention is disabled for the server
 * `__meta_ovhcloud_dedicated_server_os`: the operating system of the server
 * `__meta_ovhcloud_dedicated_server_rack`: the rack of the server
 * `__meta_ovhcloud_dedicated_server_reverse`: the reverse DNS name of the server


@@ -197,6 +197,9 @@ or time-series database to Prometheus. To do so, the user must first convert the
 source data into [OpenMetrics](https://openmetrics.io/) format, which is the
 input format for the backfilling as described below.
 
+Note that native histograms and staleness markers are not supported by this
+procedure, as they cannot be represented in the OpenMetrics format.
+
 ### Usage
 
 Backfilling can be used via the Promtool command line. Promtool will write the blocks


@@ -44,5 +44,10 @@
       // The default refresh time for all dashboards, default to 60s
       refresh: '60s',
     },
+
+    // Opt-out of multi-cluster dashboards by overriding this.
+    showMultiCluster: true,
+
+    // The cluster label to infer the cluster name from.
+    clusterLabel: 'cluster',
   },
 }


@@ -10,21 +10,32 @@ local template = grafana.template;
 {
   grafanaDashboards+:: {
     'prometheus.json':
-      g.dashboard(
+      local showMultiCluster = $._config.showMultiCluster;
+      local dashboard = g.dashboard(
         '%(prefix)sOverview' % $._config.grafanaPrometheus
-      )
-      .addMultiTemplate('cluster', 'prometheus_build_info{%(prometheusSelector)s}' % $._config, 'cluster')
-      .addMultiTemplate('job', 'prometheus_build_info{cluster=~"$cluster"}', 'job')
-      .addMultiTemplate('instance', 'prometheus_build_info{cluster=~"$cluster", job=~"$job"}', 'instance')
+      );
+      local templatedDashboard = if showMultiCluster then
+        dashboard
+          .addMultiTemplate('cluster', 'prometheus_build_info{%(prometheusSelector)s}' % $._config, $._config.clusterLabel)
+          .addMultiTemplate('job', 'prometheus_build_info{cluster=~"$cluster"}', 'job')
+          .addMultiTemplate('instance', 'prometheus_build_info{cluster=~"$cluster", job=~"$job"}', 'instance')
+      else
+        dashboard
+          .addMultiTemplate('job', 'prometheus_build_info{%(prometheusSelector)s}' % $._config, 'job')
+          .addMultiTemplate('instance', 'prometheus_build_info{job=~"$job"}', 'instance');
+      templatedDashboard
       .addRow(
         g.row('Prometheus Stats')
         .addPanel(
           g.panel('Prometheus Stats') +
-          g.tablePanel([
+          g.tablePanel(if showMultiCluster then [
             'count by (cluster, job, instance, version) (prometheus_build_info{cluster=~"$cluster", job=~"$job", instance=~"$instance"})',
             'max by (cluster, job, instance) (time() - process_start_time_seconds{cluster=~"$cluster", job=~"$job", instance=~"$instance"})',
+          ] else [
+            'count by (job, instance, version) (prometheus_build_info{job=~"$job", instance=~"$instance"})',
+            'max by (job, instance) (time() - process_start_time_seconds{job=~"$job", instance=~"$instance"})',
           ], {
-            cluster: { alias: 'Cluster' },
+            cluster: { alias: if showMultiCluster then 'Cluster' else '' },
             job: { alias: 'Job' },
             instance: { alias: 'Instance' },
             version: { alias: 'Version' },
@@ -37,12 +48,18 @@ local template = grafana.template;
       g.row('Discovery')
       .addPanel(
         g.panel('Target Sync') +
-        g.queryPanel('sum(rate(prometheus_target_sync_length_seconds_sum{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[5m])) by (cluster, job, scrape_job, instance) * 1e3', '{{cluster}}:{{job}}:{{instance}}:{{scrape_job}}') +
+        g.queryPanel(if showMultiCluster then 'sum(rate(prometheus_target_sync_length_seconds_sum{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[5m])) by (cluster, job, scrape_job, instance) * 1e3'
+                     else 'sum(rate(prometheus_target_sync_length_seconds_sum{job=~"$job",instance=~"$instance"}[5m])) by (scrape_job) * 1e3',
+                     if showMultiCluster then '{{cluster}}:{{job}}:{{instance}}:{{scrape_job}}'
+                     else '{{scrape_job}}') +
         { yaxes: g.yaxes('ms') }
       )
       .addPanel(
         g.panel('Targets') +
-        g.queryPanel('sum by (cluster, job, instance) (prometheus_sd_discovered_targets{cluster=~"$cluster", job=~"$job",instance=~"$instance"})', '{{cluster}}:{{job}}:{{instance}}') +
+        g.queryPanel(if showMultiCluster then 'sum by (cluster, job, instance) (prometheus_sd_discovered_targets{cluster=~"$cluster", job=~"$job",instance=~"$instance"})'
+                     else 'sum(prometheus_sd_discovered_targets{job=~"$job",instance=~"$instance"})',
+                     if showMultiCluster then '{{cluster}}:{{job}}:{{instance}}'
+                     else 'Targets') +
         g.stack
       )
     )
@@ -50,29 +67,47 @@ local template = grafana.template;
       g.row('Retrieval')
       .addPanel(
         g.panel('Average Scrape Interval Duration') +
-        g.queryPanel('rate(prometheus_target_interval_length_seconds_sum{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m]) / rate(prometheus_target_interval_length_seconds_count{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m]) * 1e3', '{{cluster}}:{{job}}:{{instance}} {{interval}} configured') +
+        g.queryPanel(if showMultiCluster then 'rate(prometheus_target_interval_length_seconds_sum{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m]) / rate(prometheus_target_interval_length_seconds_count{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m]) * 1e3'
+                     else 'rate(prometheus_target_interval_length_seconds_sum{job=~"$job",instance=~"$instance"}[5m]) / rate(prometheus_target_interval_length_seconds_count{job=~"$job",instance=~"$instance"}[5m]) * 1e3',
+                     if showMultiCluster then '{{cluster}}:{{job}}:{{instance}} {{interval}} configured'
+                     else '{{interval}} configured') +
         { yaxes: g.yaxes('ms') }
       )
       .addPanel(
         g.panel('Scrape failures') +
-        g.queryPanel([
+        g.queryPanel(if showMultiCluster then [
           'sum by (cluster, job, instance) (rate(prometheus_target_scrapes_exceeded_body_size_limit_total{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[1m]))',
           'sum by (cluster, job, instance) (rate(prometheus_target_scrapes_exceeded_sample_limit_total{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[1m]))',
           'sum by (cluster, job, instance) (rate(prometheus_target_scrapes_sample_duplicate_timestamp_total{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[1m]))',
           'sum by (cluster, job, instance) (rate(prometheus_target_scrapes_sample_out_of_bounds_total{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[1m]))',
           'sum by (cluster, job, instance) (rate(prometheus_target_scrapes_sample_out_of_order_total{cluster=~"$cluster",job=~"$job",instance=~"$instance"}[1m]))',
-        ], [
+        ] else [
+          'sum by (job) (rate(prometheus_target_scrapes_exceeded_body_size_limit_total[1m]))',
+          'sum by (job) (rate(prometheus_target_scrapes_exceeded_sample_limit_total[1m]))',
+          'sum by (job) (rate(prometheus_target_scrapes_sample_duplicate_timestamp_total[1m]))',
+          'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_bounds_total[1m]))',
+          'sum by (job) (rate(prometheus_target_scrapes_sample_out_of_order_total[1m]))',
+        ], if showMultiCluster then [
           'exceeded body size limit: {{cluster}} {{job}} {{instance}}',
           'exceeded sample limit: {{cluster}} {{job}} {{instance}}',
           'duplicate timestamp: {{cluster}} {{job}} {{instance}}',
           'out of bounds: {{cluster}} {{job}} {{instance}}',
           'out of order: {{cluster}} {{job}} {{instance}}',
+        ] else [
+          'exceeded body size limit: {{job}}',
+          'exceeded sample limit: {{job}}',
+          'duplicate timestamp: {{job}}',
+          'out of bounds: {{job}}',
+          'out of order: {{job}}',
         ]) +
         g.stack
       )
       .addPanel(
         g.panel('Appended Samples') +
-        g.queryPanel('rate(prometheus_tsdb_head_samples_appended_total{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m])', '{{cluster}} {{job}} {{instance}}') +
+        g.queryPanel(if showMultiCluster then 'rate(prometheus_tsdb_head_samples_appended_total{cluster=~"$cluster", job=~"$job",instance=~"$instance"}[5m])'
+                     else 'rate(prometheus_tsdb_head_samples_appended_total{job=~"$job",instance=~"$instance"}[5m])',
+                     if showMultiCluster then '{{cluster}} {{job}} {{instance}}'
+                     else '{{job}} {{instance}}') +
         g.stack
       )
     )
@@ -80,12 +115,18 @@ local template = grafana.template;
       g.row('Storage')
       .addPanel(
         g.panel('Head Series') +
-        g.queryPanel('prometheus_tsdb_head_series{cluster=~"$cluster",job=~"$job",instance=~"$instance"}', '{{cluster}} {{job}} {{instance}} head series') +
+        g.queryPanel(if showMultiCluster then 'prometheus_tsdb_head_series{cluster=~"$cluster",job=~"$job",instance=~"$instance"}'
+                     else 'prometheus_tsdb_head_series{job=~"$job",instance=~"$instance"}',
+                     if showMultiCluster then '{{cluster}} {{job}} {{instance}} head series'
+                     else '{{job}} {{instance}} head series') +
         g.stack
       )
       .addPanel(
         g.panel('Head Chunks') +
-        g.queryPanel('prometheus_tsdb_head_chunks{cluster=~"$cluster",job=~"$job",instance=~"$instance"}', '{{cluster}} {{job}} {{instance}} head chunks') +
+        g.queryPanel(if showMultiCluster then 'prometheus_tsdb_head_chunks{cluster=~"$cluster",job=~"$job",instance=~"$instance"}'
+                     else 'prometheus_tsdb_head_chunks{job=~"$job",instance=~"$instance"}',
+                     if showMultiCluster then '{{cluster}} {{job}} {{instance}} head chunks'
+                     else '{{job}} {{instance}} head chunks') +
         g.stack
       )
     )
@@ -93,12 +134,18 @@ local template = grafana.template;
       g.row('Query')
       .addPanel(
         g.panel('Query Rate') +
-        g.queryPanel('rate(prometheus_engine_query_duration_seconds_count{cluster=~"$cluster",job=~"$job",instance=~"$instance",slice="inner_eval"}[5m])', '{{cluster}} {{job}} {{instance}}') +
+        g.queryPanel(if showMultiCluster then 'rate(prometheus_engine_query_duration_seconds_count{cluster=~"$cluster",job=~"$job",instance=~"$instance",slice="inner_eval"}[5m])'
+                     else 'rate(prometheus_engine_query_duration_seconds_count{job=~"$job",instance=~"$instance",slice="inner_eval"}[5m])',
+                     if showMultiCluster then '{{cluster}} {{job}} {{instance}}'
+                     else '{{job}} {{instance}}') +
         g.stack,
       )
       .addPanel(
         g.panel('Stage Duration') +
-        g.queryPanel('max by (slice) (prometheus_engine_query_duration_seconds{quantile="0.9",cluster=~"$cluster", job=~"$job",instance=~"$instance"}) * 1e3', '{{slice}}') +
+        g.queryPanel(if showMultiCluster then 'max by (slice) (prometheus_engine_query_duration_seconds{quantile="0.9",cluster=~"$cluster", job=~"$job",instance=~"$instance"}) * 1e3'
+                     else 'max by (slice) (prometheus_engine_query_duration_seconds{quantile="0.9",job=~"$job",instance=~"$instance"}) * 1e3',
+                     if showMultiCluster then '{{slice}}'
+                     else '{{slice}}') +
         { yaxes: g.yaxes('ms') } +
         g.stack,
       )


@@ -828,7 +828,12 @@ type zeroOrOneCharacterStringMatcher struct {
 }
 
 func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool {
-	if moreThanOneRune(s) {
+	// If there's more than one rune in the string, then it can't match.
+	if r, size := utf8.DecodeRuneInString(s); r == utf8.RuneError {
+		// Size is 0 for empty strings, 1 for invalid rune.
+		// Empty string matches, invalid rune matches if there isn't anything else.
+		return size == len(s)
+	} else if size < len(s) {
 		return false
 	}
@@ -840,27 +845,6 @@ func (m *zeroOrOneCharacterStringMatcher) Matches(s string) bool {
 	return s[0] != '\n'
 }
 
-// moreThanOneRune returns true if there are more than one runes in the string.
-// It doesn't check whether the string is valid UTF-8.
-// The return value should be always equal to utf8.RuneCountInString(s) > 1,
-// but the function is optimized for the common case where the string prefix is ASCII.
-func moreThanOneRune(s string) bool {
-	// If len(s) is exactly one or zero, there can't be more than one rune.
-	// Exit through this path quickly.
-	if len(s) <= 1 {
-		return false
-	}
-
-	// There's one or more bytes:
-	// If first byte is ASCII then there are multiple runes if there are more bytes after that.
-	if s[0] < utf8.RuneSelf {
-		return len(s) > 1
-	}
-
-	// Less common case: first is a multibyte rune.
-	return utf8.RuneCountInString(s) > 1
-}
-
 // trueMatcher is a stringMatcher which matches any string (always returns true).
 type trueMatcher struct{}
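As background for the rewritten check above, a small self-contained Go snippet showing the `utf8.DecodeRuneInString` behaviour the new code relies on; this is standard-library behaviour, not part of the change:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	// DecodeRuneInString returns (RuneError, 0) for an empty string and
	// (RuneError, 1) for a string starting with invalid UTF-8, which is why
	// `size == len(s)` accepts "" and a lone invalid byte but nothing longer.
	for _, s := range []string{"", "x", "😀", "xx", "\xff", "\xff\xfe"} {
		r, size := utf8.DecodeRuneInString(s)
		fmt.Printf("%q -> rune=%q size=%d remaining=%d\n", s, r, size, len(s)-size)
	}
}
```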


@@ -19,6 +19,7 @@ import (
 	"strings"
 	"testing"
 	"time"
+	"unicode/utf8"
 
 	"github.com/grafana/regexp"
 	"github.com/grafana/regexp/syntax"
@@ -36,6 +37,7 @@ var (
 		".*foo",
 		"^.*foo$",
 		"^.+foo$",
+		".?",
 		".*",
 		".+",
 		"foo.+",
@@ -88,6 +90,12 @@ var (
 		// Values matching / not matching the test regexps on long alternations.
 		"zQPbMkNO", "zQPbMkNo", "jyyfj00j0061", "jyyfj00j006", "jyyfj00j00612", "NNSPdvMi", "NNSPdvMiXXX", "NNSPdvMixxx", "nnSPdvMi", "nnSPdvMiXXX",
+
+		// Invalid utf8
+		"\xfefoo",
+		"foo\xfe",
+		"\xfd",
+		"\xff\xff",
 	}
 )
@@ -926,19 +934,91 @@ func BenchmarkOptimizeEqualStringMatchers(b *testing.B) {
 }
 
 func TestZeroOrOneCharacterStringMatcher(t *testing.T) {
-	matcher := &zeroOrOneCharacterStringMatcher{matchNL: true}
-	require.True(t, matcher.Matches(""))
-	require.True(t, matcher.Matches("x"))
-	require.True(t, matcher.Matches("\n"))
-	require.False(t, matcher.Matches("xx"))
-	require.False(t, matcher.Matches("\n\n"))
+	t.Run("match newline", func(t *testing.T) {
+		matcher := &zeroOrOneCharacterStringMatcher{matchNL: true}
+		require.True(t, matcher.Matches(""))
+		require.True(t, matcher.Matches("x"))
+		require.True(t, matcher.Matches("\n"))
+		require.False(t, matcher.Matches("xx"))
+		require.False(t, matcher.Matches("\n\n"))
+	})
 
-	matcher = &zeroOrOneCharacterStringMatcher{matchNL: false}
-	require.True(t, matcher.Matches(""))
-	require.True(t, matcher.Matches("x"))
-	require.False(t, matcher.Matches("\n"))
-	require.False(t, matcher.Matches("xx"))
-	require.False(t, matcher.Matches("\n\n"))
+	t.Run("do not match newline", func(t *testing.T) {
+		matcher := &zeroOrOneCharacterStringMatcher{matchNL: false}
+		require.True(t, matcher.Matches(""))
+		require.True(t, matcher.Matches("x"))
+		require.False(t, matcher.Matches("\n"))
+		require.False(t, matcher.Matches("xx"))
+		require.False(t, matcher.Matches("\n\n"))
+	})
t.Run("unicode", func(t *testing.T) {
// Just for documentation purposes, emoji1 is 1 rune, emoji2 is 2 runes.
// Having this in mind, will make future readers fixing tests easier.
emoji1 := "😀"
emoji2 := "❤️"
require.Equal(t, 1, utf8.RuneCountInString(emoji1))
require.Equal(t, 2, utf8.RuneCountInString(emoji2))
matcher := &zeroOrOneCharacterStringMatcher{matchNL: true}
require.True(t, matcher.Matches(emoji1))
require.False(t, matcher.Matches(emoji2))
require.False(t, matcher.Matches(emoji1+emoji1))
require.False(t, matcher.Matches("x"+emoji1))
require.False(t, matcher.Matches(emoji1+"x"))
require.False(t, matcher.Matches(emoji1+emoji2))
})
t.Run("invalid unicode", func(t *testing.T) {
// Just for reference, we also compare to what `^.?$` regular expression matches.
re := regexp.MustCompile("^.?$")
matcher := &zeroOrOneCharacterStringMatcher{matchNL: true}
requireMatches := func(s string, expected bool) {
t.Helper()
require.Equal(t, expected, matcher.Matches(s))
require.Equal(t, re.MatchString(s), matcher.Matches(s))
}
requireMatches("\xff", true)
requireMatches("x\xff", false)
requireMatches("\xffx", false)
requireMatches("\xff\xfe", false)
})
}
func BenchmarkZeroOrOneCharacterStringMatcher(b *testing.B) {
type benchCase struct {
str string
matches bool
}
emoji1 := "😀"
emoji2 := "❤️"
cases := []benchCase{
{"", true},
{"x", true},
{"\n", true},
{"xx", false},
{"\n\n", false},
{emoji1, true},
{emoji2, false},
{emoji1 + emoji1, false},
{strings.Repeat("x", 100), false},
{strings.Repeat(emoji1, 100), false},
{strings.Repeat(emoji2, 100), false},
}
matcher := &zeroOrOneCharacterStringMatcher{matchNL: true}
b.ResetTimer()
for n := 0; n < b.N; n++ {
c := cases[n%len(cases)]
got := matcher.Matches(c.str)
if got != c.matches {
b.Fatalf("unexpected result for %q: got %t, want %t", c.str, got, c.matches)
}
}
}

func TestLiteralPrefixStringMatcher(t *testing.T) {

File diff suppressed because it is too large.


@@ -269,3 +269,448 @@ eval instant at 50m histogram_sum(sum(incr_sum_histogram))
eval instant at 50m histogram_sum(sum(last_over_time(incr_sum_histogram[5m])))
  {} 30
# Apply rate function to histogram.
load 15s
histogram_rate {{schema:1 count:12 sum:18.4 z_bucket:2 z_bucket_w:0.001 buckets:[1 2 0 1 1] n_buckets:[1 2 0 1 1]}}+{{schema:1 count:9 sum:18.4 z_bucket:1 z_bucket_w:0.001 buckets:[1 1 0 1 1] n_buckets:[1 1 0 1 1]}}x100
eval instant at 5m rate(histogram_rate[45s])
{} {{schema:1 count:0.6 sum:1.2266666666666652 z_bucket:0.06666666666666667 z_bucket_w:0.001 buckets:[0.06666666666666667 0.06666666666666667 0 0.06666666666666667 0.06666666666666667] n_buckets:[0.06666666666666667 0.06666666666666667 0 0.06666666666666667 0.06666666666666667]}}
eval range from 5m to 5m30s step 30s rate(histogram_rate[45s])
{} {{schema:1 count:0.6 sum:1.2266666666666652 z_bucket:0.06666666666666667 z_bucket_w:0.001 buckets:[0.06666666666666667 0.06666666666666667 0 0.06666666666666667 0.06666666666666667] n_buckets:[0.06666666666666667 0.06666666666666667 0 0.06666666666666667 0.06666666666666667]}}x1
# Apply count and sum function to histogram.
load 10m
histogram_count_sum_2 {{schema:0 count:24 sum:100 z_bucket:4 z_bucket_w:0.001 buckets:[2 3 0 1 4] n_buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_count(histogram_count_sum_2)
{} 24
eval instant at 10m histogram_sum(histogram_count_sum_2)
{} 100
# Apply stddev and stdvar function to histogram with {1, 2, 3, 4} (low res).
load 10m
histogram_stddev_stdvar_1 {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_1)
{} 1.0787993180043811
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_1)
{} 1.163807968526718
# Apply stddev and stdvar function to histogram with {1, 1, 1, 1} (high res).
load 10m
histogram_stddev_stdvar_2 {{schema:8 count:10 sum:10 buckets:[1 2 3 4]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_2)
{} 0.0048960313898237465
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_2)
{} 2.3971123370139447e-05
# Apply stddev and stdvar function to histogram with {-50, -8, 0, 3, 8, 9}.
load 10m
histogram_stddev_stdvar_3 {{schema:3 count:7 sum:62 z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_3)
{} 42.947236400258
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_3)
{} 1844.4651144196398
# Apply stddev and stdvar function to histogram with {-100000, -10000, -1000, -888, -888, -100, -50, -9, -8, -3}.
load 10m
histogram_stddev_stdvar_4 {{schema:0 count:10 sum:-112946 z_bucket:0 n_buckets:[0 0 1 1 1 0 1 1 0 0 3 0 0 0 1 0 0 1]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_4)
{} 27556.344499842
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_4)
{} 759352122.1939945
# Apply stddev and stdvar function to histogram with {-10x10}.
load 10m
histogram_stddev_stdvar_5 {{schema:0 count:10 sum:-100 z_bucket:0 n_buckets:[0 0 0 0 10]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_5)
{} 1.3137084989848
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_5)
{} 1.725830020304794
# Apply stddev and stdvar function to histogram with {-50, -8, 0, 3, 8, 9, NaN}.
load 10m
histogram_stddev_stdvar_6 {{schema:3 count:7 sum:NaN z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_6)
{} NaN
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_6)
{} NaN
# Apply stddev and stdvar function to histogram with {-50, -8, 0, 3, 8, 9, Inf}.
load 10m
histogram_stddev_stdvar_7 {{schema:3 count:7 sum:Inf z_bucket:1 buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ] n_buckets:[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 ]}}x1
eval instant at 10m histogram_stddev(histogram_stddev_stdvar_7)
{} NaN
eval instant at 10m histogram_stdvar(histogram_stddev_stdvar_7)
{} NaN
# Apply quantile function to histogram with all positive buckets with zero bucket.
load 10m
histogram_quantile_1 {{schema:0 count:12 sum:100 z_bucket:2 z_bucket_w:0.001 buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_quantile(1.001, histogram_quantile_1)
{} Inf
eval instant at 10m histogram_quantile(1, histogram_quantile_1)
{} 16
eval instant at 10m histogram_quantile(0.99, histogram_quantile_1)
{} 15.759999999999998
eval instant at 10m histogram_quantile(0.9, histogram_quantile_1)
{} 13.600000000000001
eval instant at 10m histogram_quantile(0.6, histogram_quantile_1)
{} 4.799999999999997
eval instant at 10m histogram_quantile(0.5, histogram_quantile_1)
{} 1.6666666666666665
eval instant at 10m histogram_quantile(0.1, histogram_quantile_1)
{} 0.0006000000000000001
eval instant at 10m histogram_quantile(0, histogram_quantile_1)
{} 0
eval instant at 10m histogram_quantile(-1, histogram_quantile_1)
{} -Inf
# Apply quantile function to histogram with all negative buckets with zero bucket.
load 10m
histogram_quantile_2 {{schema:0 count:12 sum:100 z_bucket:2 z_bucket_w:0.001 n_buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_quantile(1.001, histogram_quantile_2)
{} Inf
eval instant at 10m histogram_quantile(1, histogram_quantile_2)
{} 0
eval instant at 10m histogram_quantile(0.99, histogram_quantile_2)
{} -6.000000000000048e-05
eval instant at 10m histogram_quantile(0.9, histogram_quantile_2)
{} -0.0005999999999999996
eval instant at 10m histogram_quantile(0.5, histogram_quantile_2)
{} -1.6666666666666667
eval instant at 10m histogram_quantile(0.1, histogram_quantile_2)
{} -13.6
eval instant at 10m histogram_quantile(0, histogram_quantile_2)
{} -16
eval instant at 10m histogram_quantile(-1, histogram_quantile_2)
{} -Inf
# Apply quantile function to histogram with both positive and negative buckets with zero bucket.
load 10m
histogram_quantile_3 {{schema:0 count:24 sum:100 z_bucket:4 z_bucket_w:0.001 buckets:[2 3 0 1 4] n_buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_quantile(1.001, histogram_quantile_3)
{} Inf
eval instant at 10m histogram_quantile(1, histogram_quantile_3)
{} 16
eval instant at 10m histogram_quantile(0.99, histogram_quantile_3)
{} 15.519999999999996
eval instant at 10m histogram_quantile(0.9, histogram_quantile_3)
{} 11.200000000000003
eval instant at 10m histogram_quantile(0.7, histogram_quantile_3)
{} 1.2666666666666657
eval instant at 10m histogram_quantile(0.55, histogram_quantile_3)
{} 0.0006000000000000005
eval instant at 10m histogram_quantile(0.5, histogram_quantile_3)
{} 0
eval instant at 10m histogram_quantile(0.45, histogram_quantile_3)
{} -0.0005999999999999996
eval instant at 10m histogram_quantile(0.3, histogram_quantile_3)
{} -1.266666666666667
eval instant at 10m histogram_quantile(0.1, histogram_quantile_3)
{} -11.2
eval instant at 10m histogram_quantile(0.01, histogram_quantile_3)
{} -15.52
eval instant at 10m histogram_quantile(0, histogram_quantile_3)
{} -16
eval instant at 10m histogram_quantile(-1, histogram_quantile_3)
{} -Inf
# Apply fraction function to empty histogram.
load 10m
histogram_fraction_1 {{}}x1
eval instant at 10m histogram_fraction(3.1415, 42, histogram_fraction_1)
{} NaN
# Apply fraction function to histogram with positive and zero buckets.
load 10m
histogram_fraction_2 {{schema:0 count:12 sum:100 z_bucket:2 z_bucket_w:0.001 buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_fraction(0, +Inf, histogram_fraction_2)
{} 1
eval instant at 10m histogram_fraction(-Inf, 0, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-0.001, 0, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(0, 0.001, histogram_fraction_2)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(0, 0.0005, histogram_fraction_2)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(0.001, inf, histogram_fraction_2)
{} 0.8333333333333334
eval instant at 10m histogram_fraction(-inf, -0.001, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(1, 2, histogram_fraction_2)
{} 0.25
eval instant at 10m histogram_fraction(1.5, 2, histogram_fraction_2)
{} 0.125
eval instant at 10m histogram_fraction(1, 8, histogram_fraction_2)
{} 0.3333333333333333
eval instant at 10m histogram_fraction(1, 6, histogram_fraction_2)
{} 0.2916666666666667
eval instant at 10m histogram_fraction(1.5, 6, histogram_fraction_2)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(-2, -1, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-2, -1.5, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-8, -1, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-6, -1, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-6, -1.5, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(42, 3.1415, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(0, 0, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(0.000001, 0.000001, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(42, 42, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(-3.1, -3.1, histogram_fraction_2)
{} 0
eval instant at 10m histogram_fraction(3.1415, NaN, histogram_fraction_2)
{} NaN
eval instant at 10m histogram_fraction(NaN, 42, histogram_fraction_2)
{} NaN
eval instant at 10m histogram_fraction(NaN, NaN, histogram_fraction_2)
{} NaN
eval instant at 10m histogram_fraction(-Inf, +Inf, histogram_fraction_2)
{} 1
# Apply fraction function to histogram with negative and zero buckets.
load 10m
histogram_fraction_3 {{schema:0 count:12 sum:100 z_bucket:2 z_bucket_w:0.001 n_buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_fraction(0, +Inf, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(-Inf, 0, histogram_fraction_3)
{} 1
eval instant at 10m histogram_fraction(-0.001, 0, histogram_fraction_3)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(0, 0.001, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(-0.0005, 0, histogram_fraction_3)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(0.001, inf, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(-inf, -0.001, histogram_fraction_3)
{} 0.8333333333333334
eval instant at 10m histogram_fraction(1, 2, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(1.5, 2, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(1, 8, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(1, 6, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(1.5, 6, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(-2, -1, histogram_fraction_3)
{} 0.25
eval instant at 10m histogram_fraction(-2, -1.5, histogram_fraction_3)
{} 0.125
eval instant at 10m histogram_fraction(-8, -1, histogram_fraction_3)
{} 0.3333333333333333
eval instant at 10m histogram_fraction(-6, -1, histogram_fraction_3)
{} 0.2916666666666667
eval instant at 10m histogram_fraction(-6, -1.5, histogram_fraction_3)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(42, 3.1415, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(0, 0, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(0.000001, 0.000001, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(42, 42, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(-3.1, -3.1, histogram_fraction_3)
{} 0
eval instant at 10m histogram_fraction(3.1415, NaN, histogram_fraction_3)
{} NaN
eval instant at 10m histogram_fraction(NaN, 42, histogram_fraction_3)
{} NaN
eval instant at 10m histogram_fraction(NaN, NaN, histogram_fraction_3)
{} NaN
eval instant at 10m histogram_fraction(-Inf, +Inf, histogram_fraction_3)
{} 1
# Apply fraction function to histogram with both positive, negative and zero buckets.
load 10m
histogram_fraction_4 {{schema:0 count:24 sum:100 z_bucket:4 z_bucket_w:0.001 buckets:[2 3 0 1 4] n_buckets:[2 3 0 1 4]}}x1
eval instant at 10m histogram_fraction(0, +Inf, histogram_fraction_4)
{} 0.5
eval instant at 10m histogram_fraction(-Inf, 0, histogram_fraction_4)
{} 0.5
eval instant at 10m histogram_fraction(-0.001, 0, histogram_fraction_4)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(0, 0.001, histogram_fraction_4)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(-0.0005, 0.0005, histogram_fraction_4)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(0.001, inf, histogram_fraction_4)
{} 0.4166666666666667
eval instant at 10m histogram_fraction(-inf, -0.001, histogram_fraction_4)
{} 0.4166666666666667
eval instant at 10m histogram_fraction(1, 2, histogram_fraction_4)
{} 0.125
eval instant at 10m histogram_fraction(1.5, 2, histogram_fraction_4)
{} 0.0625
eval instant at 10m histogram_fraction(1, 8, histogram_fraction_4)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(1, 6, histogram_fraction_4)
{} 0.14583333333333334
eval instant at 10m histogram_fraction(1.5, 6, histogram_fraction_4)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(-2, -1, histogram_fraction_4)
{} 0.125
eval instant at 10m histogram_fraction(-2, -1.5, histogram_fraction_4)
{} 0.0625
eval instant at 10m histogram_fraction(-8, -1, histogram_fraction_4)
{} 0.16666666666666666
eval instant at 10m histogram_fraction(-6, -1, histogram_fraction_4)
{} 0.14583333333333334
eval instant at 10m histogram_fraction(-6, -1.5, histogram_fraction_4)
{} 0.08333333333333333
eval instant at 10m histogram_fraction(42, 3.1415, histogram_fraction_4)
{} 0
eval instant at 10m histogram_fraction(0, 0, histogram_fraction_4)
{} 0
eval instant at 10m histogram_fraction(0.000001, 0.000001, histogram_fraction_4)
{} 0
eval instant at 10m histogram_fraction(42, 42, histogram_fraction_4)
{} 0
eval instant at 10m histogram_fraction(-3.1, -3.1, histogram_fraction_4)
{} 0
eval instant at 10m histogram_fraction(3.1415, NaN, histogram_fraction_4)
{} NaN
eval instant at 10m histogram_fraction(NaN, 42, histogram_fraction_4)
{} NaN
eval instant at 10m histogram_fraction(NaN, NaN, histogram_fraction_4)
{} NaN
eval instant at 10m histogram_fraction(-Inf, +Inf, histogram_fraction_4)
{} 1


@@ -2044,7 +2044,7 @@ func TestBoundedRuleEvalConcurrency(t *testing.T) {
 	require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
 }
 
-const artificialDelay = 10 * time.Millisecond
+const artificialDelay = 15 * time.Millisecond
 
 func optsFactory(storage storage.Storage, maxInflight, inflightQueries *atomic.Int32, maxConcurrent int64) *ManagerOptions {
 	var inflightMu sync.Mutex


@@ -75,7 +75,7 @@ type AzureADConfig struct { //nolint:revive // exported.
 	// OAuth is the oauth config that is being used to authenticate.
 	OAuth *OAuthConfig `yaml:"oauth,omitempty"`
 
-	// OAuth is the oauth config that is being used to authenticate.
+	// SDK is the SDK config that is being used to authenticate.
 	SDK *SDKConfig `yaml:"sdk,omitempty"`
 
 	// Cloud is the Azure cloud in which the service is running. Example: AzurePublic/AzureGovernment/AzureChina.


@@ -381,6 +381,33 @@ func listChunkFiles(dir string) (map[int]string, error) {
	return res, nil
}
// HardLinkChunkFiles creates hardlinks for chunk files from src to dst.
// It does nothing if src doesn't exist and ensures dst is created if not.
func HardLinkChunkFiles(src, dst string) error {
_, err := os.Stat(src)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return fmt.Errorf("check source chunks dir: %w", err)
}
if err := os.MkdirAll(dst, 0o777); err != nil {
return fmt.Errorf("set up destination chunks dir: %w", err)
}
files, err := listChunkFiles(src)
if err != nil {
return fmt.Errorf("list chunks: %w", err)
}
for _, filePath := range files {
_, fileName := filepath.Split(filePath)
err := os.Link(filepath.Join(src, fileName), filepath.Join(dst, fileName))
if err != nil {
return fmt.Errorf("hardlink a chunk: %w", err)
}
}
return nil
}
// repairLastChunkFile deletes the last file if it's empty.
// Because we don't fsync when creating these files, we could end
// up with an empty file at the end during an abrupt shutdown.
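As a side note, a minimal standalone sketch (file names made up) of the `os.Link` property that the new `HardLinkChunkFiles` helper builds on: a hard link is a second directory entry for the same data, so the sandbox copies can be created and later deleted without duplicating or harming the original chunk files:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	dir, _ := os.MkdirTemp("", "hardlink-demo")
	defer os.RemoveAll(dir)

	src := filepath.Join(dir, "000001") // stand-in for an m-mapped chunk file
	dst := filepath.Join(dir, "sandbox-000001")

	_ = os.WriteFile(src, []byte("chunk bytes"), 0o666)

	// A hard link is another name for the same inode: no data is copied.
	if err := os.Link(src, dst); err != nil {
		panic(err)
	}

	// Removing the link does not touch the original file.
	_ = os.Remove(dst)

	data, _ := os.ReadFile(src)
	fmt.Printf("original still readable: %q\n", data)
}
```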


@@ -1298,7 +1298,7 @@ func TestCancelCompactions(t *testing.T) {
 	// This checks that the `context.Canceled` error is properly checked at all levels:
 	// - tsdb_errors.NewMulti() should have the Is() method implemented for correct checks.
 	// - callers should check with errors.Is() instead of ==.
-	readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, log.NewNopLogger())
+	readOnlyDB, err := OpenDBReadOnly(tmpdirCopy, "", log.NewNopLogger())
 	require.NoError(t, err)
 	blocks, err := readOnlyDB.Blocks()
 	require.NoError(t, err)


@@ -383,26 +383,36 @@ var ErrClosed = errors.New("db already closed")
 // Current implementation doesn't support concurrency so
 // all API calls should happen in the same go routine.
 type DBReadOnly struct {
-	logger  log.Logger
-	dir     string
-	closers []io.Closer
-	closed  chan struct{}
+	logger     log.Logger
+	dir        string
+	sandboxDir string
+	closers    []io.Closer
+	closed     chan struct{}
 }
 
 // OpenDBReadOnly opens DB in the given directory for read only operations.
-func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
+func OpenDBReadOnly(dir, sandboxDirRoot string, l log.Logger) (*DBReadOnly, error) {
 	if _, err := os.Stat(dir); err != nil {
 		return nil, fmt.Errorf("opening the db dir: %w", err)
 	}
 
+	if sandboxDirRoot == "" {
+		sandboxDirRoot = dir
+	}
+	sandboxDir, err := os.MkdirTemp(sandboxDirRoot, "tmp_dbro_sandbox")
+	if err != nil {
+		return nil, fmt.Errorf("setting up sandbox dir: %w", err)
+	}
+
 	if l == nil {
 		l = log.NewNopLogger()
 	}
 
 	return &DBReadOnly{
-		logger: l,
-		dir:    dir,
-		closed: make(chan struct{}),
+		logger:     l,
+		dir:        dir,
+		sandboxDir: sandboxDir,
+		closed:     make(chan struct{}),
 	}, nil
 }
@@ -491,7 +501,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
 	}
 
 	opts := DefaultHeadOptions()
-	opts.ChunkDirRoot = db.dir
+	// Hard link the chunk files to a dir in db.sandboxDir in case the Head needs to truncate some of them
+	// or cut new ones while replaying the WAL.
+	// See https://github.com/prometheus/prometheus/issues/11618.
+	err = chunks.HardLinkChunkFiles(mmappedChunksDir(db.dir), mmappedChunksDir(db.sandboxDir))
+	if err != nil {
+		return nil, err
+	}
+	opts.ChunkDirRoot = db.sandboxDir
 	head, err := NewHead(nil, db.logger, nil, nil, opts, NewHeadStats())
 	if err != nil {
 		return nil, err
@@ -519,7 +536,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
 		}
 	}
 	opts := DefaultHeadOptions()
-	opts.ChunkDirRoot = db.dir
+	opts.ChunkDirRoot = db.sandboxDir
 	head, err = NewHead(nil, db.logger, w, wbl, opts, NewHeadStats())
 	if err != nil {
 		return nil, err
@@ -690,8 +707,14 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
 	return block, nil
 }
 
-// Close all block readers.
+// Close all block readers and delete the sandbox dir.
 func (db *DBReadOnly) Close() error {
+	defer func() {
+		// Delete the temporary sandbox directory that was created when opening the DB.
+		if err := os.RemoveAll(db.sandboxDir); err != nil {
+			level.Error(db.logger).Log("msg", "delete sandbox dir", "err", err)
+		}
+	}()
 	select {
 	case <-db.closed:
 		return ErrClosed

View file

@ -25,6 +25,7 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -2494,7 +2495,7 @@ func TestDBReadOnly(t *testing.T) {
} }
// Open a read only db and ensure that the API returns the same result as the normal DB. // Open a read only db and ensure that the API returns the same result as the normal DB.
dbReadOnly, err := OpenDBReadOnly(dbDir, logger) dbReadOnly, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err) require.NoError(t, err)
defer func() { require.NoError(t, dbReadOnly.Close()) }() defer func() { require.NoError(t, dbReadOnly.Close()) }()
@ -2548,10 +2549,14 @@ func TestDBReadOnly(t *testing.T) {
// TestDBReadOnlyClosing ensures that after closing the db // TestDBReadOnlyClosing ensures that after closing the db
// all api methods return an ErrClosed. // all api methods return an ErrClosed.
func TestDBReadOnlyClosing(t *testing.T) { func TestDBReadOnlyClosing(t *testing.T) {
dbDir := t.TempDir() sandboxDir := t.TempDir()
db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))) db, err := OpenDBReadOnly(t.TempDir(), sandboxDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
require.NoError(t, err) require.NoError(t, err)
// The sandboxDir was there.
require.DirExists(t, db.sandboxDir)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
// The sandboxDir was deleted when closing.
require.NoDirExists(t, db.sandboxDir)
require.Equal(t, db.Close(), ErrClosed) require.Equal(t, db.Close(), ErrClosed)
_, err = db.Blocks() _, err = db.Blocks()
require.Equal(t, err, ErrClosed) require.Equal(t, err, ErrClosed)
@ -2587,7 +2592,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
} }
// Flush WAL. // Flush WAL.
db, err := OpenDBReadOnly(dbDir, logger) db, err := OpenDBReadOnly(dbDir, "", logger)
require.NoError(t, err) require.NoError(t, err)
flush := t.TempDir() flush := t.TempDir()
@ -2595,7 +2600,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.NoError(t, db.Close()) require.NoError(t, db.Close())
// Reopen the DB from the flushed WAL block. // Reopen the DB from the flushed WAL block.
db, err = OpenDBReadOnly(flush, logger) db, err = OpenDBReadOnly(flush, "", logger)
require.NoError(t, err) require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
blocks, err := db.Blocks() blocks, err := db.Blocks()
@ -2624,6 +2629,80 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.Equal(t, 1000.0, sum) require.Equal(t, 1000.0, sum)
} }
func TestDBReadOnly_Querier_NoAlteration(t *testing.T) {
countChunks := func(dir string) int {
files, err := os.ReadDir(mmappedChunksDir(dir))
require.NoError(t, err)
return len(files)
}
dirHash := func(dir string) (hash []byte) {
// Windows requires the DB to be closed: "xxx\lock: The process cannot access the file because it is being used by another process."
// But closing the DB alters the directory in this case (it'll cut a new chunk).
if runtime.GOOS != "windows" {
hash = testutil.DirHash(t, dir)
}
return
}
spinUpQuerierAndCheck := func(dir, sandboxDir string, chunksCount int) {
dBDirHash := dirHash(dir)
// Bootstrap an RO db from the same dir and set up a querier.
dbReadOnly, err := OpenDBReadOnly(dir, sandboxDir, nil)
require.NoError(t, err)
require.Equal(t, chunksCount, countChunks(dir))
q, err := dbReadOnly.Querier(math.MinInt, math.MaxInt)
require.NoError(t, err)
require.NoError(t, q.Close())
require.NoError(t, dbReadOnly.Close())
// The RO Head doesn't alter RW db chunks_head/.
require.Equal(t, chunksCount, countChunks(dir))
require.Equal(t, dirHash(dir), dBDirHash)
}
t.Run("doesn't cut chunks while replaying WAL", func(t *testing.T) {
db := openTestDB(t, nil, nil)
defer func() {
require.NoError(t, db.Close())
}()
// Append until the first mmapped head chunk.
for i := 0; i < 121; i++ {
app := db.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), 0)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
spinUpQuerierAndCheck(db.dir, t.TempDir(), 0)
// The RW Head should have no problem cutting its own chunk;
// this also proves that a chunk needed to be cut.
require.NotPanics(t, func() { db.ForceHeadMMap() })
require.Equal(t, 1, countChunks(db.dir))
})
t.Run("doesn't truncate corrupted chunks", func(t *testing.T) {
db := openTestDB(t, nil, nil)
require.NoError(t, db.Close())
// Simulate a corrupted chunk: a file without a header.
_, err := os.Create(path.Join(mmappedChunksDir(db.dir), "000001"))
require.NoError(t, err)
spinUpQuerierAndCheck(db.dir, t.TempDir(), 1)
// The RW Head should have no problem truncating its corrupted file:
// this proves that the chunk needed to be truncated.
db, err = Open(db.dir, nil, nil, nil, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()
require.Equal(t, 0, countChunks(db.dir))
})
}
func TestDBCannotSeePartialCommits(t *testing.T) { func TestDBCannotSeePartialCommits(t *testing.T) {
if defaultIsolationDisabled { if defaultIsolationDisabled {
t.Skip("skipping test since tsdb isolation is disabled") t.Skip("skipping test since tsdb isolation is disabled")

View file

@ -310,12 +310,22 @@ func (h *Head) resetInMemoryState() error {
return err return err
} }
if h.series != nil {
// Reset the existing series to make sure we call the appropriate hooks
// and increment the seriesRemoved metric.
fs := h.series.iterForDeletion(func(_ int, _ uint64, s *memSeries, flushedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
// All series should be flushed
flushedForCallback[s.ref] = s.lset
})
h.metrics.seriesRemoved.Add(float64(fs))
}
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.iso = newIsolation(h.opts.IsolationDisabled) h.iso = newIsolation(h.opts.IsolationDisabled)
h.oooIso = newOOOIsolation() h.oooIso = newOOOIsolation()
h.numSeries.Store(0)
h.exemplarMetrics = em h.exemplarMetrics = em
h.exemplars = es h.exemplars = es
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.postings = index.NewUnorderedMemPostings() h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones() h.tombstones = tombstones.NewMemTombstones()
h.deleted = map[chunks.HeadSeriesRef]int{} h.deleted = map[chunks.HeadSeriesRef]int{}
@ -1861,11 +1871,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) { func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
var ( var (
deleted = map[storage.SeriesRef]struct{}{} deleted = map[storage.SeriesRef]struct{}{}
rmChunks = 0 rmChunks = 0
actualMint int64 = math.MaxInt64 actualMint int64 = math.MaxInt64
minOOOTime int64 = math.MaxInt64 minOOOTime int64 = math.MaxInt64
deletedFromPrevStripe = 0
) )
minMmapFile = math.MaxInt32 minMmapFile = math.MaxInt32
@ -1923,27 +1932,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
deletedForCallback[series.ref] = series.lset deletedForCallback[series.ref] = series.lset
} }
// Run through all series shard by shard, checking which should be deleted. s.iterForDeletion(check)
for i := 0; i < s.size; i++ {
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
s.locks[i].Lock()
// Delete conflicts first so seriesHashmap.del doesn't move them to the `unique` field,
// after deleting `unique`.
for hash, all := range s.hashes[i].conflicts {
for _, series := range all {
check(i, hash, series, deletedForCallback)
}
}
for hash, series := range s.hashes[i].unique {
check(i, hash, series, deletedForCallback)
}
s.locks[i].Unlock()
s.seriesLifecycleCallback.PostDeletion(deletedForCallback)
deletedFromPrevStripe = len(deletedForCallback)
}
if actualMint == math.MaxInt64 { if actualMint == math.MaxInt64 {
actualMint = mint actualMint = mint
@ -1952,6 +1941,35 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
} }
// iterForDeletion iterates through all series, invoking checkDeletedFunc for each one.
// checkDeletedFunc receives a map and should add to it every deleted series that should be included
// when the PostDeletion hook is invoked. The total number of deleted series is returned.
func (s *stripeSeries) iterForDeletion(checkDeletedFunc func(int, uint64, *memSeries, map[chunks.HeadSeriesRef]labels.Labels)) int {
seriesSetFromPrevStripe := 0
totalDeletedSeries := 0
// Run through all series shard by shard
for i := 0; i < s.size; i++ {
seriesSet := make(map[chunks.HeadSeriesRef]labels.Labels, seriesSetFromPrevStripe)
s.locks[i].Lock()
// Iterate conflicts first so checkDeletedFunc doesn't move them to the `unique` field
// after deleting `unique`.
for hash, all := range s.hashes[i].conflicts {
for _, series := range all {
checkDeletedFunc(i, hash, series, seriesSet)
}
}
for hash, series := range s.hashes[i].unique {
checkDeletedFunc(i, hash, series, seriesSet)
}
s.locks[i].Unlock()
s.seriesLifecycleCallback.PostDeletion(seriesSet)
totalDeletedSeries += len(seriesSet)
seriesSetFromPrevStripe = len(seriesSet)
}
return totalDeletedSeries
}
func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries { func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries {
i := uint64(id) & uint64(s.size-1) i := uint64(id) & uint64(s.size-1)

View file

@ -4007,6 +4007,9 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0) f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
require.NoError(t, err) require.NoError(t, err)
// Create a snapshot backup to be restored in later test cases.
snapshotBackup, err := io.ReadAll(f)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0b11111111}, 18) _, err = f.WriteAt([]byte{0b11111111}, 18)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, f.Close()) require.NoError(t, f.Close())
@ -4021,10 +4024,44 @@ func TestSnapshotError(t *testing.T) {
// There should be no series in the memory after snapshot error since WAL was removed. // There should be no series in the memory after snapshot error since WAL was removed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
require.Equal(t, uint64(0), head.NumSeries())
require.Nil(t, head.series.getByHash(lbls.Hash(), lbls)) require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
tm, err = head.tombstones.Get(1) tm, err = head.tombstones.Get(1)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, tm) require.Empty(t, tm)
require.NoError(t, head.Close())
// Test corruption in the middle of the snapshot.
f, err = os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
require.NoError(t, err)
_, err = f.WriteAt(snapshotBackup, 0)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0b11111111}, 300)
require.NoError(t, err)
require.NoError(t, f.Close())
c := &countSeriesLifecycleCallback{}
opts := head.opts
opts.SeriesCallback = c
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
require.NoError(t, err)
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
// There should be no series in the memory after snapshot error since WAL was removed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
require.Equal(t, uint64(0), head.NumSeries())
// Since the snapshot may have replayed some series, the create hooks have already been invoked.
// In that case we must ensure the delete hooks are also triggered when resetting the in-memory state.
require.Equal(t, int64(2), c.created.Load())
require.Equal(t, int64(2), c.deleted.Load())
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesRemoved))
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesCreated))
} }
func TestHistogramMetrics(t *testing.T) { func TestHistogramMetrics(t *testing.T) {
@ -5829,3 +5866,14 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
require.False(t, head.compactable()) require.False(t, head.compactable())
} }
type countSeriesLifecycleCallback struct {
created atomic.Int64
deleted atomic.Int64
}
func (c *countSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
func (c *countSeriesLifecycleCallback) PostCreation(labels.Labels) { c.created.Inc() }
func (c *countSeriesLifecycleCallback) PostDeletion(s map[chunks.HeadSeriesRef]labels.Labels) {
c.deleted.Add(int64(len(s)))
}

View file

@ -53,7 +53,7 @@ const (
seriesByteAlign = 16 seriesByteAlign = 16
// checkContextEveryNIterations is used in some tight loops to check if the context is done. // checkContextEveryNIterations is used in some tight loops to check if the context is done.
checkContextEveryNIterations = 100 checkContextEveryNIterations = 128
) )
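The bump from 100 to 128 presumably lets the compiler lower the modulo in those tight loops to a bitwise AND, since 128 is a power of two. A small self-contained check of that equivalence (the constant value comes from this diff, the loop bounds are illustrative):

```go
package main

import "fmt"

const checkContextEveryNIterations = 128

func main() {
	for i := uint64(0); i < 1024; i++ {
		// With a power-of-two constant these two checks are equivalent,
		// and the compiler can emit the cheaper AND form for both.
		byMod := i%checkContextEveryNIterations == 0
		byAnd := i&(checkContextEveryNIterations-1) == 0
		if byMod != byAnd {
			fmt.Println("mismatch at", i)
			return
		}
	}
	fmt.Println("modulo and mask agree for all iterations")
}
```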
type indexWriterSeries struct { type indexWriterSeries struct {

View file

@ -20,7 +20,9 @@ import (
"hash/crc32" "hash/crc32"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"sort" "sort"
"strconv"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -160,39 +162,14 @@ func TestIndexRW_Create_Open(t *testing.T) {
} }
func TestIndexRW_Postings(t *testing.T) { func TestIndexRW_Postings(t *testing.T) {
dir := t.TempDir()
ctx := context.Background() ctx := context.Background()
var input indexWriterSeriesSlice
fn := filepath.Join(dir, indexFilename) for i := 1; i < 5; i++ {
input = append(input, &indexWriterSeries{
iw, err := NewWriter(context.Background(), fn) labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
require.NoError(t, err) })
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
labels.FromStrings("a", "1", "b", "3"),
labels.FromStrings("a", "1", "b", "4"),
} }
ir, fn, _ := createFileReader(ctx, t, input)
require.NoError(t, iw.AddSymbol("1"))
require.NoError(t, iw.AddSymbol("2"))
require.NoError(t, iw.AddSymbol("3"))
require.NoError(t, iw.AddSymbol("4"))
require.NoError(t, iw.AddSymbol("a"))
require.NoError(t, iw.AddSymbol("b"))
// Postings lists are only written if a series with the respective
// reference was added before.
require.NoError(t, iw.AddSeries(1, series[0]))
require.NoError(t, iw.AddSeries(2, series[1]))
require.NoError(t, iw.AddSeries(3, series[2]))
require.NoError(t, iw.AddSeries(4, series[3]))
require.NoError(t, iw.Close())
ir, err := NewFileReader(fn)
require.NoError(t, err)
p, err := ir.Postings(ctx, "a", "1") p, err := ir.Postings(ctx, "a", "1")
require.NoError(t, err) require.NoError(t, err)
@ -205,7 +182,7 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, c) require.Empty(t, c)
testutil.RequireEqual(t, series[i], builder.Labels()) testutil.RequireEqual(t, input[i].labels, builder.Labels())
} }
require.NoError(t, p.Err()) require.NoError(t, p.Err())
@ -240,8 +217,6 @@ func TestIndexRW_Postings(t *testing.T) {
"b": {"1", "2", "3", "4"}, "b": {"1", "2", "3", "4"},
}, labelIndices) }, labelIndices)
require.NoError(t, ir.Close())
t.Run("ShardedPostings()", func(t *testing.T) { t.Run("ShardedPostings()", func(t *testing.T) {
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn)
require.NoError(t, err) require.NoError(t, err)
@ -296,42 +271,16 @@ func TestIndexRW_Postings(t *testing.T) {
} }
func TestPostingsMany(t *testing.T) { func TestPostingsMany(t *testing.T) {
dir := t.TempDir()
ctx := context.Background() ctx := context.Background()
fn := filepath.Join(dir, indexFilename)
iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)
// Create a label in the index which has 999 values. // Create a label in the index which has 999 values.
symbols := map[string]struct{}{} var input indexWriterSeriesSlice
series := []labels.Labels{}
for i := 1; i < 1000; i++ { for i := 1; i < 1000; i++ {
v := fmt.Sprintf("%03d", i) v := fmt.Sprintf("%03d", i)
series = append(series, labels.FromStrings("i", v, "foo", "bar")) input = append(input, &indexWriterSeries{
symbols[v] = struct{}{} labels: labels.FromStrings("i", v, "foo", "bar"),
})
} }
symbols["i"] = struct{}{} ir, _, symbols := createFileReader(ctx, t, input)
symbols["foo"] = struct{}{}
symbols["bar"] = struct{}{}
syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
sort.Strings(syms)
for _, s := range syms {
require.NoError(t, iw.AddSymbol(s))
}
for i, s := range series {
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s))
}
require.NoError(t, iw.Close())
ir, err := NewFileReader(fn)
require.NoError(t, err)
defer func() { require.NoError(t, ir.Close()) }()
cases := []struct { cases := []struct {
in []string in []string
@ -387,25 +336,13 @@ func TestPostingsMany(t *testing.T) {
} }
func TestPersistence_index_e2e(t *testing.T) { func TestPersistence_index_e2e(t *testing.T) {
dir := t.TempDir()
ctx := context.Background() ctx := context.Background()
lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000) lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000)
require.NoError(t, err) require.NoError(t, err)
// Sort labels as the index writer expects series in sorted order. // Sort labels as the index writer expects series in sorted order.
sort.Sort(labels.Slice(lbls)) sort.Sort(labels.Slice(lbls))
symbols := map[string]struct{}{}
for _, lset := range lbls {
lset.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}
var input indexWriterSeriesSlice var input indexWriterSeriesSlice
ref := uint64(0) ref := uint64(0)
// Generate ChunkMetas for every label set. // Generate ChunkMetas for every label set.
for i, lset := range lbls { for i, lset := range lbls {
@ -426,17 +363,7 @@ func TestPersistence_index_e2e(t *testing.T) {
}) })
} }
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename)) ir, _, _ := createFileReader(ctx, t, input)
require.NoError(t, err)
syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
sort.Strings(syms)
for _, s := range syms {
require.NoError(t, iw.AddSymbol(s))
}
// Population procedure as done by compaction. // Population procedure as done by compaction.
var ( var (
@ -447,8 +374,6 @@ func TestPersistence_index_e2e(t *testing.T) {
mi := newMockIndex() mi := newMockIndex()
for i, s := range input { for i, s := range input {
err = iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)
require.NoError(t, err)
require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)) require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))
s.labels.Range(func(l labels.Label) { s.labels.Range(func(l labels.Label) {
@ -462,12 +387,6 @@ func TestPersistence_index_e2e(t *testing.T) {
postings.Add(storage.SeriesRef(i), s.labels) postings.Add(storage.SeriesRef(i), s.labels)
} }
err = iw.Close()
require.NoError(t, err)
ir, err := NewFileReader(filepath.Join(dir, indexFilename))
require.NoError(t, err)
for p := range mi.postings { for p := range mi.postings {
gotp, err := ir.Postings(ctx, p.Name, p.Value) gotp, err := ir.Postings(ctx, p.Name, p.Value)
require.NoError(t, err) require.NoError(t, err)
@ -523,8 +442,6 @@ func TestPersistence_index_e2e(t *testing.T) {
} }
sort.Strings(expSymbols) sort.Strings(expSymbols)
require.Equal(t, expSymbols, gotSymbols) require.Equal(t, expSymbols, gotSymbols)
require.NoError(t, ir.Close())
} }
func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
@ -624,39 +541,14 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
numShards = 16 numShards = 16
) )
dir, err := os.MkdirTemp("", "benchmark_reader_sharded_postings")
require.NoError(b, err)
defer func() {
require.NoError(b, os.RemoveAll(dir))
}()
ctx := context.Background() ctx := context.Background()
var input indexWriterSeriesSlice
// Generate an index.
fn := filepath.Join(dir, indexFilename)
iw, err := NewWriter(ctx, fn)
require.NoError(b, err)
for i := 1; i <= numSeries; i++ { for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i))) input = append(input, &indexWriterSeries{
labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
})
} }
require.NoError(b, iw.AddSymbol("const")) ir, _, _ := createFileReader(ctx, b, input)
require.NoError(b, iw.AddSymbol("unique"))
for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSeries(storage.SeriesRef(i),
labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i))))
}
require.NoError(b, iw.Close())
b.ResetTimer()
// Create a reader to read back all postings from the index.
ir, err := NewFileReader(fn)
require.NoError(b, err)
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
@ -721,28 +613,17 @@ func TestChunksTimeOrdering(t *testing.T) {
} }
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
dir := t.TempDir() const seriesCount = 1000
var input indexWriterSeriesSlice
idx, err := NewWriter(context.Background(), filepath.Join(dir, "index")) for i := 1; i < seriesCount; i++ {
require.NoError(t, err) input = append(input, &indexWriterSeries{
labels: labels.FromStrings("__name__", fmt.Sprintf("%4d", i)),
seriesCount := 1000 chunks: []chunks.Meta{
for i := 1; i <= seriesCount; i++ { {Ref: 1, MinTime: 0, MaxTime: 10},
require.NoError(t, idx.AddSymbol(fmt.Sprintf("%4d", i))) },
})
} }
require.NoError(t, idx.AddSymbol("__name__")) ir, _, _ := createFileReader(context.Background(), t, input)
for i := 1; i <= seriesCount; i++ {
require.NoError(t, idx.AddSeries(storage.SeriesRef(i), labels.FromStrings("__name__", fmt.Sprintf("%4d", i)),
chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10},
))
}
require.NoError(t, idx.Close())
ir, err := NewFileReader(filepath.Join(dir, "index"))
require.NoError(t, err)
defer ir.Close()
failAfter := uint64(seriesCount / 2) // Fail after processing half of the series. failAfter := uint64(seriesCount / 2) // Fail after processing half of the series.
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter} ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
@ -752,3 +633,42 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Error(t, p.Err()) require.Error(t, p.Err())
require.Equal(t, failAfter, ctx.Count()) require.Equal(t, failAfter, ctx.Count())
} }
// createFileReader creates a temporary index file. It writes the provided input to this file.
// It returns a Reader for this file, the file's name, and the symbol map.
func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, string, map[string]struct{}) {
tb.Helper()
fn := filepath.Join(tb.TempDir(), indexFilename)
iw, err := NewWriter(ctx, fn)
require.NoError(tb, err)
symbols := map[string]struct{}{}
for _, s := range input {
s.labels.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}
syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
slices.Sort(syms)
for _, s := range syms {
require.NoError(tb, iw.AddSymbol(s))
}
for i, s := range input {
require.NoError(tb, iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))
}
require.NoError(tb, iw.Close())
ir, err := NewFileReader(fn)
require.NoError(tb, err)
tb.Cleanup(func() {
require.NoError(tb, ir.Close())
})
return ir, fn, symbols
}
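If future tests in this package adopt the helper, usage would look roughly like the sketch below; the test name and series content are invented, and the snippet assumes the same imports as this test file (context, labels, require):

```go
func TestExample_CreateFileReader(t *testing.T) {
	ctx := context.Background()
	input := indexWriterSeriesSlice{
		&indexWriterSeries{labels: labels.FromStrings("instance", "a", "job", "demo")},
		&indexWriterSeries{labels: labels.FromStrings("instance", "b", "job", "demo")},
	}
	// The returned reader is closed automatically via tb.Cleanup.
	ir, _, _ := createFileReader(ctx, t, input)

	p, err := ir.Postings(ctx, "job", "demo")
	require.NoError(t, err)
	require.True(t, p.Next())
}
```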

View file

@ -22,8 +22,10 @@ import (
"math/rand" "math/rand"
"sort" "sort"
"strconv" "strconv"
"strings"
"testing" "testing"
"github.com/grafana/regexp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
@ -1284,6 +1286,58 @@ func BenchmarkListPostings(b *testing.B) {
} }
} }
func slowRegexpString() string {
nums := map[int]struct{}{}
for i := 10_000; i < 20_000; i++ {
if i%3 == 0 {
nums[i] = struct{}{}
}
}
var sb strings.Builder
sb.WriteString(".*(9999")
for i := range nums {
sb.WriteString("|")
sb.WriteString(strconv.Itoa(i))
}
sb.WriteString(").*")
return sb.String()
}
func BenchmarkMemPostings_PostingsForLabelMatching(b *testing.B) {
fast := regexp.MustCompile("^(100|200)$")
slowRegexp := "^" + slowRegexpString() + "$"
b.Logf("Slow regexp length = %d", len(slowRegexp))
slow := regexp.MustCompile(slowRegexp)
for _, labelValueCount := range []int{1_000, 10_000, 100_000} {
b.Run(fmt.Sprintf("labels=%d", labelValueCount), func(b *testing.B) {
mp := NewMemPostings()
for i := 0; i < labelValueCount; i++ {
mp.Add(storage.SeriesRef(i), labels.FromStrings("label", strconv.Itoa(i)))
}
fp, err := ExpandPostings(mp.PostingsForLabelMatching(context.Background(), "label", fast.MatchString))
require.NoError(b, err)
b.Logf("Fast matcher matches %d series", len(fp))
b.Run("matcher=fast", func(b *testing.B) {
for i := 0; i < b.N; i++ {
mp.PostingsForLabelMatching(context.Background(), "label", fast.MatchString).Next()
}
})
sp, err := ExpandPostings(mp.PostingsForLabelMatching(context.Background(), "label", slow.MatchString))
require.NoError(b, err)
b.Logf("Slow matcher matches %d series", len(sp))
b.Run("matcher=slow", func(b *testing.B) {
for i := 0; i < b.N; i++ {
mp.PostingsForLabelMatching(context.Background(), "label", slow.MatchString).Next()
}
})
})
}
}
func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings() memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations seriesCount := 10 * checkContextEveryNIterations

View file

@ -116,9 +116,11 @@ type RulesRetriever interface {
AlertingRules() []*rules.AlertingRule AlertingRules() []*rules.AlertingRule
} }
// StatsRenderer converts engine statistics into a format suitable for the API.
type StatsRenderer func(context.Context, *stats.Statistics, string) stats.QueryStats type StatsRenderer func(context.Context, *stats.Statistics, string) stats.QueryStats
func defaultStatsRenderer(_ context.Context, s *stats.Statistics, param string) stats.QueryStats { // DefaultStatsRenderer is the default stats renderer for the API.
func DefaultStatsRenderer(_ context.Context, s *stats.Statistics, param string) stats.QueryStats {
if param != "" { if param != "" {
return stats.NewQueryStats(s) return stats.NewQueryStats(s)
} }
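Exporting the renderer lets code that embeds this API wrap it instead of re-implementing it. A rough sketch of such a wrapper; only StatsRenderer and DefaultStatsRenderer come from this diff, the wrapper itself and the idea of forcing full stats are assumptions, and how it gets wired into an API instance depends on the caller:

```go
package main

import (
	"context"

	"github.com/prometheus/prometheus/util/stats"
	v1 "github.com/prometheus/prometheus/web/api/v1"
)

// verboseStatsRenderer always renders full statistics, regardless of the
// "stats" form parameter, by forcing a non-empty value before delegating.
func verboseStatsRenderer(ctx context.Context, s *stats.Statistics, _ string) stats.QueryStats {
	return v1.DefaultStatsRenderer(ctx, s, "all")
}

// Compile-time check that the wrapper satisfies the exported func type.
var _ v1.StatsRenderer = verboseStatsRenderer

func main() {}
```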
@ -272,7 +274,7 @@ func NewAPI(
buildInfo: buildInfo, buildInfo: buildInfo,
gatherer: gatherer, gatherer: gatherer,
isAgent: isAgent, isAgent: isAgent,
statsRenderer: defaultStatsRenderer, statsRenderer: DefaultStatsRenderer,
remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame), remoteReadHandler: remote.NewReadHandler(logger, registerer, q, configFunc, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame),
} }
@ -461,7 +463,7 @@ func (api *API) query(r *http.Request) (result apiFuncResult) {
// Optional stats field in response if parameter "stats" is not empty. // Optional stats field in response if parameter "stats" is not empty.
sr := api.statsRenderer sr := api.statsRenderer
if sr == nil { if sr == nil {
sr = defaultStatsRenderer sr = DefaultStatsRenderer
} }
qs := sr(ctx, qry.Stats(), r.FormValue("stats")) qs := sr(ctx, qry.Stats(), r.FormValue("stats"))
@ -563,7 +565,7 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) {
// Optional stats field in response if parameter "stats" is not empty. // Optional stats field in response if parameter "stats" is not empty.
sr := api.statsRenderer sr := api.statsRenderer
if sr == nil { if sr == nil {
sr = defaultStatsRenderer sr = DefaultStatsRenderer
} }
qs := sr(ctx, qry.Stats(), r.FormValue("stats")) qs := sr(ctx, qry.Stats(), r.FormValue("stats"))
@ -702,7 +704,7 @@ func (api *API) labelNames(r *http.Request) apiFuncResult {
names = []string{} names = []string{}
} }
if len(names) >= limit { if len(names) > limit {
names = names[:limit] names = names[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit")) warnings = warnings.Add(errors.New("results truncated due to limit"))
} }
@ -791,7 +793,7 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
slices.Sort(vals) slices.Sort(vals)
if len(vals) >= limit { if len(vals) > limit {
vals = vals[:limit] vals = vals[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit")) warnings = warnings.Add(errors.New("results truncated due to limit"))
} }
@ -887,7 +889,8 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
} }
metrics = append(metrics, set.At().Labels()) metrics = append(metrics, set.At().Labels())
if len(metrics) >= limit { if len(metrics) > limit {
metrics = metrics[:limit]
warnings.Add(errors.New("results truncated due to limit")) warnings.Add(errors.New("results truncated due to limit"))
return apiFuncResult{metrics, nil, warnings, closer} return apiFuncResult{metrics, nil, warnings, closer}
} }
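The switch from >= to > means a result set of exactly limit items is no longer reported as truncated; only strictly exceeding the limit adds the warning. A tiny standalone illustration of the new behaviour (the helper is hypothetical, not code from this change):

```go
package main

import "fmt"

// truncate applies the new semantics: warn only when there are strictly
// more results than the limit.
func truncate(vals []string, limit int) ([]string, bool) {
	if len(vals) > limit { // was: >= limit
		return vals[:limit], true
	}
	return vals, false
}

func main() {
	got, warned := truncate([]string{"a", "b"}, 2)
	fmt.Println(got, warned) // [a b] false: exactly at the limit, no warning

	got, warned = truncate([]string{"a", "b", "c"}, 2)
	fmt.Println(got, warned) // [a b] true: limit exceeded, truncated with warning
}
```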

View file

@ -1060,6 +1060,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
responseLen int // If nonzero, check only the length; `response` is ignored. responseLen int // If nonzero, check only the length; `response` is ignored.
responseMetadataTotal int responseMetadataTotal int
responseAsJSON string responseAsJSON string
warningsCount int
errType errorType errType errorType
sorter func(interface{}) sorter func(interface{})
metadata []targetMetadata metadata []targetMetadata
@ -1417,7 +1418,17 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
"match[]": []string{"test_metric1"}, "match[]": []string{"test_metric1"},
"limit": []string{"1"}, "limit": []string{"1"},
}, },
responseLen: 1, // API does not specify which particular value will come back. responseLen: 1, // API does not specify which particular value will come back.
warningsCount: 1,
},
{
endpoint: api.series,
query: url.Values{
"match[]": []string{"test_metric1"},
"limit": []string{"2"},
},
responseLen: 2, // API does not specify which particular values will come back.
warningsCount: 0, // No warnings if limit isn't exceeded.
}, },
// Missing match[] query params in series requests. // Missing match[] query params in series requests.
{ {
@ -2700,7 +2711,19 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
query: url.Values{ query: url.Values{
"limit": []string{"2"}, "limit": []string{"2"},
}, },
responseLen: 2, // API does not specify which particular values will come back. responseLen: 2, // API does not specify which particular values will come back.
warningsCount: 1,
},
{
endpoint: api.labelValues,
params: map[string]string{
"name": "__name__",
},
query: url.Values{
"limit": []string{"4"},
},
responseLen: 4, // API does not specify which particular values will come back.
warningsCount: 0, // No warnings if limit isn't exceeded.
}, },
// Label names. // Label names.
{ {
@ -2847,7 +2870,16 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
query: url.Values{ query: url.Values{
"limit": []string{"2"}, "limit": []string{"2"},
}, },
responseLen: 2, // API does not specify which particular values will come back. responseLen: 2, // API does not specify which particular values will come back.
warningsCount: 1,
},
{
endpoint: api.labelNames,
query: url.Values{
"limit": []string{"3"},
},
responseLen: 3, // API does not specify which particular values will come back.
warningsCount: 0, // No warnings if limit isn't exceeded.
}, },
}...) }...)
} }
@ -2924,6 +2956,8 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
require.NoError(t, err) require.NoError(t, err)
require.JSONEq(t, test.responseAsJSON, string(s)) require.JSONEq(t, test.responseAsJSON, string(s))
} }
require.Len(t, res.warnings, test.warningsCount)
}) })
} }
}) })