Merge remote-tracking branch 'mimir/main' into owilliams/utf8-02-mimir

This commit is contained in:
Owen Williams 2024-01-18 11:39:10 -05:00
commit fd1733df2a
45 changed files with 3564 additions and 19469 deletions

View file

@ -5,6 +5,45 @@
* [ENHANCEMENT] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
* [BUGFIX] Agent: Participate in notify calls. #13223
## 2.49.0 / 2024-01-15
* [FEATURE] Promtool: Add `--run` flag promtool test rules command. #12206
* [FEATURE] SD: Add support for `NS` records to DNS SD. #13219
* [FEATURE] UI: Add heatmap visualization setting in the Graph tab, useful histograms. #13096 #13371
* [FEATURE] Scraping: Add `scrape_config.enable_compression` (default true) to disable gzip compression when scraping the target. #13166
* [FEATURE] PromQL: Add a `promql-experimental-functions` feature flag containing some new experimental PromQL functions. #13103 NOTE: More experimental functions might be added behind the same feature flag in the future. Added functions:
* Experimental `mad_over_time` (median absolute deviation around the median) function. #13059
* Experimental `sort_by_label` and `sort_by_label_desc` functions allowing sorting returned series by labels. #11299
* [FEATURE] SD: Add `__meta_linode_gpus` label to Linode SD. #13097
* [FEATURE] API: Add `exclude_alerts` query parameter to `/api/v1/rules` to only return recording rules. #12999
* [FEATURE] TSDB: --storage.tsdb.retention.time flag value is now exposed as a `prometheus_tsdb_retention_limit_seconds` metric. #12986
* [FEATURE] Scraping: Add ability to specify priority of scrape protocols to accept during scrape (e.g. to scrape Prometheus proto format for certain jobs). This can be changed by setting `global.scrape_protocols` and `scrape_config.scrape_protocols`. #12738
* [ENHANCEMENT] Scraping: Automated handling of scraping histograms that violate `scrape_config.native_histogram_bucket_limit` setting. #13129
* [ENHANCEMENT] Scraping: Optimized memory allocations when scraping. #12992
* [ENHANCEMENT] SD: Added cache for Azure SD to avoid rate-limits. #12622
* [ENHANCEMENT] TSDB: Various improvements to OOO exemplar scraping. E.g. allowing ingestion of exemplars with the same timestamp, but with different labels. #13021
* [ENHANCEMENT] API: Optimize `/api/v1/labels` and `/api/v1/label/<label_name>/values` when 1 set of matchers are used. #12888
* [ENHANCEMENT] TSDB: Various optimizations for TSDB block index, head mmap chunks and WAL, reducing latency and memory allocations (improving API calls, compaction queries etc). #12997 #13058 #13056 #13040
* [ENHANCEMENT] PromQL: Optimize memory allocations and latency when querying float histograms. #12954
* [ENHANCEMENT] Rules: Instrument TraceID in log lines for rule evaluations. #13034
* [ENHANCEMENT] PromQL: Optimize memory allocations in query_range calls. #13043
* [ENHANCEMENT] Promtool: unittest interval now defaults to evaluation_intervals when not set. #12729
* [BUGFIX] SD: Fixed Azure SD public IP reporting #13241
* [BUGFIX] API: Fix inaccuracies in posting cardinality statistics. #12653
* [BUGFIX] PromQL: Fix inaccuracies of `histogram_quantile` with classic histograms. #13153
* [BUGFIX] TSDB: Fix rare fails or inaccurate queries with OOO samples. #13115
* [BUGFIX] TSDB: Fix rare panics on append commit when exemplars are used. #13092
* [BUGFIX] TSDB: Fix exemplar WAL storage, so remote write can send/receive samples before exemplars. #13113
* [BUGFIX] Mixins: Fix `url` filter on remote write dashboards. #10721
* [BUGFIX] PromQL/TSDB: Various fixes to float histogram operations. #12891 #12977 #12609 #13190 #13189 #13191 #13201 #13212 #13208
* [BUGFIX] Promtool: Fix int32 overflow issues for 32-bit architectures. #12978
* [BUGFIX] SD: Fix Azure VM Scale Set NIC issue. #13283
## 2.48.1 / 2023-12-07
* [BUGFIX] TSDB: Make the wlog watcher read segments synchronously when not tailing. #13224
* [BUGFIX] Agent: Participate in notify calls (fixes slow down in remote write handling introduced in 2.45). #13223
## 2.48.0 / 2023-11-16
* [CHANGE] Remote-write: respect Retry-After header on 5xx errors. #12677

View file

@ -54,7 +54,8 @@ Release cadence of first pre-releases being cut is 6 weeks.
| v2.47 | 2023-08-23 | Bryan Boreham (GitHub: @bboreham) |
| v2.48 | 2023-10-04 | Levi Harrison (GitHub: @LeviHarrison) |
| v2.49 | 2023-12-05 | Bartek Plotka (GitHub: @bwplotka) |
| v2.50 | 2024-01-16 | **searching for volunteer** |
| v2.50 | 2024-01-16 | Augustin Husson (GitHub: @nexucis) |
| v2.51 | 2024-02-13 | **searching for volunteer** |
If you are interested in volunteering please create a pull request against the [prometheus/prometheus](https://github.com/prometheus/prometheus) repository and propose yourself for the release series of your choice.

View file

@ -1 +1 @@
2.48.0
2.49.0

View file

@ -71,7 +71,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil)
if err != nil {
log.Panicln("creating compactor", err)
}

View file

@ -1667,6 +1667,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
EnableNativeHistograms: opts.EnableNativeHistograms,
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
EnableOverlappingCompaction: true,
}
}

370
cmd/promtool/analyze.go Normal file
View file

