From b1bab7bc54ed06f9a8619e7dd09b7b2ed55d3ee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Thu, 27 Apr 2023 13:23:52 +0200 Subject: [PATCH 1/7] feat(promtool): add push metrics command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- cmd/promtool/main.go | 15 +++ cmd/promtool/metrics.go | 234 ++++++++++++++++++++++++++++++++++ docs/command-line/promtool.md | 43 +++++++ 3 files changed, 292 insertions(+) create mode 100644 cmd/promtool/metrics.go diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index de002a0b28..c4077954ea 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -178,6 +178,18 @@ func main() { queryLabelsEnd := queryLabelsCmd.Flag("end", "End time (RFC3339 or Unix timestamp).").String() queryLabelsMatch := queryLabelsCmd.Flag("match", "Series selector. Can be specified multiple times.").Strings() + pushCmd := app.Command("push", "Push to a Prometheus server.") + pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("").ExistingFileVar(&httpConfigFilePath) + pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write.") + pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&serverURL) + metricFiles := pushMetricsCmd.Arg( + "metric-files", + "The metric files to push.", + ).Required().ExistingFiles() + metricJobLabel := pushMetricsCmd.Flag("job-label", "Job label to attach to metrics.").Default("promtool").String() + pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() + pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() + testCmd := app.Command("test", "Unit testing.") testRulesCmd := testCmd.Command("rules", "Unit tests for rules.") testRulesFiles := testRulesCmd.Arg( @@ -301,6 +313,9 @@ func main() { case checkMetricsCmd.FullCommand(): os.Exit(CheckMetrics(*checkMetricsExtended)) + case pushMetricsCmd.FullCommand(): + os.Exit(PushMetrics(serverURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *metricJobLabel, *metricFiles...)) + case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go new file mode 100644 index 0000000000..21fcd3e662 --- /dev/null +++ b/cmd/promtool/metrics.go @@ -0,0 +1,234 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "sort" + "time" + + "github.com/golang/snappy" + dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" +) + +// Push metrics to a prometheus remote write. +func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, jobLabel string, files ...string) int { + // remote write should respect specification: https://prometheus.io/docs/concepts/remote_write_spec/ + failed := false + + addressURL, err := url.Parse(url.String()) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // build remote write client + writeClient, err := remote.NewWriteClient("remote-write", &remote.ClientConfig{ + URL: &config_util.URL{URL: addressURL}, + Timeout: model.Duration(timeout), + }) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // set custom tls config from httpConfigFilePath + // set custom headers to every request + client, ok := writeClient.(*remote.Client) + if !ok { + fmt.Fprintln(os.Stderr, fmt.Errorf("unexpected type %T", writeClient)) + return failureExitCode + } + client.Client.Transport = &setHeadersTransport{ + RoundTripper: roundTripper, + headers: headers, + } + + for _, f := range files { + var data []byte + var err error + data, err = os.ReadFile(f) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + fmt.Printf("Parsing metric file %s\n", f) + metricsData, err := parseMetricsTextAndFormat(bytes.NewReader(data), jobLabel) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + raw, err := metricsData.Marshal() + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + // Encode the request body into snappy encoding. + compressed := snappy.Encode(nil, raw) + err = client.Store(context.Background(), compressed) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + fmt.Printf("Successfully pushed metric file %s\n", f) + } + + if failed { + return failureExitCode + } + + return successExitCode +} + +type setHeadersTransport struct { + http.RoundTripper + headers map[string]string +} + +func (s *setHeadersTransport) RoundTrip(req *http.Request) (*http.Response, error) { + for key, value := range s.headers { + req.Header.Set(key, value) + } + return s.RoundTripper.RoundTrip(req) +} + +var MetricMetadataTypeValue = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +// formatMetrics convert metric family to a writerequest +func formatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.WriteRequest, error) { + wr := &prompb.WriteRequest{} + + // build metric list + sortedMetricNames := make([]string, 0, len(mf)) + for metric := range mf { + sortedMetricNames = append(sortedMetricNames, metric) + } + // sort metrics name in lexicographical order + sort.Strings(sortedMetricNames) + + for _, metricName := range sortedMetricNames { + // Set metadata writerequest + mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] + metadata := prompb.MetricMetadata{ + MetricFamilyName: mf[metricName].GetName(), + Type: prompb.MetricMetadata_MetricType(mtype), + Help: mf[metricName].GetHelp(), + } + wr.Metadata = append(wr.Metadata, metadata) + + for _, metric := range mf[metricName].Metric { + var timeserie prompb.TimeSeries + + // build labels map + labels := make(map[string]string, len(metric.Label)+2) + labels[model.MetricNameLabel] = metricName + labels[model.JobLabel] = jobLabel + + for _, label := range metric.Label { + labelname := label.GetName() + if labelname == model.JobLabel { + labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) + } + labels[labelname] = label.GetValue() + } + + // build labels name list + sortedLabelNames := make([]string, 0, len(labels)) + for label := range labels { + sortedLabelNames = append(sortedLabelNames, label) + } + // sort labels name in lexicographical order + sort.Strings(sortedLabelNames) + + for _, label := range sortedLabelNames { + timeserie.Labels = append(timeserie.Labels, prompb.Label{ + Name: label, + Value: labels[label], + }) + } + + timeserie.Samples = []prompb.Sample{ + { + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Value: getMetricsValue(metric), + }, + } + + wr.Timeseries = append(wr.Timeseries, timeserie) + } + } + return wr, nil +} + +// parseMetricsTextReader consumes an io.Reader and returns the MetricFamily +func parseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, error) { + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(input) + if err != nil { + return nil, err + } + return mf, nil +} + +// getMetricsValue return the value of a timeserie without the need to give value type +func getMetricsValue(m *dto.Metric) float64 { + switch { + case m.Gauge != nil: + return m.GetGauge().GetValue() + case m.Counter != nil: + return m.GetCounter().GetValue() + case m.Untyped != nil: + return m.GetUntyped().GetValue() + default: + return 0. + } +} + +// parseMetricsTextAndFormat return the data in the expected prometheus metrics write request format +func parseMetricsTextAndFormat(input io.Reader, jobLabel string) (*prompb.WriteRequest, error) { + mf, err := parseMetricsTextReader(input) + if err != nil { + return nil, err + } + + return formatMetrics(mf, jobLabel) +} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index e149d374a0..ac159a9214 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -27,6 +27,7 @@ Tooling for the Prometheus monitoring system. | check | Check the resources for validity. | | query | Run query against a Prometheus server. | | debug | Fetch debug information. | +| push | Push to a Prometheus server. | | test | Unit testing. | | tsdb | Run tsdb commands. | @@ -372,6 +373,48 @@ Fetch all debug information. +### `promtool push` + +Push to a Prometheus server. + + + +#### Flags + +| Flag | Description | +| --- | --- | +| --http.config.file | HTTP client configuration file for promtool to connect to Prometheus. | + + + + +##### `promtool push metrics` + +Push metrics to a prometheus remote write. + + + +###### Flags + +| Flag | Description | Default | +| --- | --- | --- | +| --job-label | Job label to attach to metrics. | `promtool` | +| --timeout | The time to wait for pushing metrics. | `30s` | +| --header | Prometheus remote write header. | | + + + + +###### Arguments + +| Argument | Description | Required | +| --- | --- | --- | +| remote-write-url | Prometheus remote write url to push metrics. | Yes | +| metric-files | The metric files to push. | Yes | + + + + ### `promtool test` Unit testing. From 3524a16aa03d7b91a6d0484aa4ed6f1d443837f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Tue, 23 May 2023 10:29:17 +0200 Subject: [PATCH 2/7] feat: add suggested changes, tests, and stdin support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- cmd/promtool/main.go | 11 +-- cmd/promtool/metrics.go | 157 +++++++--------------------------- docs/command-line/promtool.md | 4 +- util/fmtutil/format.go | 142 ++++++++++++++++++++++++++++++ util/fmtutil/format_test.go | 71 +++++++++++++++ 5 files changed, 251 insertions(+), 134 deletions(-) create mode 100644 util/fmtutil/format.go create mode 100644 util/fmtutil/format_test.go diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index c4077954ea..3b5ba78e4c 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -81,6 +81,7 @@ func main() { var ( httpRoundTripper = api.DefaultRoundTripper serverURL *url.URL + remoteWriteURL *url.URL httpConfigFilePath string ) @@ -180,12 +181,12 @@ func main() { pushCmd := app.Command("push", "Push to a Prometheus server.") pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("").ExistingFileVar(&httpConfigFilePath) - pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write.") - pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&serverURL) + pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write (for testing purpose only).") + pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&remoteWriteURL) metricFiles := pushMetricsCmd.Arg( "metric-files", - "The metric files to push.", - ).Required().ExistingFiles() + "The metric files to push, default is read from standard input (STDIN).", + ).ExistingFiles() metricJobLabel := pushMetricsCmd.Flag("job-label", "Job label to attach to metrics.").Default("promtool").String() pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() @@ -314,7 +315,7 @@ func main() { os.Exit(CheckMetrics(*checkMetricsExtended)) case pushMetricsCmd.FullCommand(): - os.Exit(PushMetrics(serverURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *metricJobLabel, *metricFiles...)) + os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *metricJobLabel, *metricFiles...)) case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 21fcd3e662..c845b5a587 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -21,22 +21,18 @@ import ( "net/http" "net/url" "os" - "sort" "time" "github.com/golang/snappy" - dto "github.com/prometheus/client_model/go" config_util "github.com/prometheus/common/config" - "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/util/fmtutil" ) -// Push metrics to a prometheus remote write. +// Push metrics to a prometheus remote write (for testing purpose only). func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, jobLabel string, files ...string) int { - // remote write should respect specification: https://prometheus.io/docs/concepts/remote_write_spec/ failed := false addressURL, err := url.Parse(url.String()) @@ -67,18 +63,36 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin headers: headers, } - for _, f := range files { + // add empty string to avoid matching filename + if len(files) == 0 { + files = append(files, "") + } + + for _, file := range files { var data []byte var err error - data, err = os.ReadFile(f) - if err != nil { - fmt.Fprintln(os.Stderr, err) - failed = true - continue - } - fmt.Printf("Parsing metric file %s\n", f) - metricsData, err := parseMetricsTextAndFormat(bytes.NewReader(data), jobLabel) + // if file is an empty string it is a stdin + if file == "" { + data, err = io.ReadAll(os.Stdin) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + break + } + + fmt.Printf("Parsing stdin\n") + } else { + data, err = os.ReadFile(file) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + fmt.Printf("Parsing metric file %s\n", file) + } + metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), jobLabel) if err != nil { fmt.Fprintln(os.Stderr, err) failed = true @@ -100,7 +114,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin failed = true continue } - fmt.Printf("Successfully pushed metric file %s\n", f) + fmt.Printf("Successfully pushed metric file %s\n", file) } if failed { @@ -121,114 +135,3 @@ func (s *setHeadersTransport) RoundTrip(req *http.Request) (*http.Response, erro } return s.RoundTripper.RoundTrip(req) } - -var MetricMetadataTypeValue = map[string]int32{ - "UNKNOWN": 0, - "COUNTER": 1, - "GAUGE": 2, - "HISTOGRAM": 3, - "GAUGEHISTOGRAM": 4, - "SUMMARY": 5, - "INFO": 6, - "STATESET": 7, -} - -// formatMetrics convert metric family to a writerequest -func formatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.WriteRequest, error) { - wr := &prompb.WriteRequest{} - - // build metric list - sortedMetricNames := make([]string, 0, len(mf)) - for metric := range mf { - sortedMetricNames = append(sortedMetricNames, metric) - } - // sort metrics name in lexicographical order - sort.Strings(sortedMetricNames) - - for _, metricName := range sortedMetricNames { - // Set metadata writerequest - mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] - metadata := prompb.MetricMetadata{ - MetricFamilyName: mf[metricName].GetName(), - Type: prompb.MetricMetadata_MetricType(mtype), - Help: mf[metricName].GetHelp(), - } - wr.Metadata = append(wr.Metadata, metadata) - - for _, metric := range mf[metricName].Metric { - var timeserie prompb.TimeSeries - - // build labels map - labels := make(map[string]string, len(metric.Label)+2) - labels[model.MetricNameLabel] = metricName - labels[model.JobLabel] = jobLabel - - for _, label := range metric.Label { - labelname := label.GetName() - if labelname == model.JobLabel { - labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) - } - labels[labelname] = label.GetValue() - } - - // build labels name list - sortedLabelNames := make([]string, 0, len(labels)) - for label := range labels { - sortedLabelNames = append(sortedLabelNames, label) - } - // sort labels name in lexicographical order - sort.Strings(sortedLabelNames) - - for _, label := range sortedLabelNames { - timeserie.Labels = append(timeserie.Labels, prompb.Label{ - Name: label, - Value: labels[label], - }) - } - - timeserie.Samples = []prompb.Sample{ - { - Timestamp: time.Now().UnixNano() / int64(time.Millisecond), - Value: getMetricsValue(metric), - }, - } - - wr.Timeseries = append(wr.Timeseries, timeserie) - } - } - return wr, nil -} - -// parseMetricsTextReader consumes an io.Reader and returns the MetricFamily -func parseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, error) { - var parser expfmt.TextParser - mf, err := parser.TextToMetricFamilies(input) - if err != nil { - return nil, err - } - return mf, nil -} - -// getMetricsValue return the value of a timeserie without the need to give value type -func getMetricsValue(m *dto.Metric) float64 { - switch { - case m.Gauge != nil: - return m.GetGauge().GetValue() - case m.Counter != nil: - return m.GetCounter().GetValue() - case m.Untyped != nil: - return m.GetUntyped().GetValue() - default: - return 0. - } -} - -// parseMetricsTextAndFormat return the data in the expected prometheus metrics write request format -func parseMetricsTextAndFormat(input io.Reader, jobLabel string) (*prompb.WriteRequest, error) { - mf, err := parseMetricsTextReader(input) - if err != nil { - return nil, err - } - - return formatMetrics(mf, jobLabel) -} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index ac159a9214..024c71e51a 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -390,7 +390,7 @@ Push to a Prometheus server. ##### `promtool push metrics` -Push metrics to a prometheus remote write. +Push metrics to a prometheus remote write (for testing purpose only). @@ -410,7 +410,7 @@ Push metrics to a prometheus remote write. | Argument | Description | Required | | --- | --- | --- | | remote-write-url | Prometheus remote write url to push metrics. | Yes | -| metric-files | The metric files to push. | Yes | +| metric-files | The metric files to push, default is read from standard input (STDIN). | | diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go new file mode 100644 index 0000000000..9a06d6bb15 --- /dev/null +++ b/util/fmtutil/format.go @@ -0,0 +1,142 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fmtutil + +import ( + "fmt" + "io" + "sort" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/prompb" +) + +var MetricMetadataTypeValue = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +// FormatMetrics convert metric family to a writerequest. +func FormatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.WriteRequest, error) { + wr := &prompb.WriteRequest{} + + // build metric list + sortedMetricNames := make([]string, 0, len(mf)) + for metric := range mf { + sortedMetricNames = append(sortedMetricNames, metric) + } + // sort metrics name in lexicographical order + sort.Strings(sortedMetricNames) + + for _, metricName := range sortedMetricNames { + // Set metadata writerequest + mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] + metadata := prompb.MetricMetadata{ + MetricFamilyName: mf[metricName].GetName(), + Type: prompb.MetricMetadata_MetricType(mtype), + Help: mf[metricName].GetHelp(), + } + wr.Metadata = append(wr.Metadata, metadata) + + for _, metric := range mf[metricName].Metric { + var timeserie prompb.TimeSeries + + // build labels map + labels := make(map[string]string, len(metric.Label)+2) + labels[model.MetricNameLabel] = metricName + labels[model.JobLabel] = jobLabel + + for _, label := range metric.Label { + labelname := label.GetName() + if labelname == model.JobLabel { + labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) + } + labels[labelname] = label.GetValue() + } + + // build labels name list + sortedLabelNames := make([]string, 0, len(labels)) + for label := range labels { + sortedLabelNames = append(sortedLabelNames, label) + } + // sort labels name in lexicographical order + sort.Strings(sortedLabelNames) + + for _, label := range sortedLabelNames { + timeserie.Labels = append(timeserie.Labels, prompb.Label{ + Name: label, + Value: labels[label], + }) + } + + timestamp := metric.GetTimestampMs() + if timestamp == 0 { + timestamp = time.Now().UnixNano() / int64(time.Millisecond) + } + + timeserie.Samples = []prompb.Sample{ + { + Timestamp: timestamp, + Value: getMetricsValue(metric), + }, + } + + wr.Timeseries = append(wr.Timeseries, timeserie) + } + } + return wr, nil +} + +// getMetricsValue return the value of a timeserie without the need to give value type +func getMetricsValue(m *dto.Metric) float64 { + switch { + case m.Gauge != nil: + return m.GetGauge().GetValue() + case m.Counter != nil: + return m.GetCounter().GetValue() + case m.Untyped != nil: + return m.GetUntyped().GetValue() + default: + return 0. + } +} + +// ParseMetricsTextReader consumes an io.Reader and returns the MetricFamily. +func ParseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, error) { + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(input) + if err != nil { + return nil, err + } + return mf, nil +} + +// ParseMetricsTextAndFormat return the data in the expected prometheus metrics write request format. +func ParseMetricsTextAndFormat(input io.Reader, jobLabel string) (*prompb.WriteRequest, error) { + mf, err := ParseMetricsTextReader(input) + if err != nil { + return nil, err + } + return FormatMetrics(mf, jobLabel) +} diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go new file mode 100644 index 0000000000..ef3b7fcd40 --- /dev/null +++ b/util/fmtutil/format_test.go @@ -0,0 +1,71 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fmtutil + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/prompb" +) + +var writeRequestFixture = &prompb.WriteRequest{ + Metadata: []prompb.MetricMetadata{ + { + MetricFamilyName: "test_metric1", + Type: 2, + Help: "this is a test metric", + }, + }, + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + }, +} + +func TestParseMetricsTextAndFormat(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP test_metric1 this is a test metric + # TYPE test_metric1 gauge + test_metric1{b="c",baz="qux",d="e",foo="bar"} 1 1 + test_metric1{b="c",baz="qux",d="e",foo="bar"} 2 1 + `)) + + expected, err := ParseMetricsTextAndFormat(input, "promtool") + require.NoError(t, err) + + require.Equal(t, writeRequestFixture, expected) +} From 934c5ddb8d223c6363033a48919e014c63e15ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Wed, 24 May 2023 10:55:49 +0200 Subject: [PATCH 3/7] feat: make push metrics labels generic and repeatable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- cmd/promtool/main.go | 4 ++-- cmd/promtool/metrics.go | 25 +++++++++++++++---------- docs/command-line/promtool.md | 2 +- util/fmtutil/format.go | 15 ++++++++++----- util/fmtutil/format_test.go | 3 ++- 5 files changed, 30 insertions(+), 19 deletions(-) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 3b5ba78e4c..c76790e13b 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -187,7 +187,7 @@ func main() { "metric-files", "The metric files to push, default is read from standard input (STDIN).", ).ExistingFiles() - metricJobLabel := pushMetricsCmd.Flag("job-label", "Job label to attach to metrics.").Default("promtool").String() + pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap() pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() @@ -315,7 +315,7 @@ func main() { os.Exit(CheckMetrics(*checkMetricsExtended)) case pushMetricsCmd.FullCommand(): - os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *metricJobLabel, *metricFiles...)) + os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsLabels, *metricFiles...)) case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index c845b5a587..8abe32cf41 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -32,7 +32,7 @@ import ( ) // Push metrics to a prometheus remote write (for testing purpose only). -func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, jobLabel string, files ...string) int { +func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { failed := false addressURL, err := url.Parse(url.String()) @@ -76,32 +76,32 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin if file == "" { data, err = io.ReadAll(os.Stdin) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true break } - fmt.Printf("Parsing stdin\n") + fmt.Printf("Parsing input from stdin\n") } else { data, err = os.ReadFile(file) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true continue } - fmt.Printf("Parsing metric file %s\n", file) + fmt.Printf("Parsing input from metric file %s\n", file) } - metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), jobLabel) + metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true continue } raw, err := metricsData.Marshal() if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true continue } @@ -110,11 +110,16 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin compressed := snappy.Encode(nil, raw) err = client.Store(context.Background(), compressed) if err != nil { - fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true continue } - fmt.Printf("Successfully pushed metric file %s\n", file) + + if file == "" { + fmt.Printf(" SUCCESS: metric pushed to remote write.\n") + } else { + fmt.Printf(" SUCCESS: metric file %s pushed to remote write.\n", file) + } } if failed { diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index 024c71e51a..c78900b991 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -398,7 +398,7 @@ Push metrics to a prometheus remote write (for testing purpose only). | Flag | Description | Default | | --- | --- | --- | -| --job-label | Job label to attach to metrics. | `promtool` | +| --label | Label to attach to metrics. Can be specified multiple times. | `job=promtool` | | --timeout | The time to wait for pushing metrics. | `30s` | | --header | Prometheus remote write header. | | diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go index 9a06d6bb15..b5bb9469ce 100644 --- a/util/fmtutil/format.go +++ b/util/fmtutil/format.go @@ -38,7 +38,7 @@ var MetricMetadataTypeValue = map[string]int32{ } // FormatMetrics convert metric family to a writerequest. -func FormatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.WriteRequest, error) { +func FormatMetrics(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { wr := &prompb.WriteRequest{} // build metric list @@ -63,10 +63,15 @@ func FormatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.Wr var timeserie prompb.TimeSeries // build labels map - labels := make(map[string]string, len(metric.Label)+2) + labels := make(map[string]string, len(metric.Label)+len(extraLabels)) labels[model.MetricNameLabel] = metricName - labels[model.JobLabel] = jobLabel + // add extra labels + for key, value := range extraLabels { + labels[key] = value + } + + // add metric labels for _, label := range metric.Label { labelname := label.GetName() if labelname == model.JobLabel { @@ -133,10 +138,10 @@ func ParseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, erro } // ParseMetricsTextAndFormat return the data in the expected prometheus metrics write request format. -func ParseMetricsTextAndFormat(input io.Reader, jobLabel string) (*prompb.WriteRequest, error) { +func ParseMetricsTextAndFormat(input io.Reader, labels map[string]string) (*prompb.WriteRequest, error) { mf, err := ParseMetricsTextReader(input) if err != nil { return nil, err } - return FormatMetrics(mf, jobLabel) + return FormatMetrics(mf, labels) } diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go index ef3b7fcd40..9deed2de90 100644 --- a/util/fmtutil/format_test.go +++ b/util/fmtutil/format_test.go @@ -63,8 +63,9 @@ func TestParseMetricsTextAndFormat(t *testing.T) { test_metric1{b="c",baz="qux",d="e",foo="bar"} 1 1 test_metric1{b="c",baz="qux",d="e",foo="bar"} 2 1 `)) + labels := map[string]string{"job": "promtool"} - expected, err := ParseMetricsTextAndFormat(input, "promtool") + expected, err := ParseMetricsTextAndFormat(input, labels) require.NoError(t, err) require.Equal(t, writeRequestFixture, expected) From ca6580828aeef078370116b86ff3fbe9759b75ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Wed, 31 May 2023 15:17:44 +0200 Subject: [PATCH 4/7] feat: support histogram and summary metric types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- util/fmtutil/format.go | 171 +++++++++++++++++++++++++----------- util/fmtutil/format_test.go | 140 ++++++++++++++++++++++++++++- 2 files changed, 256 insertions(+), 55 deletions(-) diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go index b5bb9469ce..291308dc23 100644 --- a/util/fmtutil/format.go +++ b/util/fmtutil/format.go @@ -14,6 +14,7 @@ package fmtutil import ( + "errors" "fmt" "io" "sort" @@ -26,6 +27,12 @@ import ( "github.com/prometheus/prometheus/prompb" ) +const ( + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" +) + var MetricMetadataTypeValue = map[string]int32{ "UNKNOWN": 0, "COUNTER": 1, @@ -60,71 +67,129 @@ func FormatMetrics(mf map[string]*dto.MetricFamily, extraLabels map[string]strin wr.Metadata = append(wr.Metadata, metadata) for _, metric := range mf[metricName].Metric { - var timeserie prompb.TimeSeries - - // build labels map - labels := make(map[string]string, len(metric.Label)+len(extraLabels)) - labels[model.MetricNameLabel] = metricName - - // add extra labels - for key, value := range extraLabels { - labels[key] = value + labels := makeLabelsMap(metric, metricName, extraLabels) + if err := makeTimeseries(wr, labels, metric); err != nil { + return wr, err } - - // add metric labels - for _, label := range metric.Label { - labelname := label.GetName() - if labelname == model.JobLabel { - labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) - } - labels[labelname] = label.GetValue() - } - - // build labels name list - sortedLabelNames := make([]string, 0, len(labels)) - for label := range labels { - sortedLabelNames = append(sortedLabelNames, label) - } - // sort labels name in lexicographical order - sort.Strings(sortedLabelNames) - - for _, label := range sortedLabelNames { - timeserie.Labels = append(timeserie.Labels, prompb.Label{ - Name: label, - Value: labels[label], - }) - } - - timestamp := metric.GetTimestampMs() - if timestamp == 0 { - timestamp = time.Now().UnixNano() / int64(time.Millisecond) - } - - timeserie.Samples = []prompb.Sample{ - { - Timestamp: timestamp, - Value: getMetricsValue(metric), - }, - } - - wr.Timeseries = append(wr.Timeseries, timeserie) } } return wr, nil } -// getMetricsValue return the value of a timeserie without the need to give value type -func getMetricsValue(m *dto.Metric) float64 { +func makeTimeserie(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) { + var timeserie prompb.TimeSeries + timeserie.Labels = makeLabels(labels) + timeserie.Samples = []prompb.Sample{ + { + Timestamp: timestamp, + Value: value, + }, + } + wr.Timeseries = append(wr.Timeseries, timeserie) +} + +func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Metric) error { + var err error + + timestamp := m.GetTimestampMs() + if timestamp == 0 { + timestamp = time.Now().UnixNano() / int64(time.Millisecond) + } + switch { case m.Gauge != nil: - return m.GetGauge().GetValue() + makeTimeserie(wr, labels, timestamp, m.GetGauge().GetValue()) case m.Counter != nil: - return m.GetCounter().GetValue() + makeTimeserie(wr, labels, timestamp, m.GetCounter().GetValue()) + case m.Summary != nil: + metricName := labels[model.MetricNameLabel] + // Preserve metric name order with first quantile labels timeseries then sum suffix timeserie and finally count suffix timeserie + // Add Summary quantile timeseries + quantileLabels := make(map[string]string, len(labels)+1) + for key, value := range labels { + quantileLabels[key] = value + } + + for _, q := range m.GetSummary().Quantile { + quantileLabels[model.QuantileLabel] = fmt.Sprint(q.GetQuantile()) + makeTimeserie(wr, quantileLabels, timestamp, q.GetValue()) + } + // Overwrite label model.MetricNameLabel for count and sum metrics + // Add Summary sum timeserie + labels[model.MetricNameLabel] = metricName + sumStr + makeTimeserie(wr, labels, timestamp, m.GetSummary().GetSampleSum()) + // Add Summary count timeserie + labels[model.MetricNameLabel] = metricName + countStr + makeTimeserie(wr, labels, timestamp, float64(m.GetSummary().GetSampleCount())) + + case m.Histogram != nil: + metricName := labels[model.MetricNameLabel] + // Preserve metric name order with first bucket suffix timeseries then sum suffix timeserie and finally count suffix timeserie + // Add Histogram bucket timeseries + bucketLabels := make(map[string]string, len(labels)+1) + for key, value := range labels { + bucketLabels[key] = value + } + for _, b := range m.GetHistogram().Bucket { + bucketLabels[model.MetricNameLabel] = metricName + bucketStr + bucketLabels[model.BucketLabel] = fmt.Sprint(b.GetUpperBound()) + makeTimeserie(wr, bucketLabels, timestamp, float64(b.GetCumulativeCount())) + } + // Overwrite label model.MetricNameLabel for count and sum metrics + // Add Histogram sum timeserie + labels[model.MetricNameLabel] = metricName + sumStr + makeTimeserie(wr, labels, timestamp, m.GetHistogram().GetSampleSum()) + // Add Histogram count timeserie + labels[model.MetricNameLabel] = metricName + countStr + makeTimeserie(wr, labels, timestamp, float64(m.GetHistogram().GetSampleCount())) + case m.Untyped != nil: - return m.GetUntyped().GetValue() + makeTimeserie(wr, labels, timestamp, m.GetUntyped().GetValue()) default: - return 0. + err = errors.New("unsupported metric type") } + return err +} + +func makeLabels(labelsMap map[string]string) []prompb.Label { + // build labels name list + sortedLabelNames := make([]string, 0, len(labelsMap)) + for label := range labelsMap { + sortedLabelNames = append(sortedLabelNames, label) + } + // sort labels name in lexicographical order + sort.Strings(sortedLabelNames) + + var labels []prompb.Label + for _, label := range sortedLabelNames { + labels = append(labels, prompb.Label{ + Name: label, + Value: labelsMap[label], + }) + } + return labels +} + +func makeLabelsMap(m *dto.Metric, metricName string, extraLabels map[string]string) map[string]string { + // build labels map + labels := make(map[string]string, len(m.Label)+len(extraLabels)) + labels[model.MetricNameLabel] = metricName + + // add extra labels + for key, value := range extraLabels { + labels[key] = value + } + + // add metric labels + for _, label := range m.Label { + labelname := label.GetName() + if labelname == model.JobLabel { + labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) + } + labels[labelname] = label.GetValue() + } + + return labels } // ParseMetricsTextReader consumes an io.Reader and returns the MetricFamily. diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go index 9deed2de90..e2ac301353 100644 --- a/util/fmtutil/format_test.go +++ b/util/fmtutil/format_test.go @@ -24,13 +24,130 @@ import ( var writeRequestFixture = &prompb.WriteRequest{ Metadata: []prompb.MetricMetadata{ + { + MetricFamilyName: "http_request_duration_seconds", + Type: 3, + Help: "A histogram of the request duration.", + }, + { + MetricFamilyName: "http_requests_total", + Type: 1, + Help: "The total number of HTTP requests.", + }, + { + MetricFamilyName: "rpc_duration_seconds", + Type: 5, + Help: "A summary of the RPC duration in seconds.", + }, { MetricFamilyName: "test_metric1", Type: 2, - Help: "this is a test metric", + Help: "This is a test metric.", }, }, Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "0.1"}, + }, + Samples: []prompb.Sample{{Value: 33444, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "0.5"}, + }, + Samples: []prompb.Sample{{Value: 129389, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "1"}, + }, + Samples: []prompb.Sample{{Value: 133988, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "+Inf"}, + }, + Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_sum"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 53423, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_count"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_requests_total"}, + {Name: "code", Value: "200"}, + {Name: "job", Value: "promtool"}, + {Name: "method", Value: "post"}, + }, + Samples: []prompb.Sample{{Value: 1027, Timestamp: 1395066363000}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_requests_total"}, + {Name: "code", Value: "400"}, + {Name: "job", Value: "promtool"}, + {Name: "method", Value: "post"}, + }, + Samples: []prompb.Sample{{Value: 3, Timestamp: 1395066363000}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.01"}, + }, + Samples: []prompb.Sample{{Value: 3102, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.5"}, + }, + Samples: []prompb.Sample{{Value: 4773, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.99"}, + }, + Samples: []prompb.Sample{{Value: 76656, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds_sum"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 1.7560473e+07, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds_count"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 2693, Timestamp: 1}}, + }, { Labels: []prompb.Label{ {Name: "__name__", Value: "test_metric1"}, @@ -58,7 +175,26 @@ var writeRequestFixture = &prompb.WriteRequest{ func TestParseMetricsTextAndFormat(t *testing.T) { input := bytes.NewReader([]byte(` - # HELP test_metric1 this is a test metric + # HELP http_request_duration_seconds A histogram of the request duration. + # TYPE http_request_duration_seconds histogram + http_request_duration_seconds_bucket{le="0.1"} 33444 1 + http_request_duration_seconds_bucket{le="0.5"} 129389 1 + http_request_duration_seconds_bucket{le="1"} 133988 1 + http_request_duration_seconds_bucket{le="+Inf"} 144320 1 + http_request_duration_seconds_sum 53423 1 + http_request_duration_seconds_count 144320 1 + # HELP http_requests_total The total number of HTTP requests. + # TYPE http_requests_total counter + http_requests_total{method="post",code="200"} 1027 1395066363000 + http_requests_total{method="post",code="400"} 3 1395066363000 + # HELP rpc_duration_seconds A summary of the RPC duration in seconds. + # TYPE rpc_duration_seconds summary + rpc_duration_seconds{quantile="0.01"} 3102 1 + rpc_duration_seconds{quantile="0.5"} 4773 1 + rpc_duration_seconds{quantile="0.99"} 76656 1 + rpc_duration_seconds_sum 1.7560473e+07 1 + rpc_duration_seconds_count 2693 1 + # HELP test_metric1 This is a test metric. # TYPE test_metric1 gauge test_metric1{b="c",baz="qux",d="e",foo="bar"} 1 1 test_metric1{b="c",baz="qux",d="e",foo="bar"} 2 1 From 6ae4a46845295e88c353de0dd42caa2b0fe1b46f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Thu, 1 Jun 2023 10:28:55 +0200 Subject: [PATCH 5/7] feat: enhance stdin check and add tests parsing error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- cmd/promtool/main.go | 2 +- cmd/promtool/metrics.go | 90 +++++++++++++++++------------------ docs/command-line/promtool.md | 2 +- util/fmtutil/format_test.go | 27 ++++++++++- 4 files changed, 71 insertions(+), 50 deletions(-) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index c76790e13b..bcfcc24223 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -185,7 +185,7 @@ func main() { pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&remoteWriteURL) metricFiles := pushMetricsCmd.Arg( "metric-files", - "The metric files to push, default is read from standard input (STDIN).", + "The metric files to push, default is read from standard input.", ).ExistingFiles() pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap() pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 8abe32cf41..4a6fafd407 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -33,8 +33,6 @@ import ( // Push metrics to a prometheus remote write (for testing purpose only). func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { - failed := false - addressURL, err := url.Parse(url.String()) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -63,63 +61,37 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin headers: headers, } - // add empty string to avoid matching filename + var data []byte + var failed bool + if len(files) == 0 { - files = append(files, "") + data, err = io.ReadAll(os.Stdin) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return failureExitCode + } + fmt.Printf("Parsing standard input\n") + if parseAndPushMetrics(client, data, labels) { + fmt.Printf(" SUCCESS: metrics pushed to remote write.\n") + return successExitCode + } + return failureExitCode } for _, file := range files { - var data []byte - var err error - - // if file is an empty string it is a stdin - if file == "" { - data, err = io.ReadAll(os.Stdin) - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - failed = true - break - } - - fmt.Printf("Parsing input from stdin\n") - } else { - data, err = os.ReadFile(file) - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - failed = true - continue - } - - fmt.Printf("Parsing input from metric file %s\n", file) - } - metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels) + data, err = os.ReadFile(file) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) failed = true continue } - raw, err := metricsData.Marshal() - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - failed = true + fmt.Printf("Parsing metrics file %s\n", file) + if parseAndPushMetrics(client, data, labels) { + fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file) continue } - - // Encode the request body into snappy encoding. - compressed := snappy.Encode(nil, raw) - err = client.Store(context.Background(), compressed) - if err != nil { - fmt.Fprintln(os.Stderr, " FAILED:", err) - failed = true - continue - } - - if file == "" { - fmt.Printf(" SUCCESS: metric pushed to remote write.\n") - } else { - fmt.Printf(" SUCCESS: metric file %s pushed to remote write.\n", file) - } + failed = true } if failed { @@ -129,6 +101,30 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin return successExitCode } +func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool { + metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + raw, err := metricsData.Marshal() + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + // Encode the request body into snappy encoding. + compressed := snappy.Encode(nil, raw) + err = client.Store(context.Background(), compressed) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + return true +} + type setHeadersTransport struct { http.RoundTripper headers map[string]string diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index c78900b991..c36caaf616 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -410,7 +410,7 @@ Push metrics to a prometheus remote write (for testing purpose only). | Argument | Description | Required | | --- | --- | --- | | remote-write-url | Prometheus remote write url to push metrics. | Yes | -| metric-files | The metric files to push, default is read from standard input (STDIN). | | +| metric-files | The metric files to push, default is read from standard input. | | diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go index e2ac301353..5c1ab5bde0 100644 --- a/util/fmtutil/format_test.go +++ b/util/fmtutil/format_test.go @@ -173,7 +173,7 @@ var writeRequestFixture = &prompb.WriteRequest{ }, } -func TestParseMetricsTextAndFormat(t *testing.T) { +func TestParseAndPushMetricsTextAndFormat(t *testing.T) { input := bytes.NewReader([]byte(` # HELP http_request_duration_seconds A histogram of the request duration. # TYPE http_request_duration_seconds histogram @@ -206,3 +206,28 @@ func TestParseMetricsTextAndFormat(t *testing.T) { require.Equal(t, writeRequestFixture, expected) } + +func TestParseMetricsTextAndFormatErrorParsingFloatValue(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP http_requests_total The total number of HTTP requests. + # TYPE http_requests_total counter + http_requests_total{method="post",code="200"} 1027Error 1395066363000 + http_requests_total{method="post",code="400"} 3 1395066363000 + `)) + labels := map[string]string{"job": "promtool"} + + _, err := ParseMetricsTextAndFormat(input, labels) + require.Equal(t, err.Error(), "text format parsing error in line 4: expected float as value, got \"1027Error\"") +} + +func TestParseMetricsTextAndFormatErrorParsingMetricType(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP node_info node info summary. + # TYPE node_info info + node_info{test="summary"} 1 1395066363000 + `)) + labels := map[string]string{"job": "promtool"} + + _, err := ParseMetricsTextAndFormat(input, labels) + require.Equal(t, err.Error(), "text format parsing error in line 3: unknown metric type \"info\"") +} From 3eaa7eb538a14bfbfbb1f6e11c4f0d5a4878b10e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Mon, 19 Jun 2023 10:44:24 +0200 Subject: [PATCH 6/7] fix: apply suggested changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- util/fmtutil/format.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go index 291308dc23..5cff0516b9 100644 --- a/util/fmtutil/format.go +++ b/util/fmtutil/format.go @@ -44,8 +44,8 @@ var MetricMetadataTypeValue = map[string]int32{ "STATESET": 7, } -// FormatMetrics convert metric family to a writerequest. -func FormatMetrics(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { +// CreateWriteRequest convert metric family to a writerequest. +func CreateWriteRequest(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { wr := &prompb.WriteRequest{} // build metric list @@ -76,16 +76,16 @@ func FormatMetrics(mf map[string]*dto.MetricFamily, extraLabels map[string]strin return wr, nil } -func makeTimeserie(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) { - var timeserie prompb.TimeSeries - timeserie.Labels = makeLabels(labels) - timeserie.Samples = []prompb.Sample{ +func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) { + var ts prompb.TimeSeries + ts.Labels = makeLabels(labels) + ts.Samples = []prompb.Sample{ { Timestamp: timestamp, Value: value, }, } - wr.Timeseries = append(wr.Timeseries, timeserie) + wr.Timeseries = append(wr.Timeseries, ts) } func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Metric) error { @@ -98,9 +98,9 @@ func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Me switch { case m.Gauge != nil: - makeTimeserie(wr, labels, timestamp, m.GetGauge().GetValue()) + toTimeseries(wr, labels, timestamp, m.GetGauge().GetValue()) case m.Counter != nil: - makeTimeserie(wr, labels, timestamp, m.GetCounter().GetValue()) + toTimeseries(wr, labels, timestamp, m.GetCounter().GetValue()) case m.Summary != nil: metricName := labels[model.MetricNameLabel] // Preserve metric name order with first quantile labels timeseries then sum suffix timeserie and finally count suffix timeserie @@ -112,15 +112,15 @@ func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Me for _, q := range m.GetSummary().Quantile { quantileLabels[model.QuantileLabel] = fmt.Sprint(q.GetQuantile()) - makeTimeserie(wr, quantileLabels, timestamp, q.GetValue()) + toTimeseries(wr, quantileLabels, timestamp, q.GetValue()) } // Overwrite label model.MetricNameLabel for count and sum metrics // Add Summary sum timeserie labels[model.MetricNameLabel] = metricName + sumStr - makeTimeserie(wr, labels, timestamp, m.GetSummary().GetSampleSum()) + toTimeseries(wr, labels, timestamp, m.GetSummary().GetSampleSum()) // Add Summary count timeserie labels[model.MetricNameLabel] = metricName + countStr - makeTimeserie(wr, labels, timestamp, float64(m.GetSummary().GetSampleCount())) + toTimeseries(wr, labels, timestamp, float64(m.GetSummary().GetSampleCount())) case m.Histogram != nil: metricName := labels[model.MetricNameLabel] @@ -133,18 +133,18 @@ func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Me for _, b := range m.GetHistogram().Bucket { bucketLabels[model.MetricNameLabel] = metricName + bucketStr bucketLabels[model.BucketLabel] = fmt.Sprint(b.GetUpperBound()) - makeTimeserie(wr, bucketLabels, timestamp, float64(b.GetCumulativeCount())) + toTimeseries(wr, bucketLabels, timestamp, float64(b.GetCumulativeCount())) } // Overwrite label model.MetricNameLabel for count and sum metrics // Add Histogram sum timeserie labels[model.MetricNameLabel] = metricName + sumStr - makeTimeserie(wr, labels, timestamp, m.GetHistogram().GetSampleSum()) + toTimeseries(wr, labels, timestamp, m.GetHistogram().GetSampleSum()) // Add Histogram count timeserie labels[model.MetricNameLabel] = metricName + countStr - makeTimeserie(wr, labels, timestamp, float64(m.GetHistogram().GetSampleCount())) + toTimeseries(wr, labels, timestamp, float64(m.GetHistogram().GetSampleCount())) case m.Untyped != nil: - makeTimeserie(wr, labels, timestamp, m.GetUntyped().GetValue()) + toTimeseries(wr, labels, timestamp, m.GetUntyped().GetValue()) default: err = errors.New("unsupported metric type") } @@ -208,5 +208,5 @@ func ParseMetricsTextAndFormat(input io.Reader, labels map[string]string) (*prom if err != nil { return nil, err } - return FormatMetrics(mf, labels) + return CreateWriteRequest(mf, labels) } From 58d38c4c5630fb73afb5403d03ede884ad7fd91c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Gouteroux?= Date: Tue, 27 Jun 2023 09:30:39 +0200 Subject: [PATCH 7/7] fix: apply suggested changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: François Gouteroux --- cmd/promtool/metrics.go | 2 +- util/fmtutil/format.go | 33 ++++++++++++--------------------- util/fmtutil/format_test.go | 10 +++++----- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go index 4a6fafd407..2bc2237e2f 100644 --- a/cmd/promtool/metrics.go +++ b/cmd/promtool/metrics.go @@ -102,7 +102,7 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin } func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool { - metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels) + metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels) if err != nil { fmt.Fprintln(os.Stderr, " FAILED:", err) return false diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go index 5cff0516b9..9034a90fa7 100644 --- a/util/fmtutil/format.go +++ b/util/fmtutil/format.go @@ -44,8 +44,18 @@ var MetricMetadataTypeValue = map[string]int32{ "STATESET": 7, } -// CreateWriteRequest convert metric family to a writerequest. -func CreateWriteRequest(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { +// MetricTextToWriteRequest consumes an io.Reader and return the data in write request format. +func MetricTextToWriteRequest(input io.Reader, labels map[string]string) (*prompb.WriteRequest, error) { + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(input) + if err != nil { + return nil, err + } + return MetricFamiliesToWriteRequest(mf, labels) +} + +// MetricFamiliesToWriteRequest convert metric family to a writerequest. +func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { wr := &prompb.WriteRequest{} // build metric list @@ -191,22 +201,3 @@ func makeLabelsMap(m *dto.Metric, metricName string, extraLabels map[string]stri return labels } - -// ParseMetricsTextReader consumes an io.Reader and returns the MetricFamily. -func ParseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, error) { - var parser expfmt.TextParser - mf, err := parser.TextToMetricFamilies(input) - if err != nil { - return nil, err - } - return mf, nil -} - -// ParseMetricsTextAndFormat return the data in the expected prometheus metrics write request format. -func ParseMetricsTextAndFormat(input io.Reader, labels map[string]string) (*prompb.WriteRequest, error) { - mf, err := ParseMetricsTextReader(input) - if err != nil { - return nil, err - } - return CreateWriteRequest(mf, labels) -} diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go index 5c1ab5bde0..0f052f5e75 100644 --- a/util/fmtutil/format_test.go +++ b/util/fmtutil/format_test.go @@ -201,13 +201,13 @@ func TestParseAndPushMetricsTextAndFormat(t *testing.T) { `)) labels := map[string]string{"job": "promtool"} - expected, err := ParseMetricsTextAndFormat(input, labels) + expected, err := MetricTextToWriteRequest(input, labels) require.NoError(t, err) require.Equal(t, writeRequestFixture, expected) } -func TestParseMetricsTextAndFormatErrorParsingFloatValue(t *testing.T) { +func TestMetricTextToWriteRequestErrorParsingFloatValue(t *testing.T) { input := bytes.NewReader([]byte(` # HELP http_requests_total The total number of HTTP requests. # TYPE http_requests_total counter @@ -216,11 +216,11 @@ func TestParseMetricsTextAndFormatErrorParsingFloatValue(t *testing.T) { `)) labels := map[string]string{"job": "promtool"} - _, err := ParseMetricsTextAndFormat(input, labels) + _, err := MetricTextToWriteRequest(input, labels) require.Equal(t, err.Error(), "text format parsing error in line 4: expected float as value, got \"1027Error\"") } -func TestParseMetricsTextAndFormatErrorParsingMetricType(t *testing.T) { +func TestMetricTextToWriteRequestErrorParsingMetricType(t *testing.T) { input := bytes.NewReader([]byte(` # HELP node_info node info summary. # TYPE node_info info @@ -228,6 +228,6 @@ func TestParseMetricsTextAndFormatErrorParsingMetricType(t *testing.T) { `)) labels := map[string]string{"job": "promtool"} - _, err := ParseMetricsTextAndFormat(input, labels) + _, err := MetricTextToWriteRequest(input, labels) require.Equal(t, err.Error(), "text format parsing error in line 3: unknown metric type \"info\"") }