mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
Add analyze histograms command to promtool (#12331)
Add `query analyze` command to promtool This command analyzes the buckets of classic and native histograms, based on data queried from the Prometheus query API, i.e. it doesn't require direct access to the TSDB files. Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com> --------- Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
parent
3db4596965
commit
6150e1ca0e
370
cmd/promtool/analyze.go
Normal file
370
cmd/promtool/analyze.go
Normal file
|
@ -0,0 +1,370 @@
|
|||
// Copyright 2023 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
var (
|
||||
errNotNativeHistogram = fmt.Errorf("not a native histogram")
|
||||
errNotEnoughData = fmt.Errorf("not enough data")
|
||||
|
||||
outputHeader = `Bucket stats for each histogram series over time
|
||||
------------------------------------------------
|
||||
First the min, avg, and max number of populated buckets, followed by the total
|
||||
number of buckets (only if different from the max number of populated buckets
|
||||
which is typical for classic but not native histograms).`
|
||||
outputFooter = `Aggregated bucket stats
|
||||
-----------------------
|
||||
Each line shows min/avg/max over the series above.`
|
||||
)
|
||||
|
||||
type QueryAnalyzeConfig struct {
|
||||
metricType string
|
||||
duration time.Duration
|
||||
time string
|
||||
matchers []string
|
||||
}
|
||||
|
||||
// run retrieves metrics that look like conventional histograms (i.e. have _bucket
|
||||
// suffixes) or native histograms, depending on metricType flag.
|
||||
func (c *QueryAnalyzeConfig) run(url *url.URL, roundtripper http.RoundTripper) error {
|
||||
if c.metricType != "histogram" {
|
||||
return fmt.Errorf("analyze type is %s, must be 'histogram'", c.metricType)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
api, err := newAPI(url, roundtripper, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var endTime time.Time
|
||||
if c.time != "" {
|
||||
endTime, err = parseTime(c.time)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing time '%s': %w", c.time, err)
|
||||
}
|
||||
} else {
|
||||
endTime = time.Now()
|
||||
}
|
||||
|
||||
return c.getStatsFromMetrics(ctx, api, endTime, os.Stdout, c.matchers)
|
||||
}
|
||||
|
||||
func (c *QueryAnalyzeConfig) getStatsFromMetrics(ctx context.Context, api v1.API, endTime time.Time, out io.Writer, matchers []string) error {
|
||||
fmt.Fprintf(out, "%s\n\n", outputHeader)
|
||||
metastatsNative := newMetaStatistics()
|
||||
metastatsClassic := newMetaStatistics()
|
||||
for _, matcher := range matchers {
|
||||
seriesSel := seriesSelector(matcher, c.duration)
|
||||
matrix, err := querySamples(ctx, api, seriesSel, endTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
matrices := make(map[string]model.Matrix)
|
||||
for _, series := range matrix {
|
||||
// We do not handle mixed types. If there are float values, we assume it is a
|
||||
// classic histogram, otherwise we assume it is a native histogram, and we
|
||||
// ignore series with errors if they do not match the expected type.
|
||||
if len(series.Values) == 0 {
|
||||
stats, err := calcNativeBucketStatistics(series)
|
||||
if err != nil {
|
||||
if errors.Is(err, errNotNativeHistogram) || errors.Is(err, errNotEnoughData) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(out, "- %s (native): %v\n", series.Metric, *stats)
|
||||
metastatsNative.update(stats)
|
||||
} else {
|
||||
lbs := model.LabelSet(series.Metric).Clone()
|
||||
if _, ok := lbs["le"]; !ok {
|
||||
continue
|
||||
}
|
||||
metricName := string(lbs[labels.MetricName])
|
||||
if !strings.HasSuffix(metricName, "_bucket") {
|
||||
continue
|
||||
}
|
||||
delete(lbs, labels.MetricName)
|
||||
delete(lbs, "le")
|
||||
key := formatSeriesName(metricName, lbs)
|
||||
matrices[key] = append(matrices[key], series)
|
||||
}
|
||||
}
|
||||
|
||||
for key, matrix := range matrices {
|
||||
stats, err := calcClassicBucketStatistics(matrix)
|
||||
if err != nil {
|
||||
if errors.Is(err, errNotEnoughData) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(out, "- %s (classic): %v\n", key, *stats)
|
||||
metastatsClassic.update(stats)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(out, "\n%s\n", outputFooter)
|
||||
if metastatsNative.Count() > 0 {
|
||||
fmt.Fprintf(out, "\nNative %s\n", metastatsNative)
|
||||
}
|
||||
if metastatsClassic.Count() > 0 {
|
||||
fmt.Fprintf(out, "\nClassic %s\n", metastatsClassic)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func seriesSelector(metricName string, duration time.Duration) string {
|
||||
builder := strings.Builder{}
|
||||
builder.WriteString(metricName)
|
||||
builder.WriteRune('[')
|
||||
builder.WriteString(duration.String())
|
||||
builder.WriteRune(']')
|
||||
return builder.String()
|
||||
}
|
||||
|
||||
func formatSeriesName(metricName string, lbs model.LabelSet) string {
|
||||
builder := strings.Builder{}
|
||||
builder.WriteString(metricName)
|
||||
builder.WriteString(lbs.String())
|
||||
return builder.String()
|
||||
}
|
||||
|
||||
func querySamples(ctx context.Context, api v1.API, query string, end time.Time) (model.Matrix, error) {
|
||||
values, _, err := api.Query(ctx, query, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
matrix, ok := values.(model.Matrix)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("query of buckets resulted in non-Matrix")
|
||||
}
|
||||
|
||||
return matrix, nil
|
||||
}
|
||||
|
||||
// minPop/avgPop/maxPop is for the number of populated (non-zero) buckets.
|
||||
// total is the total number of buckets across all samples in the series,
|
||||
// populated or not.
|
||||
type statistics struct {
|
||||
minPop, maxPop, total int
|
||||
avgPop float64
|
||||
}
|
||||
|
||||
func (s statistics) String() string {
|
||||
if s.maxPop == s.total {
|
||||
return fmt.Sprintf("%d/%.3f/%d", s.minPop, s.avgPop, s.maxPop)
|
||||
}
|
||||
return fmt.Sprintf("%d/%.3f/%d/%d", s.minPop, s.avgPop, s.maxPop, s.total)
|
||||
}
|
||||
|
||||
func calcClassicBucketStatistics(matrix model.Matrix) (*statistics, error) {
|
||||
numBuckets := len(matrix)
|
||||
|
||||
stats := &statistics{
|
||||
minPop: math.MaxInt,
|
||||
total: numBuckets,
|
||||
}
|
||||
|
||||
if numBuckets == 0 || len(matrix[0].Values) < 2 {
|
||||
return stats, errNotEnoughData
|
||||
}
|
||||
|
||||
numSamples := len(matrix[0].Values)
|
||||
|
||||
sortMatrix(matrix)
|
||||
|
||||
totalPop := 0
|
||||
for timeIdx := 0; timeIdx < numSamples; timeIdx++ {
|
||||
curr, err := getBucketCountsAtTime(matrix, numBuckets, timeIdx)
|
||||
if err != nil {
|
||||
return stats, err
|
||||
}
|
||||
countPop := 0
|
||||
for _, b := range curr {
|
||||
if b != 0 {
|
||||
countPop++
|
||||
}
|
||||
}
|
||||
|
||||
totalPop += countPop
|
||||
if stats.minPop > countPop {
|
||||
stats.minPop = countPop
|
||||
}
|
||||
if stats.maxPop < countPop {
|
||||
stats.maxPop = countPop
|
||||
}
|
||||
}
|
||||
stats.avgPop = float64(totalPop) / float64(numSamples)
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func sortMatrix(matrix model.Matrix) {
|
||||
sort.SliceStable(matrix, func(i, j int) bool {
|
||||
return getLe(matrix[i]) < getLe(matrix[j])
|
||||
})
|
||||
}
|
||||
|
||||
func getLe(series *model.SampleStream) float64 {
|
||||
lbs := model.LabelSet(series.Metric)
|
||||
le, _ := strconv.ParseFloat(string(lbs["le"]), 64)
|
||||
return le
|
||||
}
|
||||
|
||||
func getBucketCountsAtTime(matrix model.Matrix, numBuckets, timeIdx int) ([]int, error) {
|
||||
counts := make([]int, numBuckets)
|
||||
if timeIdx >= len(matrix[0].Values) {
|
||||
// Just return zeroes instead of erroring out so we can get partial results.
|
||||
return counts, nil
|
||||
}
|
||||
counts[0] = int(matrix[0].Values[timeIdx].Value)
|
||||
for i, bucket := range matrix[1:] {
|
||||
if timeIdx >= len(bucket.Values) {
|
||||
// Just return zeroes instead of erroring out so we can get partial results.
|
||||
return counts, nil
|
||||
}
|
||||
curr := bucket.Values[timeIdx]
|
||||
prev := matrix[i].Values[timeIdx]
|
||||
// Assume the results are nicely aligned.
|
||||
if curr.Timestamp != prev.Timestamp {
|
||||
return counts, fmt.Errorf("matrix result is not time aligned")
|
||||
}
|
||||
counts[i+1] = int(curr.Value - prev.Value)
|
||||
}
|
||||
return counts, nil
|
||||
}
|
||||
|
||||
type bucketBounds struct {
|
||||
boundaries int32
|
||||
upper, lower float64
|
||||
}
|
||||
|
||||
func makeBucketBounds(b *model.HistogramBucket) bucketBounds {
|
||||
return bucketBounds{
|
||||
boundaries: b.Boundaries,
|
||||
upper: float64(b.Upper),
|
||||
lower: float64(b.Lower),
|
||||
}
|
||||
}
|
||||
|
||||
func calcNativeBucketStatistics(series *model.SampleStream) (*statistics, error) {
|
||||
stats := &statistics{
|
||||
minPop: math.MaxInt,
|
||||
}
|
||||
|
||||
overall := make(map[bucketBounds]struct{})
|
||||
totalPop := 0
|
||||
if len(series.Histograms) == 0 {
|
||||
return nil, errNotNativeHistogram
|
||||
}
|
||||
if len(series.Histograms) == 1 {
|
||||
return nil, errNotEnoughData
|
||||
}
|
||||
for _, histogram := range series.Histograms {
|
||||
for _, bucket := range histogram.Histogram.Buckets {
|
||||
bb := makeBucketBounds(bucket)
|
||||
overall[bb] = struct{}{}
|
||||
}
|
||||
countPop := len(histogram.Histogram.Buckets)
|
||||
|
||||
totalPop += countPop
|
||||
if stats.minPop > countPop {
|
||||
stats.minPop = countPop
|
||||
}
|
||||
if stats.maxPop < countPop {
|
||||
stats.maxPop = countPop
|
||||
}
|
||||
}
|
||||
stats.avgPop = float64(totalPop) / float64(len(series.Histograms))
|
||||
stats.total = len(overall)
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
type distribution struct {
|
||||
min, max, count int
|
||||
avg float64
|
||||
}
|
||||
|
||||
func newDistribution() distribution {
|
||||
return distribution{
|
||||
min: math.MaxInt,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *distribution) update(num int) {
|
||||
if d.min > num {
|
||||
d.min = num
|
||||
}
|
||||
if d.max < num {
|
||||
d.max = num
|
||||
}
|
||||
d.count++
|
||||
d.avg += float64(num)/float64(d.count) - d.avg/float64(d.count)
|
||||
}
|
||||
|
||||
func (d distribution) String() string {
|
||||
return fmt.Sprintf("%d/%.3f/%d", d.min, d.avg, d.max)
|
||||
}
|
||||
|
||||
type metaStatistics struct {
|
||||
minPop, avgPop, maxPop, total distribution
|
||||
}
|
||||
|
||||
func newMetaStatistics() *metaStatistics {
|
||||
return &metaStatistics{
|
||||
minPop: newDistribution(),
|
||||
avgPop: newDistribution(),
|
||||
maxPop: newDistribution(),
|
||||
total: newDistribution(),
|
||||
}
|
||||
}
|
||||
|
||||
func (ms metaStatistics) Count() int {
|
||||
return ms.minPop.count
|
||||
}
|
||||
|
||||
func (ms metaStatistics) String() string {
|
||||
if ms.maxPop == ms.total {
|
||||
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop)
|
||||
}
|
||||
return fmt.Sprintf("histogram series (%d in total):\n- min populated: %v\n- avg populated: %v\n- max populated: %v\n- total: %v", ms.Count(), ms.minPop, ms.avgPop, ms.maxPop, ms.total)
|
||||
}
|
||||
|
||||
func (ms *metaStatistics) update(s *statistics) {
|
||||
ms.minPop.update(s.minPop)
|
||||
ms.avgPop.update(int(s.avgPop))
|
||||
ms.maxPop.update(s.maxPop)
|
||||
ms.total.update(s.total)
|
||||
}
|
170
cmd/promtool/analyze_test.go
Normal file
170
cmd/promtool/analyze_test.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
// Copyright 2023 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
var (
|
||||
exampleMatrix = model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"le": "+Inf",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Value: 31,
|
||||
Timestamp: 100,
|
||||
},
|
||||
{
|
||||
Value: 32,
|
||||
Timestamp: 200,
|
||||
},
|
||||
{
|
||||
Value: 40,
|
||||
Timestamp: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"le": "0.5",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Value: 10,
|
||||
Timestamp: 100,
|
||||
},
|
||||
{
|
||||
Value: 11,
|
||||
Timestamp: 200,
|
||||
},
|
||||
{
|
||||
Value: 11,
|
||||
Timestamp: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"le": "10",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Value: 30,
|
||||
Timestamp: 100,
|
||||
},
|
||||
{
|
||||
Value: 31,
|
||||
Timestamp: 200,
|
||||
},
|
||||
{
|
||||
Value: 37,
|
||||
Timestamp: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"le": "2",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Value: 25,
|
||||
Timestamp: 100,
|
||||
},
|
||||
{
|
||||
Value: 26,
|
||||
Timestamp: 200,
|
||||
},
|
||||
{
|
||||
Value: 27,
|
||||
Timestamp: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
exampleMatrixLength = len(exampleMatrix)
|
||||
)
|
||||
|
||||
func init() {
|
||||
sortMatrix(exampleMatrix)
|
||||
}
|
||||
|
||||
func TestGetBucketCountsAtTime(t *testing.T) {
|
||||
cases := []struct {
|
||||
matrix model.Matrix
|
||||
length int
|
||||
timeIdx int
|
||||
expected []int
|
||||
}{
|
||||
{
|
||||
exampleMatrix,
|
||||
exampleMatrixLength,
|
||||
0,
|
||||
[]int{10, 15, 5, 1},
|
||||
},
|
||||
{
|
||||
exampleMatrix,
|
||||
exampleMatrixLength,
|
||||
1,
|
||||
[]int{11, 15, 5, 1},
|
||||
},
|
||||
{
|
||||
exampleMatrix,
|
||||
exampleMatrixLength,
|
||||
2,
|
||||
[]int{11, 16, 10, 3},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(fmt.Sprintf("exampleMatrix@%d", c.timeIdx), func(t *testing.T) {
|
||||
res, err := getBucketCountsAtTime(c.matrix, c.length, c.timeIdx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected, res)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalcClassicBucketStatistics(t *testing.T) {
|
||||
cases := []struct {
|
||||
matrix model.Matrix
|
||||
expected *statistics
|
||||
}{
|
||||
{
|
||||
exampleMatrix,
|
||||
&statistics{
|
||||
minPop: 4,
|
||||
avgPop: 4,
|
||||
maxPop: 4,
|
||||
total: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
||||
res, err := calcClassicBucketStatistics(c.matrix)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected, res)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -35,9 +35,7 @@ import (
|
|||
"github.com/go-kit/log"
|
||||
"github.com/google/pprof/profile"
|
||||
"github.com/prometheus/client_golang/api"
|
||||
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil/promlint"
|
||||
config_util "github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
|
@ -185,6 +183,14 @@ func main() {
|
|||
queryLabelsEnd := queryLabelsCmd.Flag("end", "End time (RFC3339 or Unix timestamp).").String()
|
||||
queryLabelsMatch := queryLabelsCmd.Flag("match", "Series selector. Can be specified multiple times.").Strings()
|
||||
|
||||
queryAnalyzeCfg := &QueryAnalyzeConfig{}
|
||||
queryAnalyzeCmd := queryCmd.Command("analyze", "Run queries against your Prometheus to analyze the usage pattern of certain metrics.")
|
||||
queryAnalyzeCmd.Flag("server", "Prometheus server to query.").Required().URLVar(&serverURL)
|
||||
queryAnalyzeCmd.Flag("type", "Type of metric: histogram.").Required().StringVar(&queryAnalyzeCfg.metricType)
|
||||
queryAnalyzeCmd.Flag("duration", "Time frame to analyze.").Default("1h").DurationVar(&queryAnalyzeCfg.duration)
|
||||
queryAnalyzeCmd.Flag("time", "Query time (RFC3339 or Unix timestamp), defaults to now.").StringVar(&queryAnalyzeCfg.time)
|
||||
queryAnalyzeCmd.Flag("match", "Series selector. Can be specified multiple times.").Required().StringsVar(&queryAnalyzeCfg.matchers)
|
||||
|
||||
pushCmd := app.Command("push", "Push to a Prometheus server.")
|
||||
pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("<filename>").ExistingFileVar(&httpConfigFilePath)
|
||||
pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write (for testing purpose only).")
|
||||
|
@ -390,6 +396,9 @@ func main() {
|
|||
case importRulesCmd.FullCommand():
|
||||
os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, *importRulesFiles...)))
|
||||
|
||||
case queryAnalyzeCmd.FullCommand():
|
||||
os.Exit(checkErr(queryAnalyzeCfg.run(serverURL, httpRoundTripper)))
|
||||
|
||||
case documentationCmd.FullCommand():
|
||||
os.Exit(checkErr(documentcli.GenerateMarkdown(app.Model(), os.Stdout)))
|
||||
|
||||
|
@ -997,246 +1006,6 @@ func checkMetricsExtended(r io.Reader) ([]metricStat, int, error) {
|
|||
return stats, total, nil
|
||||
}
|
||||
|
||||
// QueryInstant performs an instant query against a Prometheus server.
|
||||
func QueryInstant(url *url.URL, roundTripper http.RoundTripper, query, evalTime string, p printer) int {
|
||||
if url.Scheme == "" {
|
||||
url.Scheme = "http"
|
||||
}
|
||||
config := api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
}
|
||||
|
||||
// Create new client.
|
||||
c, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
eTime := time.Now()
|
||||
if evalTime != "" {
|
||||
eTime, err = parseTime(evalTime)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing evaluation time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
api := v1.NewAPI(c)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.Query(ctx, query, eTime) // Ignoring warnings for now.
|
||||
cancel()
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printValue(val)
|
||||
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QueryRange performs a range query against a Prometheus server.
|
||||
func QueryRange(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, query, start, end string, step time.Duration, p printer) int {
|
||||
if url.Scheme == "" {
|
||||
url.Scheme = "http"
|
||||
}
|
||||
config := api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
}
|
||||
|
||||
if len(headers) > 0 {
|
||||
config.RoundTripper = promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
return roundTripper.RoundTrip(req)
|
||||
})
|
||||
}
|
||||
|
||||
// Create new client.
|
||||
c, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
var stime, etime time.Time
|
||||
|
||||
if end == "" {
|
||||
etime = time.Now()
|
||||
} else {
|
||||
etime, err = parseTime(end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing end time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
if start == "" {
|
||||
stime = etime.Add(-5 * time.Minute)
|
||||
} else {
|
||||
stime, err = parseTime(start)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing start time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
if !stime.Before(etime) {
|
||||
fmt.Fprintln(os.Stderr, "start time is not before end time")
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
if step == 0 {
|
||||
resolution := math.Max(math.Floor(etime.Sub(stime).Seconds()/250), 1)
|
||||
// Convert seconds to nanoseconds such that time.Duration parses correctly.
|
||||
step = time.Duration(resolution) * time.Second
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
api := v1.NewAPI(c)
|
||||
r := v1.Range{Start: stime, End: etime, Step: step}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.QueryRange(ctx, query, r) // Ignoring warnings for now.
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printValue(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QuerySeries queries for a series against a Prometheus server.
|
||||
func QuerySeries(url *url.URL, roundTripper http.RoundTripper, matchers []string, start, end string, p printer) int {
|
||||
if url.Scheme == "" {
|
||||
url.Scheme = "http"
|
||||
}
|
||||
config := api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
}
|
||||
|
||||
// Create new client.
|
||||
c, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
stime, etime, err := parseStartTimeAndEndTime(start, end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
api := v1.NewAPI(c)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.Series(ctx, matchers, stime, etime) // Ignoring warnings for now.
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printSeries(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QueryLabels queries for label values against a Prometheus server.
|
||||
func QueryLabels(url *url.URL, roundTripper http.RoundTripper, matchers []string, name, start, end string, p printer) int {
|
||||
if url.Scheme == "" {
|
||||
url.Scheme = "http"
|
||||
}
|
||||
config := api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
}
|
||||
|
||||
// Create new client.
|
||||
c, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
stime, etime, err := parseStartTimeAndEndTime(start, end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
api := v1.NewAPI(c)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, warn, err := api.LabelValues(ctx, name, matchers, stime, etime)
|
||||
cancel()
|
||||
|
||||
for _, v := range warn {
|
||||
fmt.Fprintln(os.Stderr, "query warning:", v)
|
||||
}
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printLabelValues(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
func handleAPIError(err error) int {
|
||||
var apiErr *v1.Error
|
||||
if errors.As(err, &apiErr) && apiErr.Detail != "" {
|
||||
fmt.Fprintf(os.Stderr, "query error: %v (detail: %s)\n", apiErr, strings.TrimSpace(apiErr.Detail))
|
||||
} else {
|
||||
fmt.Fprintln(os.Stderr, "query error:", err)
|
||||
}
|
||||
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
func parseStartTimeAndEndTime(start, end string) (time.Time, time.Time, error) {
|
||||
var (
|
||||
minTime = time.Now().Add(-9999 * time.Hour)
|
||||
maxTime = time.Now().Add(9999 * time.Hour)
|
||||
err error
|
||||
)
|
||||
|
||||
stime := minTime
|
||||
etime := maxTime
|
||||
|
||||
if start != "" {
|
||||
stime, err = parseTime(start)
|
||||
if err != nil {
|
||||
return stime, etime, fmt.Errorf("error parsing start time: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if end != "" {
|
||||
etime, err = parseTime(end)
|
||||
if err != nil {
|
||||
return stime, etime, fmt.Errorf("error parsing end time: %w", err)
|
||||
}
|
||||
}
|
||||
return stime, etime, nil
|
||||
}
|
||||
|
||||
func parseTime(s string) (time.Time, error) {
|
||||
if t, err := strconv.ParseFloat(s, 64); err == nil {
|
||||
s, ns := math.Modf(t)
|
||||
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
|
||||
}
|
||||
|
||||
type endpointsGroup struct {
|
||||
urlToFilename map[string]string
|
||||
postProcess func(b []byte) ([]byte, error)
|
||||
|
@ -1390,15 +1159,12 @@ func importRules(url *url.URL, roundTripper http.RoundTripper, start, end, outpu
|
|||
evalInterval: evalInterval,
|
||||
maxBlockDuration: maxBlockDuration,
|
||||
}
|
||||
client, err := api.NewClient(api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
})
|
||||
api, err := newAPI(url, roundTripper, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("new api client error: %w", err)
|
||||
}
|
||||
|
||||
ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, v1.NewAPI(client))
|
||||
ruleImporter := newRuleImporter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), cfg, api)
|
||||
errs := ruleImporter.loadGroups(ctx, files)
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
|
|
251
cmd/promtool/query.go
Normal file
251
cmd/promtool/query.go
Normal file
|
@ -0,0 +1,251 @@
|
|||
// Copyright 2023 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/api"
|
||||
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
_ "github.com/prometheus/prometheus/plugins" // Register plugins.
|
||||
)
|
||||
|
||||
func newAPI(url *url.URL, roundTripper http.RoundTripper, headers map[string]string) (v1.API, error) {
|
||||
if url.Scheme == "" {
|
||||
url.Scheme = "http"
|
||||
}
|
||||
config := api.Config{
|
||||
Address: url.String(),
|
||||
RoundTripper: roundTripper,
|
||||
}
|
||||
|
||||
if len(headers) > 0 {
|
||||
config.RoundTripper = promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
return roundTripper.RoundTrip(req)
|
||||
})
|
||||
}
|
||||
|
||||
// Create new client.
|
||||
client, err := api.NewClient(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
api := v1.NewAPI(client)
|
||||
return api, nil
|
||||
}
|
||||
|
||||
// QueryInstant performs an instant query against a Prometheus server.
|
||||
func QueryInstant(url *url.URL, roundTripper http.RoundTripper, query, evalTime string, p printer) int {
|
||||
api, err := newAPI(url, roundTripper, nil)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
eTime := time.Now()
|
||||
if evalTime != "" {
|
||||
eTime, err = parseTime(evalTime)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing evaluation time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.Query(ctx, query, eTime) // Ignoring warnings for now.
|
||||
cancel()
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printValue(val)
|
||||
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QueryRange performs a range query against a Prometheus server.
|
||||
func QueryRange(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, query, start, end string, step time.Duration, p printer) int {
|
||||
api, err := newAPI(url, roundTripper, headers)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
var stime, etime time.Time
|
||||
|
||||
if end == "" {
|
||||
etime = time.Now()
|
||||
} else {
|
||||
etime, err = parseTime(end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing end time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
if start == "" {
|
||||
stime = etime.Add(-5 * time.Minute)
|
||||
} else {
|
||||
stime, err = parseTime(start)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing start time:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
}
|
||||
|
||||
if !stime.Before(etime) {
|
||||
fmt.Fprintln(os.Stderr, "start time is not before end time")
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
if step == 0 {
|
||||
resolution := math.Max(math.Floor(etime.Sub(stime).Seconds()/250), 1)
|
||||
// Convert seconds to nanoseconds such that time.Duration parses correctly.
|
||||
step = time.Duration(resolution) * time.Second
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
r := v1.Range{Start: stime, End: etime, Step: step}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.QueryRange(ctx, query, r) // Ignoring warnings for now.
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printValue(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QuerySeries queries for a series against a Prometheus server.
|
||||
func QuerySeries(url *url.URL, roundTripper http.RoundTripper, matchers []string, start, end string, p printer) int {
|
||||
api, err := newAPI(url, roundTripper, nil)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
stime, etime, err := parseStartTimeAndEndTime(start, end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, _, err := api.Series(ctx, matchers, stime, etime) // Ignoring warnings for now.
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printSeries(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
// QueryLabels queries for label values against a Prometheus server.
|
||||
func QueryLabels(url *url.URL, roundTripper http.RoundTripper, matchers []string, name, start, end string, p printer) int {
|
||||
api, err := newAPI(url, roundTripper, nil)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error creating API client:", err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
stime, etime, err := parseStartTimeAndEndTime(start, end)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
// Run query against client.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
val, warn, err := api.LabelValues(ctx, name, matchers, stime, etime)
|
||||
cancel()
|
||||
|
||||
for _, v := range warn {
|
||||
fmt.Fprintln(os.Stderr, "query warning:", v)
|
||||
}
|
||||
if err != nil {
|
||||
return handleAPIError(err)
|
||||
}
|
||||
|
||||
p.printLabelValues(val)
|
||||
return successExitCode
|
||||
}
|
||||
|
||||
func handleAPIError(err error) int {
|
||||
var apiErr *v1.Error
|
||||
if errors.As(err, &apiErr) && apiErr.Detail != "" {
|
||||
fmt.Fprintf(os.Stderr, "query error: %v (detail: %s)\n", apiErr, strings.TrimSpace(apiErr.Detail))
|
||||
} else {
|
||||
fmt.Fprintln(os.Stderr, "query error:", err)
|
||||
}
|
||||
|
||||
return failureExitCode
|
||||
}
|
||||
|
||||
func parseStartTimeAndEndTime(start, end string) (time.Time, time.Time, error) {
|
||||
var (
|
||||
minTime = time.Now().Add(-9999 * time.Hour)
|
||||
maxTime = time.Now().Add(9999 * time.Hour)
|
||||
err error
|
||||
)
|
||||
|
||||
stime := minTime
|
||||
etime := maxTime
|
||||
|
||||
if start != "" {
|
||||
stime, err = parseTime(start)
|
||||
if err != nil {
|
||||
return stime, etime, fmt.Errorf("error parsing start time: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if end != "" {
|
||||
etime, err = parseTime(end)
|
||||
if err != nil {
|
||||
return stime, etime, fmt.Errorf("error parsing end time: %w", err)
|
||||
}
|
||||
}
|
||||
return stime, etime, nil
|
||||
}
|
||||
|
||||
func parseTime(s string) (time.Time, error) {
|
||||
if t, err := strconv.ParseFloat(s, 64); err == nil {
|
||||
s, ns := math.Modf(t)
|
||||
return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
|
||||
}
|
|
@ -324,6 +324,25 @@ Run labels query.
|
|||
|
||||
|
||||
|
||||
##### `promtool query analyze`
|
||||
|
||||
Run queries against your Prometheus to analyze the usage pattern of certain metrics.
|
||||
|
||||
|
||||
|
||||
###### Flags
|
||||
|
||||
| Flag | Description | Default |
|
||||
| --- | --- | --- |
|
||||
| <code class="text-nowrap">--server</code> | Prometheus server to query. | |
|
||||
| <code class="text-nowrap">--type</code> | Type of metric: histogram. | |
|
||||
| <code class="text-nowrap">--duration</code> | Time frame to analyze. | `1h` |
|
||||
| <code class="text-nowrap">--time</code> | Query time (RFC3339 or Unix timestamp), defaults to now. | |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | |
|
||||
|
||||
|
||||
|
||||
|
||||
### `promtool debug`
|
||||
|
||||
Fetch debug information.
|
||||
|
|
Loading…
Reference in a new issue