@ -0,0 +1,370 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
var (
errNotNativeHistogram = fmt.Errorf("not a native histogram")
errNotEnoughData = fmt.Errorf("not enough data")
outputHeader = `Bucket stats for each histogram series over time
------------------------------------------------
First the min, avg, and max number of populated buckets, followed by the total
number of buckets (only if different from the max number of populated buckets
which is typical for classic but not native histograms).`
outputFooter = `Aggregated bucket stats
-----------------------
Each line shows min/avg/max over the series above.`
)
type QueryAnalyzeConfig struct {
metricType string
duration time.Duration
time string
matchers []string
}
// run retrieves metrics that look like conventional histograms (i.e. have _bucket
// suffixes) or native histograms, depending on metricType flag.
func (c *QueryAnalyzeConfig) run(url *url.URL, roundtripper http.RoundTripper) error {
if c.metricType != "histogram" {
return fmt.Errorf("analyze type is %s, must be 'histogram'", c.metricType)
}
ctx := context.Background()
api, err := newAPI(url, roundtripper, nil)
if err != nil {
return err
}
var endTime time.Time
if c.time != "" {
endTime, err = parseTime(c.time)
if err != nil {
return fmt.Errorf("error parsing time '%s': %w", c.time, err)
}
} else {
endTime = time.Now()
}
return c.getStatsFromMetrics(ctx, api, endTime, os.Stdout, c.matchers)
}
func (c *QueryAnalyzeConfig) getStatsFromMetrics(ctx context.Context, api v1.API, endTime time.Time, out io.Writer, matchers []string) error {
fmt.Fprintf(out, "%s\n\n", outputHeader)
metastatsNative := newMetaStatistics()
metastatsClassic := newMetaStatistics()
for _, matcher := range matchers {
seriesSel := seriesSelector(matcher, c.duration)
matrix, err := querySamples(ctx, api, seriesSel, endTime)
if err != nil {
return err
}
matrices := make(map[string]model.Matrix)
for _, series := range matrix {
// We do not handle mixed types. If there are float values, we assume it is a
// classic histogram, otherwise we assume it is a native histogram, and we
// ignore series with errors if they do not match the expected type.
if len(series.Values) == 0 {
stats, err := calcNativeBucketStatistics(series)
if err != nil {
if errors.Is(err, errNotNativeHistogram) || errors.Is(err, errNotEnoughData) {
continue
}
return err
}
fmt.Fprintf(out, "- %s (native): %v\n", series.Metric, *stats)
metastatsNative.update(stats)
} else {
lbs := model.LabelSet(series.Metric).Clone()
if _, ok := lbs["le"]; !ok {
continue
}
metricName := string(lbs[labels.MetricName])
if !strings.HasSuffix(metricName, "_bucket") {
continue
}
delete(lbs, labels.MetricName)
delete(lbs, "le")
key := formatSeriesName(metricName, lbs)
matrices[key] = append(matrices[key], series)
}
}
for key, matrix := range matrices {
stats, err := calcClassicBucketStatistics(matrix)
if err != nil {
if errors.Is(err, errNotEnoughData) {
continue
}
return err
}
fmt.Fprintf(out, "- %s (classic): %v\n", key, *stats)
metastatsClassic.update(stats)
}
}
fmt.Fprintf(out, "\n%s\n", outputFooter)
if metastatsNative.Count() > 0 {
fmt.Fprintf(out, "\nNative %s\n", metastatsNative)
}
if metastatsClassic.Count() > 0 {
fmt.Fprintf(out, "\nClassic %s\n", metastatsClassic)
}
return nil
}
func seriesSelector(metricName string, duration time.Duration) string {
builder := strings.Builder{}
builder.WriteString(metricName)
builder.WriteRune('[')
builder.WriteString(duration.String())
builder.WriteRune(']')
return builder.String()
}
func formatSeriesName(metricName string, lbs model.LabelSet) string {
builder := strings.Builder{}
builder.WriteString(metricName)
builder.WriteString(lbs.String())
return builder.String()
}
func querySamples(ctx context.Context, api v1.API, query string, end time.Time) (model.Matrix, error) {
values, _, err := api.Query(ctx, query, end)
if err != nil {
return nil, err
}
matrix, ok := values.(model.Matrix)
if !ok {
return nil, fmt.Errorf("query of buckets resulted in non-Matrix")
}
return matrix, nil
}
// minPop/avgPop/maxPop is for the number of populated (non-zero) buckets.
// total is the total number of buckets across all samples in the series,
// populated or not.
type statistics struct {
minPop, maxPop, total int
avgPop float64
}
func (s statistics) String() string {
if s.maxPop == s.total {
return fmt.Sprintf("%d/%.3f/%d", s.minPop, s.avgPop, s.maxPop)
}
return fmt.Sprintf("%d/%.3f/%d/%d", s.minPop, s.avgPop, s.maxPop, s.total)
}
func calcClassicBucketStatistics(matrix model.Matrix) (*statistics, error) {
numBuckets := len(matrix)
stats := &statistics{
minPop: math.MaxInt,
total: numBuckets,
}
if numBuckets == 0 || len(matrix[0].Values) < 2 {
return stats, errNotEnoughData
}
numSamples := len(matrix[0].Values)
sortMatrix(matrix)
totalPop := 0
for timeIdx := 0; timeIdx < numSamples; timeIdx++ {
curr, err := getBucketCountsAtTime(matrix, numBuckets, timeIdx)
if err != nil {
return stats, err
}
countPop := 0
for _, b := range curr {
if b != 0 {
countPop++
}
}
totalPop += countPop
if stats.minPop > countPop {
stats.minPop = countPop
}
if stats.maxPop < countPop {
stats.maxPop = countPop
}
}
stats.avgPop = float64(totalPop) / float64(numSamples)
return stats, nil
}
func sortMatrix(matrix model.Matrix) {
sort.SliceStable(matrix, func(i, j int) bool {
return getLe(matrix[i]) < getLe(matrix[j])
})
}
func getLe(series *model.SampleStream) float64 {
lbs := model.LabelSet(series.Metric)
le, _ := strconv.ParseFloat(string(lbs["le"]), 64)
return le
}
func getBucketCountsAtTime(matrix model.Matrix, numBuckets, timeIdx int) ([]int, error) {
counts := make([]int, numBuckets)
if timeIdx >= len(matrix[0].Values) {
// Just return zeroes instead of erroring out so we can get partial results.
return counts, nil
}
counts[0] = int(matrix[0].Values[timeIdx].Value)
for i, bucket := range matrix[1:] {
if timeIdx >= len(bucket.Values) {
// Just return zeroes instead of erroring out so we can get partial results.
return counts, nil
}
curr := bucket.Values[timeIdx]
prev := matrix[i].Values[timeIdx]
// Assume the results are nicely aligned.
if curr.Timestamp != prev.Timestamp {
return counts, fmt.Errorf("matrix result is not time aligned")
}
counts[i+1] = int(curr.Value - prev.Value)
}
return counts, nil
}
type bucketBounds struct {
boundaries int32
upper, lower float64
}
func makeBucketBounds(b *model.HistogramBucket) bucketBounds {
return bucketBounds{
boundaries: b.Boundaries,
upper: float64(b.Upper),
lower: float64(b.Lower),
}
}
func calcNativeBucketStatistics(series *model.SampleStream) (*statistics, error) {
stats := &statistics{
minPop: math.MaxInt,
}
overall := make(map[bucketBounds]struct{})
totalPop := 0
if len(series.Histograms) == 0 {
return nil, errNotNativeHistogram
}
if len(series.Histograms) == 1 {
return nil, errNotEnoughData
}
for _, histogram := range series.Histograms {
for _, bucket := range histogram.Histogram.Buckets {
bb := makeBucketBounds(bucket)
overall[bb] = struct{}{}
}
countPop := len(histogram.Histogram.Buckets)
totalPop += countPop
if stats.minPop > countPop {
stats.minPop = countPop
}
if stats.maxPop < countPop {
stats.maxPop = countPop
}
}
stats.avgPop = float64(totalPop) / float64(len(series.Histograms))
stats.total = len(overall)
return stats, nil
}
type distribution struct {
min, max, count int
avg float64
}
func newDistribution() distribution {
return distribution{
min: math.MaxInt,
}
}
func (d *distribution) update(num int) {
if d.min > num {
d.min = num
}
if d.max < num {
d.max = num
}
d.count++
d.avg += float64(num)/float64(d.count) - d.avg/float64(d.count)
}
func (d distribution) String() string {
return fmt.Sprintf("%d/%.3f/%d", d.min, d.avg, d.max)
}
type metaStatistics struct {
minPop, avgPop, maxPop, total distribution
}
func newMetaStatistics() *metaStatistics {
return &metaStatistics{
minPop: newDistribution(),
avgPop: newDistribution(),
maxPop: newDistribution(),
total: newDistribution(),
}
}
func (ms metaStatistics) Count() int {
return ms.minPop.count
}
func (ms metaStatistics) String() string {
if ms.maxPop == ms.total {
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop)
}
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v\n- total: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop, ms.total)
}
func (ms *metaStatistics) update(s *statistics) {
ms.minPop.update(s.minPop)
ms.avgPop.update(int(s.avgPop))
ms.maxPop.update(s.maxPop)
ms.total.update(s.total)
}

View file

