feat: enhance stdin check and add tests parsing error

Signed-off-by: François Gouteroux <francois.gouteroux@gmail.com>
This commit is contained in:
François Gouteroux 2023-06-01 10:28:55 +02:00
parent ca6580828a
commit 6ae4a46845
4 changed files with 71 additions and 50 deletions

View file

@ -185,7 +185,7 @@ func main() {
pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&remoteWriteURL) pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&remoteWriteURL)
metricFiles := pushMetricsCmd.Arg( metricFiles := pushMetricsCmd.Arg(
"metric-files", "metric-files",
"The metric files to push, default is read from standard input (STDIN).", "The metric files to push, default is read from standard input.",
).ExistingFiles() ).ExistingFiles()
pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap() pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap()
pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration()

View file

@ -33,8 +33,6 @@ import (
// Push metrics to a prometheus remote write (for testing purpose only). // Push metrics to a prometheus remote write (for testing purpose only).
func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int {
failed := false
addressURL, err := url.Parse(url.String()) addressURL, err := url.Parse(url.String())
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
@ -63,63 +61,37 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
headers: headers, headers: headers,
} }
// add empty string to avoid matching filename var data []byte
var failed bool
if len(files) == 0 { if len(files) == 0 {
files = append(files, "") data, err = io.ReadAll(os.Stdin)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return failureExitCode
}
fmt.Printf("Parsing standard input\n")
if parseAndPushMetrics(client, data, labels) {
fmt.Printf(" SUCCESS: metrics pushed to remote write.\n")
return successExitCode
}
return failureExitCode
} }
for _, file := range files { for _, file := range files {
var data []byte data, err = os.ReadFile(file)
var err error
// if file is an empty string it is a stdin
if file == "" {
data, err = io.ReadAll(os.Stdin)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
failed = true
break
}
fmt.Printf("Parsing input from stdin\n")
} else {
data, err = os.ReadFile(file)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
failed = true
continue
}
fmt.Printf("Parsing input from metric file %s\n", file)
}
metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err) fmt.Fprintln(os.Stderr, " FAILED:", err)
failed = true failed = true
continue continue
} }
raw, err := metricsData.Marshal() fmt.Printf("Parsing metrics file %s\n", file)
if err != nil { if parseAndPushMetrics(client, data, labels) {
fmt.Fprintln(os.Stderr, " FAILED:", err) fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file)
failed = true
continue continue
} }
failed = true
// Encode the request body into snappy encoding.
compressed := snappy.Encode(nil, raw)
err = client.Store(context.Background(), compressed)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
failed = true
continue
}
if file == "" {
fmt.Printf(" SUCCESS: metric pushed to remote write.\n")
} else {
fmt.Printf(" SUCCESS: metric file %s pushed to remote write.\n", file)
}
} }
if failed { if failed {
@ -129,6 +101,30 @@ func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[strin
return successExitCode return successExitCode
} }
func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool {
metricsData, err := fmtutil.ParseMetricsTextAndFormat(bytes.NewReader(data), labels)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
raw, err := metricsData.Marshal()
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
// Encode the request body into snappy encoding.
compressed := snappy.Encode(nil, raw)
err = client.Store(context.Background(), compressed)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
}
return true
}
type setHeadersTransport struct { type setHeadersTransport struct {
http.RoundTripper http.RoundTripper
headers map[string]string headers map[string]string

View file

@ -410,7 +410,7 @@ Push metrics to a prometheus remote write (for testing purpose only).
| Argument | Description | Required | | Argument | Description | Required |
| --- | --- | --- | | --- | --- | --- |
| remote-write-url | Prometheus remote write url to push metrics. | Yes | | remote-write-url | Prometheus remote write url to push metrics. | Yes |
| metric-files | The metric files to push, default is read from standard input (STDIN). | | | metric-files | The metric files to push, default is read from standard input. | |

View file

@ -173,7 +173,7 @@ var writeRequestFixture = &prompb.WriteRequest{
}, },
} }
func TestParseMetricsTextAndFormat(t *testing.T) { func TestParseAndPushMetricsTextAndFormat(t *testing.T) {
input := bytes.NewReader([]byte(` input := bytes.NewReader([]byte(`
# HELP http_request_duration_seconds A histogram of the request duration. # HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram # TYPE http_request_duration_seconds histogram
@ -206,3 +206,28 @@ func TestParseMetricsTextAndFormat(t *testing.T) {
require.Equal(t, writeRequestFixture, expected) require.Equal(t, writeRequestFixture, expected)
} }
func TestParseMetricsTextAndFormatErrorParsingFloatValue(t *testing.T) {
input := bytes.NewReader([]byte(`
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027Error 1395066363000
http_requests_total{method="post",code="400"} 3 1395066363000
`))
labels := map[string]string{"job": "promtool"}
_, err := ParseMetricsTextAndFormat(input, labels)
require.Equal(t, err.Error(), "text format parsing error in line 4: expected float as value, got \"1027Error\"")
}
func TestParseMetricsTextAndFormatErrorParsingMetricType(t *testing.T) {
input := bytes.NewReader([]byte(`
# HELP node_info node info summary.
# TYPE node_info info
node_info{test="summary"} 1 1395066363000
`))
labels := map[string]string{"job": "promtool"}
_, err := ParseMetricsTextAndFormat(input, labels)
require.Equal(t, err.Error(), "text format parsing error in line 3: unknown metric type \"info\"")
}