@ -0,0 +1,170 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/common/model"
)
var (
exampleMatrix = model.Matrix{
&model.SampleStream{
Metric: model.Metric{
"le": "+Inf",
},
Values: []model.SamplePair{
{
Value: 31,
Timestamp: 100,
},
{
Value: 32,
Timestamp: 200,
},
{
Value: 40,
Timestamp: 300,
},
},
},
&model.SampleStream{
Metric: model.Metric{
"le": "0.5",
},
Values: []model.SamplePair{
{
Value: 10,
Timestamp: 100,
},
{
Value: 11,
Timestamp: 200,
},
{
Value: 11,
Timestamp: 300,
},
},
},
&model.SampleStream{
Metric: model.Metric{
"le": "10",
},
Values: []model.SamplePair{
{
Value: 30,
Timestamp: 100,
},
{
Value: 31,
Timestamp: 200,
},
{
Value: 37,
Timestamp: 300,
},
},
},
&model.SampleStream{
Metric: model.Metric{
"le": "2",
},
Values: []model.SamplePair{
{
Value: 25,
Timestamp: 100,
},
{
Value: 26,
Timestamp: 200,
},
{
Value: 27,
Timestamp: 300,
},
},
},
}
exampleMatrixLength = len(exampleMatrix)
)
func init() {
sortMatrix(exampleMatrix)
}
func TestGetBucketCountsAtTime(t *testing.T) {
cases := []struct {
matrix model.Matrix
length int
timeIdx int
expected []int
}{
{
exampleMatrix,
exampleMatrixLength,
0,
[]int{10, 15, 5, 1},
},
{
exampleMatrix,
exampleMatrixLength,
1,
[]int{11, 15, 5, 1},
},
{
exampleMatrix,
exampleMatrixLength,
2,
[]int{11, 16, 10, 3},
},
}
for _, c := range cases {
t.Run(fmt.Sprintf("exampleMatrix@%d", c.timeIdx), func(t *testing.T) {
res, err := getBucketCountsAtTime(c.matrix, c.length, c.timeIdx)
require.NoError(t, err)
require.Equal(t, c.expected, res)
})
}
}
func TestCalcClassicBucketStatistics(t *testing.T) {
cases := []struct {
matrix model.Matrix
expected *statistics
}{
{
exampleMatrix,
&statistics{
minPop: 4,
avgPop: 4,
maxPop: 4,
total: 4,
},
},
}
for i, c := range cases {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
res, err := calcClassicBucketStatistics(c.matrix)
require.NoError(t, err)
require.Equal(t, c.expected, res)
})
}
}

View file

@ -35,9 +35,7 @@ import (
"github.com/go-kit/log"
"github.com/google/pprof/profile"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/testutil/promlint"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -185,6 +183,14 @@ func main() {
queryLabelsEnd := queryLabelsCmd.Flag("end", "End time (RFC3339 or Unix timestamp).").String()
queryLabelsMatch := queryLabelsCmd.Flag("match", "Series selector. Can be specified multiple times.").Strings()
queryAnalyzeCfg := &QueryAnalyzeConfig{}
queryAnalyzeCmd := queryCmd.Command("analyze", "Run queries against your Prometheus to analyze the usage pattern of certain metrics.")
queryAnalyzeCmd.Flag("server", "Prometheus server to query.").Required().URLVar(&serverURL)
queryAnalyzeCmd.Flag("type", "Type of metric: histogram.").Required().StringVar(&queryAnalyzeCfg.metricType)
queryAnalyzeCmd.Flag("duration", "Time frame to analyze.").Default("1h").DurationVar(&queryAnalyzeCfg.duration)
queryAnalyzeCmd.Flag("time", "Query time (RFC3339 or Unix timestamp), defaults to now.").StringVar(&queryAnalyzeCfg.time)
queryAnalyzeCmd.Flag("match", "Series selector. Can be specified multiple times.").Required().StringsVar(&queryAnalyzeCfg.matchers)
pushCmd := app.Command("push", "Push to a Prometheus server.")
pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("<filename>").ExistingFileVar(&httpConfigFilePath)
pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write (for testing purpose only).")
@ -230,7 +236,7 @@ func main() {
dumpPath := tsdbDumpCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector.").Default("{__name__=~'(?s:.*)'}").String()
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
@ -390,6 +396,9 @@ func main() {
case importRulesCmd.FullCommand():
os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, *importRulesFiles...)))
case queryAnalyzeCmd.FullCommand():
os.Exit(checkErr(queryAnalyzeCfg.run(serverURL, httpRoundTripper)))
case documentationCmd.FullCommand():
os.Exit(checkErr(documentcli.GenerateMarkdown(app.Model(), os.Stdout)))
@ -997,246 +1006,6 @@ func checkMetricsExtended(r io.Reader) ([]metricStat, int, error) {
return stats, total, nil
}
// QueryInstant performs an instant query against a Prometheus server.
func QueryInstant(url *url.URL, roundTripper http.RoundTripper, query, evalTime string, p printer) int {
if url.Scheme == "" {
url.Scheme = "http"
}
config := api.Config{
Address: url.String(),
RoundTripper: roundTripper,
}
// Create new client.
c, err := api.NewClient(config)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
eTime := time.Now()
if evalTime != "" {
eTime, err = parseTime(evalTime)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing evaluation time:", err)
return failureExitCode
}
}
// Run query against client.
api := v1.NewAPI(c)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.Query(ctx, query, eTime) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printValue(val)
return successExitCode
}
// QueryRange performs a range query against a Prometheus server.
func QueryRange(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, query, start, end string, step time.Duration, p printer) int {
if url.Scheme == "" {
url.Scheme = "http"
}
config := api.Config{
Address: url.String(),
RoundTripper: roundTripper,
}
if len(headers) > 0 {
config.RoundTripper = promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
for key, value := range headers {
req.Header.Add(key, value)
}
return roundTripper.RoundTrip(req)
})
}
// Create new client.
c, err := api.NewClient(config)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
var stime, etime time.Time
if end == "" {
etime = time.Now()
} else {
etime, err = parseTime(end)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing end time:", err)
return failureExitCode
}
}
if start == "" {
stime = etime.Add(-5 * time.Minute)
} else {
stime, err = parseTime(start)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing start time:", err)
return failureExitCode
}
}
if !stime.Before(etime) {
fmt.Fprintln(os.Stderr, "start time is not before end time")
return failureExitCode
}
if step == 0 {
resolution := math.Max(math.Floor(etime.Sub(stime).Seconds()/250), 1)
// Convert seconds to nanoseconds such that time.Duration parses correctly.
step = time.Duration(resolution) * time.Second
}
// Run query against client.
api := v1.NewAPI(c)
r := v1.Range{Start: stime, End: etime, Step: step}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.QueryRange(ctx, query, r) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printValue(val)
return successExitCode
}
// QuerySeries queries for a series against a Prometheus server.
func QuerySeries(url *url.URL, roundTripper http.RoundTripper, matchers []string, start, end string, p printer) int {
if url.Scheme == "" {
url.Scheme = "http"
}
config := api.Config{
Address: url.String(),
RoundTripper: roundTripper,
}
// Create new client.
c, err := api.NewClient(config)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
stime, etime, err := parseStartTimeAndEndTime(start, end)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
}
// Run query against client.
api := v1.NewAPI(c)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.Series(ctx, matchers, stime, etime) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printSeries(val)
return successExitCode
}
// QueryLabels queries for label values against a Prometheus server.
func QueryLabels(url *url.URL, roundTripper http.RoundTripper, matchers []string, name, start, end string, p printer) int {
if url.Scheme == "" {
url.Scheme = "http"
}
config := api.Config{
Address: url.String(),
RoundTripper: roundTripper,
}
// Create new client.
c, err := api.NewClient(config)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
stime, etime, err := parseStartTimeAndEndTime(start, end)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
}
// Run query against client.
api := v1.NewAPI(c)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, warn, err := api.LabelValues(ctx, name, matchers, stime, etime)
cancel()
for _, v := range warn {
fmt.Fprintln(os.Stderr, "query warning:", v)
}
if err != nil {
return handleAPIError(err)
}
p.printLabelValues(val)
return successExitCode
}
func handleAPIError(err error) int {
var apiErr *v1.Error
if errors.As(err, &apiErr) && apiErr.Detail != "" {
fmt.Fprintf(os.Stderr, "query error: %v (detail: %s)\n", apiErr, strings.TrimSpace(apiErr.Detail))
} else {
fmt.Fprintln(os.Stderr, "query error:", err)
}
return failureExitCode
}
func parseStartTimeAndEndTime(start, end string) (time.Time, time.Time, error) {
var (
minTime = time.Now().Add(-9999 * time.Hour)
maxTime = time.Now().Add(9999 * time.Hour)
err error
)
stime := minTime
etime := maxTime
if start != "" {
stime, err = parseTime(start)
if err != nil {
return stime, etime, fmt.Errorf("error parsing start time: %w", err)
}
}
if end != "" {
etime, err = parseTime(end)
if err != nil {
return stime, etime, fmt.Errorf("error parsing end time: %w", err)
}
}
return stime, etime, nil
}
func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
}
type endpointsGroup struct {
urlToFilename map[string]string
postProcess func(b []byte) ([]byte, error)
@ -1390,15 +1159,12 @@ func importRules(url *url.URL, roundTripper http.RoundTripper, start, end, outpu
evalInterval: evalInterval,
maxBlockDuration: maxBlockDuration,
}
client, err := api.NewClient(api.Config{
Address: url.String(),
RoundTripper: roundTripper,
})
api, err := newAPI(url, roundTripper, nil)
if err != nil {
return fmt.Errorf("new api client error: %w", err)
}
ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client))
ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, api)
errs := ruleImporter.loadGroups(ctx, files)
for _, err := range errs {
if err != nil {

251
cmd/promtool/query.go Normal file
View file

@ -0,0 +1,251 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus/promhttp"
_ "github.com/prometheus/prometheus/plugins" // Register plugins.
)
func newAPI(url *url.URL, roundTripper http.RoundTripper, headers map[string]string) (v1.API, error) {
if url.Scheme == "" {
url.Scheme = "http"
}
config := api.Config{
Address: url.String(),
RoundTripper: roundTripper,
}
if len(headers) > 0 {
config.RoundTripper = promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
for key, value := range headers {
req.Header.Add(key, value)
}
return roundTripper.RoundTrip(req)
})
}
// Create new client.
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
api := v1.NewAPI(client)
return api, nil
}
// QueryInstant performs an instant query against a Prometheus server.
func QueryInstant(url *url.URL, roundTripper http.RoundTripper, query, evalTime string, p printer) int {
api, err := newAPI(url, roundTripper, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
eTime := time.Now()
if evalTime != "" {
eTime, err = parseTime(evalTime)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing evaluation time:", err)
return failureExitCode
}
}
// Run query against client.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.Query(ctx, query, eTime) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printValue(val)
return successExitCode
}
// QueryRange performs a range query against a Prometheus server.
func QueryRange(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, query, start, end string, step time.Duration, p printer) int {
api, err := newAPI(url, roundTripper, headers)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
var stime, etime time.Time
if end == "" {
etime = time.Now()
} else {
etime, err = parseTime(end)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing end time:", err)
return failureExitCode
}
}
if start == "" {
stime = etime.Add(-5 * time.Minute)
} else {
stime, err = parseTime(start)
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing start time:", err)
return failureExitCode
}
}
if !stime.Before(etime) {
fmt.Fprintln(os.Stderr, "start time is not before end time")
return failureExitCode
}
if step == 0 {
resolution := math.Max(math.Floor(etime.Sub(stime).Seconds()/250), 1)
// Convert seconds to nanoseconds such that time.Duration parses correctly.
step = time.Duration(resolution) * time.Second
}
// Run query against client.
r := v1.Range{Start: stime, End: etime, Step: step}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.QueryRange(ctx, query, r) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printValue(val)
return successExitCode
}
// QuerySeries queries for a series against a Prometheus server.
func QuerySeries(url *url.URL, roundTripper http.RoundTripper, matchers []string, start, end string, p printer) int {
api, err := newAPI(url, roundTripper, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
stime, etime, err := parseStartTimeAndEndTime(start, end)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
}
// Run query against client.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, _, err := api.Series(ctx, matchers, stime, etime) // Ignoring warnings for now.
cancel()
if err != nil {
return handleAPIError(err)
}
p.printSeries(val)
return successExitCode
}
// QueryLabels queries for label values against a Prometheus server.
func QueryLabels(url *url.URL, roundTripper http.RoundTripper, matchers []string, name, start, end string, p printer) int {
api, err := newAPI(url, roundTripper, nil)
if err != nil {
fmt.Fprintln(os.Stderr, "error creating API client:", err)
return failureExitCode
}
stime, etime, err := parseStartTimeAndEndTime(start, end)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
}
// Run query against client.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
val, warn, err := api.LabelValues(ctx, name, matchers, stime, etime)
cancel()
for _, v := range warn {
fmt.Fprintln(os.Stderr, "query warning:", v)
}
if err != nil {
return handleAPIError(err)
}
p.printLabelValues(val)
return successExitCode
}
func handleAPIError(err error) int {
var apiErr *v1.Error
if errors.As(err, &apiErr) && apiErr.Detail != "" {
fmt.Fprintf(os.Stderr, "query error: %v (detail: %s)\n", apiErr, strings.TrimSpace(apiErr.Detail))
} else {
fmt.Fprintln(os.Stderr, "query error:", err)
}
return failureExitCode
}
func parseStartTimeAndEndTime(start, end string) (time.Time, time.Time, error) {
var (
minTime = time.Now().Add(-9999 * time.Hour)
maxTime = time.Now().Add(9999 * time.Hour)
err error
)
stime := minTime
etime := maxTime
if start != "" {
stime, err = parseTime(start)
if err != nil {
return stime, etime, fmt.Errorf("error parsing start time: %w", err)
}
}
if end != "" {
etime, err = parseTime(end)
if err != nil {
return stime, etime, fmt.Errorf("error parsing end time: %w", err)
}
}
return stime, etime, nil
}
func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}
return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
}

15
cmd/promtool/testdata/dump-test-1.prom vendored Normal file
View file

@ -0,0 +1,15 @@
{__name__="heavy_metric", foo="bar"} 5 0
{__name__="heavy_metric", foo="bar"} 4 60000
{__name__="heavy_metric", foo="bar"} 3 120000
{__name__="heavy_metric", foo="bar"} 2 180000
{__name__="heavy_metric", foo="bar"} 1 240000
{__name__="heavy_metric", foo="foo"} 5 0
{__name__="heavy_metric", foo="foo"} 4 60000
{__name__="heavy_metric", foo="foo"} 3 120000
{__name__="heavy_metric", foo="foo"} 2 180000
{__name__="heavy_metric", foo="foo"} 1 240000
{__name__="metric", baz="abc", foo="bar"} 1 0
{__name__="metric", baz="abc", foo="bar"} 2 60000
{__name__="metric", baz="abc", foo="bar"} 3 120000
{__name__="metric", baz="abc", foo="bar"} 4 180000
{__name__="metric", baz="abc", foo="bar"} 5 240000

10
cmd/promtool/testdata/dump-test-2.prom vendored Normal file
View file

@ -0,0 +1,10 @@
{__name__="heavy_metric", foo="foo"} 5 0
{__name__="heavy_metric", foo="foo"} 4 60000
{__name__="heavy_metric", foo="foo"} 3 120000
{__name__="heavy_metric", foo="foo"} 2 180000
{__name__="heavy_metric", foo="foo"} 1 240000
{__name__="metric", baz="abc", foo="bar"} 1 0
{__name__="metric", baz="abc", foo="bar"} 2 60000
{__name__="metric", baz="abc", foo="bar"} 3 120000
{__name__="metric", baz="abc", foo="bar"} 4 180000
{__name__="metric", baz="abc", foo="bar"} 5 240000

View file

@ -0,0 +1,2 @@
{__name__="metric", baz="abc", foo="bar"} 2 60000
{__name__="metric", baz="abc", foo="bar"} 3 120000

View file

@ -706,7 +706,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
return nil
}
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match string) (err error) {
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string) (err error) {
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return err
@ -720,11 +720,21 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match strin
}
defer q.Close()
matchers, err := parser.ParseMetricSelector(match)
matcherSets, err := parser.ParseMetricSelectors(match)
if err != nil {
return err
}
ss := q.Select(ctx, false, nil, matchers...)
var ss storage.SeriesSet
if len(matcherSets) > 1 {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
sets = append(sets, q.Select(ctx, true, nil, mset...))
}
ss = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
} else {
ss = q.Select(ctx, false, nil, matcherSets[0]...)
}
for ss.Next() {
series := ss.At()

View file

@ -14,9 +14,18 @@
package main
import (
"bytes"
"context"
"io"
"math"
"os"
"runtime"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/promql"
)
func TestGenerateBucket(t *testing.T) {
@ -41,3 +50,101 @@ func TestGenerateBucket(t *testing.T) {
require.Equal(t, tc.step, step)
}
}
// getDumpedSamples dumps samples and returns them.
func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []string) string {
t.Helper()
oldStdout := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w
err := dumpSamples(
context.Background(),
path,
mint,
maxt,
match,
)
require.NoError(t, err)
w.Close()
os.Stdout = oldStdout
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
}
func TestTSDBDump(t *testing.T) {
storage := promql.LoadedStorage(t, `
load 1m
metric{foo="bar", baz="abc"} 1 2 3 4 5
heavy_metric{foo="bar"} 5 4 3 2 1
heavy_metric{foo="foo"} 5 4 3 2 1
`)
tests := []struct {
name string
mint int64
maxt int64
match []string
expectedDump string
}{
{
name: "default match",
mint: math.MinInt64,
maxt: math.MaxInt64,
match: []string{"{__name__=~'(?s:.*)'}"},
expectedDump: "testdata/dump-test-1.prom",
},
{
name: "same matcher twice",
mint: math.MinInt64,
maxt: math.MaxInt64,
match: []string{"{foo=~'.+'}", "{foo=~'.+'}"},
expectedDump: "testdata/dump-test-1.prom",
},
{
name: "no duplication",
mint: math.MinInt64,
maxt: math.MaxInt64,
match: []string{"{__name__=~'(?s:.*)'}", "{baz='abc'}"},
expectedDump: "testdata/dump-test-1.prom",
},
{
name: "well merged",
mint: math.MinInt64,
maxt: math.MaxInt64,
match: []string{"{__name__='heavy_metric'}", "{baz='abc'}"},
expectedDump: "testdata/dump-test-1.prom",
},
{
name: "multi matchers",
mint: math.MinInt64,
maxt: math.MaxInt64,
match: []string{"{__name__='heavy_metric',foo='foo'}", "{__name__='metric'}"},
expectedDump: "testdata/dump-test-2.prom",
},
{
name: "with reduced mint and maxt",
mint: int64(60000),
maxt: int64(120000),
match: []string{"{__name__='metric'}"},
expectedDump: "testdata/dump-test-3.prom",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dumpedMetrics := getDumpedSamples(t, storage.Dir(), tt.mint, tt.maxt, tt.match)
expectedMetrics, err := os.ReadFile(tt.expectedDump)
require.NoError(t, err)
if strings.Contains(runtime.GOOS, "windows") {
// We use "/n" while dumping on windows as well.
expectedMetrics = bytes.ReplaceAll(expectedMetrics, []byte("\r\n"), []byte("\n"))
}
// even though in case of one matcher samples are not sorted, the order in the cases above should stay the same.
require.Equal(t, string(expectedMetrics), dumpedMetrics)
})
}
}

View file

@ -1143,6 +1143,9 @@ type QueueConfig struct {
MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`
// Samples older than the limit will be dropped.
SampleAgeLimit model.Duration `yaml:"sample_age_limit,omitempty"`
}
// MetadataConfig is the configuration for sending metadata to remote

View file

@ -420,10 +420,20 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
} else {
ch <- target{labelSet: nil, err: err}
}
// Get out of this routine because we cannot continue without a network interface.
return
d.addToCache(nicID, networkInterface)
} else {
networkInterface, err = client.getVMScaleSetVMNetworkInterfaceByID(ctx, nicID, vm.ScaleSet, vm.InstanceID)
if err != nil {
if errors.Is(err, errorNotFound) {
level.Warn(d.logger).Log("msg", "Network interface does not exist", "name", nicID, "err", err)
} else {
ch <- target{labelSet: nil, err: err}
}
// Get out of this routine because we cannot continue without a network interface.
return
}
d.addToCache(nicID, networkInterface)
}
d.addToCache(nicID, networkInterface)
}
if networkInterface.Properties == nil {

View file

@ -324,6 +324,25 @@ Run labels query.
##### `promtool query analyze`
Run queries against your Prometheus to analyze the usage pattern of certain metrics.
###### Flags
| Flag | Description | Default |
| --- | --- | --- |
| <code class="text-nowrap">--server</code> | Prometheus server to query. | |
| <code class="text-nowrap">--type</code> | Type of metric: histogram. | |
| <code class="text-nowrap">--duration</code> | Time frame to analyze. | `1h` |
| <code class="text-nowrap">--time</code> | Query time (RFC3339 or Unix timestamp), defaults to now. | |
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | |
### `promtool debug`
Fetch debug information.
@ -548,7 +567,7 @@ Dump samples from a TSDB.
| --- | --- | --- |
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
| <code class="text-nowrap">--match</code> | Series selector. | `{__name__=~'(?s:.*)'}` |
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |

View file

@ -3619,6 +3619,10 @@ queue_config:
# Retry upon receiving a 429 status code from the remote-write storage.
# This is experimental and might change in the future.
[ retry_on_http_429: <boolean> | default = false ]
# If set, any sample that is older than sample_age_limit
# will not be sent to the remote storage. The default value is 0s,
# which means that all samples are sent.
[ sample_age_limit: <duration> | default = 0s ]
# Configures the sending of series metadata to remote storage.
# Metadata configuration is subject to change at any point

View file

@ -136,7 +136,6 @@ func (p *OpenMetricsParser) Type() ([]byte, model.MetricType) {
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *OpenMetricsParser) Unit() ([]byte, []byte) {
// The Prometheus format does not have units.
return p.l.b[p.offsets[0]:p.offsets[1]], p.text
}

View file

@ -269,10 +269,11 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) {
return n, model.MetricTypeUnknown
}
// Unit always returns (nil, nil) because units aren't supported by the protobuf
// format.
// Unit returns the metric unit in the current entry.
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Unit() ([]byte, []byte) {
return nil, nil
return p.metricBytes.Bytes(), []byte(p.mf.GetUnit())
}
// Comment always returns nil because comments aren't supported by the protobuf
@ -422,6 +423,16 @@ func (p *ProtobufParser) Next() (Entry, error) {
default:
return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.mf.GetType())
}
unit := p.mf.GetUnit()
if len(unit) > 0 {
if p.mf.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") {
if !strings.HasSuffix(name[:len(name)-6], unit) || len(name)-6 < len(unit)+1 || name[len(name)-6-len(unit)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of counter %q", unit, name)
}
} else if !strings.HasSuffix(name, unit) || len(name) < len(unit)+1 || name[len(name)-len(unit)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name)
}
}
p.metricBytes.Reset()
p.metricBytes.WriteString(name)

View file

@ -58,6 +58,7 @@ metric: <
`name: "go_memstats_alloc_bytes_total"
help: "Total number of bytes allocated, even if freed."
type: COUNTER
unit: "bytes"
metric: <
counter: <
value: 1.546544e+06
@ -665,6 +666,7 @@ func TestProtobufParse(t *testing.T) {
{
m: "go_memstats_alloc_bytes_total",
help: "Total number of bytes allocated, even if freed.",
unit: "bytes",
},
{
m: "go_memstats_alloc_bytes_total",

View file

@ -24,6 +24,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -1544,6 +1545,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
if e.Func.Name == "rate" || e.Func.Name == "increase" {
samples := inMatrix[0]
metricName := samples.Metric.Get(labels.MetricName)
if metricName != "" && len(samples.Floats) > 0 &&
!strings.HasSuffix(metricName, "_total") &&
!strings.HasSuffix(metricName, "_sum") &&
!strings.HasSuffix(metricName, "_count") &&
!strings.HasSuffix(metricName, "_bucket") {
warnings.Add(annotations.NewPossibleNonCounterInfo(metricName, e.Args[0].PositionRange()))
}
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)

View file

@ -208,6 +208,20 @@ func ParseMetricSelector(input string) (m []*labels.Matcher, err error) {
return m, err
}
// ParseMetricSelectors parses a list of provided textual metric selectors into lists of
// label matchers.
func ParseMetricSelectors(matchers []string) (m [][]*labels.Matcher, err error) {
var matcherSets [][]*labels.Matcher
for _, s := range matchers {
matchers, err := ParseMetricSelector(s)
if err != nil {
return nil, err
}
matcherSets = append(matcherSets, matchers)
}
return matcherSets, nil
}
// SequenceValue is an omittable value in a sequence of time series values.
type SequenceValue struct {
Value float64

View file

@ -26,7 +26,7 @@ jobs:
- name: Checkout repository
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: install Go
uses: actions/setup-go@6edd4406fa81c3da01a34fa6f6343087c207a568 # v3.5.0
uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
with:
go-version: 1.21.x
- name: Install snmp_exporter/generator dependencies

View file

@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}
func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)
actual, err := DecodeWriteRequest(bytes.NewReader(buf))

View file

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -51,6 +52,10 @@ const (
// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
reasonTooOld = "too_old"
reasonDroppedSeries = "dropped_series"
reasonUnintentionalDroppedSeries = "unintentionally_dropped_series"
)
type queueManagerMetrics struct {
@ -68,9 +73,9 @@ type queueManagerMetrics struct {
retriedExemplarsTotal prometheus.Counter
retriedHistogramsTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
droppedExemplarsTotal prometheus.Counter
droppedHistogramsTotal prometheus.Counter
droppedSamplesTotal *prometheus.CounterVec
droppedExemplarsTotal *prometheus.CounterVec
droppedHistogramsTotal *prometheus.CounterVec
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
@ -180,27 +185,27 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
m.droppedSamplesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_dropped_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"reason"})
m.droppedExemplarsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_dropped_total",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"reason"})
m.droppedHistogramsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_dropped_total",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
}, []string{"reason"})
m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -391,7 +396,8 @@ type WriteClient interface {
// indicated by the provided WriteClient. Implements writeTo interface
// used by WAL Watcher.
type QueueManager struct {
lastSendTimestamp atomic.Int64
lastSendTimestamp atomic.Int64
buildRequestLimitTimestamp atomic.Int64
logger log.Logger
flushDeadline time.Duration
@ -529,7 +535,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil)
if err != nil {
return err
}
@ -575,18 +581,65 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
return nil
}
func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
limitTs := baseTime.Add(-sampleAgeLimit)
sampleTs := timestamp.Time(ts)
return sampleTs.Before(limitTs)
}
func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
default:
return false
}
return false
}
}
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
currentTime := time.Now()
outer:
for _, s := range samples {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref]
if !ok {
t.metrics.droppedSamplesTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@ -629,17 +682,23 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
if !t.sendExemplars {
return true
}
currentTime := time.Now()
outer:
for _, e := range exemplars {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref]
if !ok {
t.metrics.droppedExemplarsTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@ -678,16 +737,22 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample)
if !t.sendNativeHistograms {
return true
}
currentTime := time.Now()
outer:
for _, h := range histograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@ -725,16 +790,22 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi
if !t.sendNativeHistograms {
return true
}
currentTime := time.Now()
outer:
for _, h := range floatHistograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@ -1490,7 +1561,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
@ -1504,6 +1576,25 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func(try int) error {
currentTime := time.Now()
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
*buf,
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
}
ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End()
@ -1608,9 +1699,27 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
for _, ts := range samples {
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int
keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
@ -1621,10 +1730,37 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}
// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}
timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}
req := &prompb.WriteRequest{
Timeseries: samples,
Timeseries: timeSeries,
Metadata: metadata,
}
@ -1635,7 +1771,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
}
err := pBuf.Marshal(req)
if err != nil {
return nil, highest, err
return nil, highest, lowest, err
}
// snappy uses len() to see if it needs to allocate a new slice. Make the
@ -1644,5 +1780,5 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
buf = buf[0:cap(buf)]
}
compressed := snappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
return compressed, highest, lowest, nil
}

View file

@ -548,7 +548,7 @@ func TestShouldReshard(t *testing.T) {
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
b := labels.ScratchBuilder{}
lb := labels.ScratchBuilder{}
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
for j := 0; j < numSamples; j++ {
@ -559,15 +559,15 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
})
}
// Create Labels that is name of series plus any extra labels supplied.
b.Reset()
b.Add(labels.MetricName, name)
lb.Reset()
lb.Add(labels.MetricName, name)
for _, l := range extraLabels {
b.Add(l.Name, l.Value)
lb.Add(l.Name, l.Value)
}
b.Sort()
lb.Sort()
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: b.Labels(),
Labels: lb.Labels(),
})
}
return samples, series
@ -1321,3 +1321,263 @@ func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
t.FailNow()
}
}
func TestDropOldTimeSeries(t *testing.T) {
size := 10
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
c := NewTestWriteClient()
c.expectSamples(newSamples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
m.Append(samples)
c.waitForExpectedData(t)
}
func TestIsSampleOld(t *testing.T) {
currentTime := time.Now()
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
}
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
newSamples := make([]record.RefSample, 0, numSamples)
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.ScratchBuilder{}
for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i)
// We create half of the samples in the past.
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
for j := 0; j < numSamples/2; j++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: past + int64(j),
V: float64(i),
})
}
for j := 0; j < numSamples/2; j++ {
sample := record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(int(time.Now().UnixMilli()) + j),
V: float64(i),
}
samples = append(samples, sample)
newSamples = append(newSamples, sample)
}
// Create Labels that is name of series plus any extra labels supplied.
lb.Reset()
lb.Add(labels.MetricName, name)
for _, l := range extraLabels {
lb.Add(l.Name, l.Value)
}
lb.Sort()
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
})
}
return samples, newSamples, series
}
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
return limit > ts.Samples[0].Timestamp
}
func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
lowestTs int64
highestTs int64
droppedSamples int
responseLen int
}{
{
name: "No filter applied",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.34,
},
},
},
},
filter: nil,
responseLen: 3,
lowestTs: 1234567890,
highestTs: 1234567892,
},
{
name: "Filter applied, samples in order",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples out of order",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567893,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567891,
Value: 2.34,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples not consecutive",
ts: []prompb.TimeSeries{
{
Samples: []prompb.Sample{
{
Timestamp: 1234567890,
Value: 1.23,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567892,
Value: 3.45,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567895,
Value: 6.78,
},
},
},
{
Samples: []prompb.Sample{
{
Timestamp: 1234567897,
Value: 6.78,
},
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
droppedSamples: 2,
},
}
// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
})
}
}

View file

@ -38,7 +38,7 @@ import (
)
func TestRemoteWriteHandler(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -84,10 +84,10 @@ func TestRemoteWriteHandler(t *testing.T) {
}
func TestOutOfOrderSample(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
}}, nil, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -109,10 +109,10 @@ func TestOutOfOrderSample(t *testing.T) {
// don't fail on ingestion errors since the exemplar storage is
// still experimental.
func TestOutOfOrderExemplar(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil)
}}, nil, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -132,10 +132,10 @@ func TestOutOfOrderExemplar(t *testing.T) {
}
func TestOutOfOrderHistogram(t *testing.T) {
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil)
}}, nil, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -158,13 +158,13 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
reqs := []*http.Request{}
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil)
}}, nil, nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
@ -182,7 +182,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
}
func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -219,7 +219,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
@ -232,7 +232,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
var bufRequests [][]byte
for i := 0; i < 100; i++ {
buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil)
buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)
bufRequests = append(bufRequests, buf)
}

View file

@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
@ -490,7 +490,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}
func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, true)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
require.NoError(tb, err)
require.NoError(tb, os.MkdirAll(dir, 0o777))
@ -503,7 +503,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}
func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, true)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
require.NoError(tb, err)
require.NoError(tb, os.MkdirAll(dir, 0o777))

View file

@ -101,7 +101,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
nil,
w.logger,
[]int64{w.blockSize},
chunkenc.NewPool(), nil, true)
chunkenc.NewPool(), nil)
if err != nil {
return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err)
}

View file

@ -90,9 +90,9 @@ type LeveledCompactor struct {
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
enableOverlappingCompaction bool
concurrencyOpts LeveledCompactorConcurrencyOptions
concurrencyOpts LeveledCompactorConcurrencyOptions
}
type CompactorMetrics struct {
@ -155,12 +155,35 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
return m
}
// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
type LeveledCompactorOptions struct {
// PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
PE index.PostingsEncoder
// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc
// EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled.
// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
EnableOverlappingCompaction bool
}
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}
func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, fmt.Errorf("at least one range must be provided")
}
@ -170,9 +193,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
if l == nil {
l = log.NewNopLogger()
}
mergeFunc := opts.MergeFunc
if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
}
maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
if maxBlockChunkSegmentSize == 0 {
maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
}
pe := opts.PE
if pe == nil {
pe = index.EncodePostingsRaw
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
@ -181,8 +213,9 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
enableOverlappingCompaction: opts.EnableOverlappingCompaction,
concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
enableOverlappingCompaction: enableOverlappingCompaction,
}, nil
}
@ -810,7 +843,7 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blockPop
outBlocks[ix].chunkw = chunkw
var indexw IndexWriter
indexw, err = index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
indexw, err = index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
if err != nil {
return fmt.Errorf("open index writer: %w", err)
}

View file

@ -167,7 +167,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
},
}
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, true)
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
require.NoError(t, err)
c.plan(metas)
@ -181,7 +181,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
180,
540,
1620,
}, nil, nil, true)
}, nil, nil)
require.NoError(t, err)
cases := map[string]struct {
@ -390,7 +390,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
240,
720,
2160,
}, nil, nil, true)
}, nil, nil)
require.NoError(t, err)
cases := []struct {
@ -440,7 +440,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
240,
720,
2160,
}, nil, chunks.DefaultChunkSegmentSize, nil, true)
}, nil, chunks.DefaultChunkSegmentSize, nil)
require.NoError(t, err)
tmpdir := t.TempDir()
@ -535,7 +535,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
for _, shardCount := range shardCounts {
t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
c, err := NewLeveledCompactorWithChunkSize(ctx, nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
c, err := NewLeveledCompactorWithChunkSize(ctx, nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil)
require.NoError(t, err)
blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
@ -669,7 +669,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
blockDirs = append(blockDirs, bdir)
}
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil)
require.NoError(t, err)
blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
@ -1170,7 +1170,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil, true)
c, err := NewLeveledCompactorWithChunkSize(context.Background(), nil, nil, []int64{0}, nil, chunks.DefaultChunkSegmentSize, nil)
require.NoError(t, err)
meta := &BlockMeta{
@ -1305,7 +1305,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs = append(blockDirs, block.Dir())
}
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(b, err)
b.ResetTimer()
@ -1800,7 +1800,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, true)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
@ -1942,7 +1942,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Sparse head compaction.
mint := sparseHead.MinTime()
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, true)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
@ -1993,7 +1993,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Old head compaction.
mint := oldHead.MinTime()
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, true)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
@ -2167,13 +2167,15 @@ func TestCompactBlockMetas(t *testing.T) {
func TestLeveledCompactor_plan_overlapping_disabled(t *testing.T) {
// This mimics our default ExponentialBlockRanges with min block size equals to 20.
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
compactor, err := NewLeveledCompactorWithOptions(context.Background(), nil, nil, []int64{
20,
60,
180,
540,
1620,
}, nil, nil, false)
}, nil, LeveledCompactorOptions{
EnableOverlappingCompaction: false,
})
require.NoError(t, err)
cases := map[string]struct {

View file

@ -78,7 +78,6 @@ func DefaultOptions() *Options {
MinBlockDuration: DefaultBlockDuration,
MaxBlockDuration: DefaultBlockDuration,
NoLockfile: false,
AllowOverlappingCompaction: true,
SamplesPerChunk: DefaultSamplesPerChunk,
WALCompression: wlog.CompressionNone,
StripeSize: DefaultStripeSize,
@ -87,6 +86,7 @@ func DefaultOptions() *Options {
HeadChunksEndTimeVariance: 0,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
EnableOverlappingCompaction: true,
HeadPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL,
HeadPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
HeadPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
@ -126,14 +126,6 @@ type Options struct {
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
// Compaction of overlapping blocks are allowed if AllowOverlappingCompaction is true.
// This is an optional flag for overlapping blocks.
// The reason why this flag exists is because there are various users of the TSDB
// that do not want vertical compaction happening on ingest time. Instead,
// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
// For Prometheus, this will always be true.
AllowOverlappingCompaction bool
// WALCompression configures the compression type to use on records in the WAL.
WALCompression wlog.CompressionType
@ -206,6 +198,14 @@ type Options struct {
// If it is <=0, the default value is assumed.
OutOfOrderCapMax int64
// Compaction of overlapping blocks are allowed if EnableOverlappingCompaction is true.
// This is an optional flag for overlapping blocks.
// The reason why this flag exists is because there are various users of the TSDB
// that do not want vertical compaction happening on ingest time. Instead,
// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
// For Prometheus, this will always be true.
EnableOverlappingCompaction bool
// HeadPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache in the Head.
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
HeadPostingsForMatchersCacheTTL time.Duration
@ -505,9 +505,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
nil,
db.logger,
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
chunkenc.NewPool(),
nil,
false,
chunkenc.NewPool(), nil,
)
if err != nil {
return fmt.Errorf("create leveled compactor: %w", err)
@ -884,7 +882,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}
ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
})
if err != nil {
cancel()
return nil, fmt.Errorf("create leveled compactor: %w", err)

View file

@ -111,6 +111,8 @@ type symbolCacheEntry struct {
lastValue string
}
type PostingsEncoder func(*encoding.Encbuf, []uint32) error
// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
@ -149,6 +151,8 @@ type Writer struct {
crc32 hash.Hash
Version int
postingsEncoder PostingsEncoder
}
// TOC represents index Table Of Content that states where each section of index starts.
@ -187,7 +191,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
// It uses the given encoder to encode each postings list.
func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir)
@ -230,9 +235,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
postingsEncoder: encoder,
}
if err := iw.writeMeta(); err != nil {
return nil, err
@ -240,6 +246,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return iw, nil
}
// NewWriter creates a new index writer using the default encoder. See
// NewWriterWithEncoder.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
}
func (w *Writer) write(bufs ...[]byte) error {
return w.f.Write(bufs...)
}
@ -942,6 +954,20 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}
// EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
e.PutBE32int(len(offs))
for _, off := range offs {
if off > (1<<32)-1 {
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
}
e.PutBE32(off)
}
return nil
}
func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.AddPadding(4); err != nil {
@ -960,13 +986,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.cntPO++
w.buf1.Reset()
w.buf1.PutBE32int(len(offs))
for _, off := range offs {
if off > (1<<32)-1 {
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
}
w.buf1.PutBE32(off)
if err := w.postingsEncoder(&w.buf1, offs); err != nil {
return err
}
w.buf2.Reset()

View file

@ -1848,13 +1848,9 @@ func parseDuration(s string) (time.Duration, error) {
}
func parseMatchersParam(matchers []string) ([][]*labels.Matcher, error) {
var matcherSets [][]*labels.Matcher
for _, s := range matchers {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, err
}
matcherSets = append(matcherSets, matchers)
matcherSets, err := parser.ParseMetricSelectors(matchers)
if err != nil {
return nil, err
}
OUTER:

View file

@ -65,14 +65,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
return
}
var matcherSets [][]*labels.Matcher
for _, s := range req.Form["match[]"] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
matcherSets = append(matcherSets, matchers)
matcherSets, err := parser.ParseMetricSelectors(req.Form["match[]"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var (

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/codemirror-promql",
"version": "0.48.0",
"version": "0.49.0",
"description": "a CodeMirror mode for the PromQL language",
"types": "dist/esm/index.d.ts",
"module": "dist/esm/index.js",
@ -29,7 +29,7 @@
},
"homepage": "https://github.com/prometheus/prometheus/blob/main/web/ui/module/codemirror-promql/README.md",
"dependencies": {
"@prometheus-io/lezer-promql": "0.48.0",
"@prometheus-io/lezer-promql": "0.49.0",
"lru-cache": "^7.18.3"
},
"devDependencies": {

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/lezer-promql",
"version": "0.48.0",
"version": "0.49.0",
"description": "lezer-based PromQL grammar",
"main": "dist/index.cjs",
"type": "module",

20933
web/ui/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -28,5 +28,5 @@
"ts-jest": "^29.1.1",
"typescript": "^4.9.5"
},
"version": "0.48.0"
"version": "0.49.0"
}

View file

@ -1,6 +1,6 @@
{
"name": "@prometheus-io/app",
"version": "0.48.0",
"version": "0.49.0",
"private": true,
"dependencies": {
"@codemirror/autocomplete": "^6.11.1",
@ -19,7 +19,7 @@
"@lezer/lr": "^1.3.14",
"@nexucis/fuzzy": "^0.4.1",
"@nexucis/kvsearch": "^0.8.1",
"@prometheus-io/codemirror-promql": "0.48.0",
"@prometheus-io/codemirror-promql": "0.49.0",
"bootstrap": "^4.6.2",
"css.escape": "^1.5.1",
"downshift": "^7.6.2",

View file

@ -338,7 +338,7 @@ describe('DataTable', () => {
const dataTableProps: DataTableProps = {
data: {
resultType: 'string',
result: 'string',
result: [1572098246.599, 'test'],
},
useLocalTime: false,
};
@ -346,7 +346,7 @@ describe('DataTable', () => {
it('renders a string row', () => {
const table = dataTable.find(Table);
const rows = table.find('tr');
expect(rows.text()).toEqual('stringt');
expect(rows.text()).toEqual('stringtest');
});
});
});

View file

@ -24,7 +24,7 @@ export interface DataTableProps {
}
| {
resultType: 'string';
result: string;
result: SampleValue;
};
useLocalTime: boolean;
}

View file

@ -0,0 +1,66 @@
import { DataTableProps } from './DataTable';
import { isHeatmapData } from './GraphHeatmapHelpers';
describe('GraphHeatmapHelpers', () => {
it('isHeatmapData should return false for scalar and string resultType', () => {
let data = {
resultType: 'scalar',
result: [1703091180.125, '1703091180.125'],
} as DataTableProps['data'];
expect(isHeatmapData(data)).toBe(false);
data = {
resultType: 'string',
result: [1704305680.332, '2504'],
} as DataTableProps['data'];
expect(isHeatmapData(data)).toBe(false);
});
it('isHeatmapData should return false for a vector and matrix if length < 2', () => {
let data = {
resultType: 'vector',
result: [
{
metric: {
__name__: 'my_gauge',
job: 'target',
},
value: [1703091180.683, '6'],
},
],
} as DataTableProps['data'];
expect(isHeatmapData(data)).toBe(false);
data = {
resultType: 'matrix',
result: [
{
metric: {},
values: [[1703091180.683, '6']],
},
],
} as DataTableProps['data'];
expect(isHeatmapData(data)).toBe(false);
});
it('isHeatmapData should return true for valid heatmap data', () => {
const data = {
resultType: 'matrix',
result: [
{
metric: {
le: '100',
},
values: [[1703091180.683, '6']],
},
{
metric: {
le: '1000',
},
values: [[1703091190.683, '6.1']],
},
],
} as DataTableProps['data'];
expect(isHeatmapData(data)).toBe(true);
});
});

View file

@ -1,10 +1,12 @@
import { DataTableProps } from './DataTable';
import { GraphProps, GraphSeries } from './Graph';
export function isHeatmapData(data: GraphProps['data']) {
if (!data?.result?.length || data?.result?.length < 2) {
export function isHeatmapData(data: DataTableProps['data']) {
if (data?.resultType === 'scalar' || data?.resultType === 'string' || !data?.result?.length || data?.result?.length < 2) {
return false;
}
const result = data.result;
// Type assertion to prevent TS2349 error.
const result = data.result as GraphProps['data']['result'];
const firstLabels = Object.keys(result[0].metric).filter((label) => label !== 'le');
return result.every(({ metric }) => {
const labels = Object.keys(metric).filter((label) => label !== 'le');