prometheus/storage/remote/client_test.go
alexgreenbank 62e6d1abf5 remote-write: add ability to set User-Agent
Signed-off-by: alexgreenbank <alex.greenbank@grafana.com>
2024-10-23 13:01:30 +01:00

580 lines
15 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
func TestStoreHTTPErrorHandling(t *testing.T) {
tests := []struct {
code int
err error
}{
{
code: 200,
err: nil,
},
{
code: 300,
err: errors.New("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 404,
err: errors.New("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 500,
err: RecoverableError{errors.New("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen]), defaultBackoff},
},
}
for _, test := range tests {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, longErrMessage, test.code)
}),
)
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
}
hash, err := toHash(conf)
require.NoError(t, err)
c, err := NewWriteClient(hash, conf)
require.NoError(t, err)
_, err = c.Store(context.Background(), []byte{}, 0)
if test.err != nil {
require.EqualError(t, err, test.err.Error())
} else {
require.NoError(t, err)
}
server.Close()
}
}
func TestReadClientUserAgent(t *testing.T) {
tests := []struct {
name string
userAgent string
}{
{
name: "default-no-override",
userAgent: "",
},
{
name: "overridden",
userAgent: "ArgleBargle/1.2.3",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var called bool
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
receivedHeaders := r.Header
// Check the X-Prometheus-User-Agent header.
require.Equal(t, []string{internalUserAgent}, receivedHeaders.Values("X-Prometheus-User-Agent"),
"expected X-Prometheus-User-Agent header to be default value of %q", internalUserAgent)
if test.userAgent == "" {
// Expect original header value.
require.Equal(t, []string{internalUserAgent}, receivedHeaders.Values("User-Agent"),
"expected User-Agent header to be default value of %q", internalUserAgent)
} else {
// Expect over-ridden header value.
require.Equal(t, []string{test.userAgent}, receivedHeaders.Values("User-Agent"),
"expected User-Agent header to be over-ridden value of %q", test.userAgent)
}
w.Header().Set("Content-Type", "text/plain")
}),
)
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: u},
Timeout: model.Duration(5 * time.Second),
ChunkedReadLimit: config.DefaultChunkedReadLimit,
}
// Set the User-Agent.
if test.userAgent == "" {
// Ensure it is set to the default.
SetUserAgent(internalUserAgent)
} else {
SetUserAgent(test.userAgent)
}
c, err := NewReadClient("test", conf)
require.NoError(t, err)
query := &prompb.Query{}
_, err = c.Read(context.Background(), query, false)
require.ErrorContains(t, err, "unsupported content type")
require.True(t, called, "The remote server wasn't called")
})
}
}
func TestWriteClientUserAgent(t *testing.T) {
tests := []struct {
name string
userAgent string
}{
{
name: "default-no-override",
userAgent: "",
},
{
name: "overridden",
userAgent: "ArgleBargle/1.2.3",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var called bool
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
receivedHeaders := r.Header
// Check the X-Prometheus-User-Agent header.
require.Equal(t, []string{internalUserAgent}, receivedHeaders.Values("X-Prometheus-User-Agent"),
"expected X-Prometheus-User-Agent header to be default value of %q", internalUserAgent)
if test.userAgent == "" {
// Expect original header value.
require.Equal(t, []string{internalUserAgent}, receivedHeaders.Values("User-Agent"),
"expected User-Agent header to be default value of %q", internalUserAgent)
} else {
// Expect over-ridden header value.
require.Equal(t, []string{test.userAgent}, receivedHeaders.Values("User-Agent"),
"expected User-Agent header to be over-ridden value of %q", test.userAgent)
}
// w.Header().Set("Content-Type", "text/plain")
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
}
// Set the User-Agent.
if test.userAgent == "" {
// Ensure it is set to the default.
SetUserAgent(internalUserAgent)
} else {
SetUserAgent(test.userAgent)
}
hash, err := toHash(conf)
require.NoError(t, err)
c, err := NewWriteClient(hash, conf)
require.NoError(t, err)
_, err = c.Store(context.Background(), []byte{}, 0)
require.NoError(t, err)
require.True(t, called, "The remote server wasn't called")
})
}
}
func TestClientRetryAfter(t *testing.T) {
setupServer := func(statusCode int) *httptest.Server {
return httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Retry-After", "5")
http.Error(w, longErrMessage, statusCode)
}),
)
}
getClientConfig := func(serverURL *url.URL, retryOnRateLimit bool) *ClientConfig {
return &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
RetryOnRateLimit: retryOnRateLimit,
}
}
getClient := func(conf *ClientConfig) WriteClient {
hash, err := toHash(conf)
require.NoError(t, err)
c, err := NewWriteClient(hash, conf)
require.NoError(t, err)
return c
}
testCases := []struct {
name string
statusCode int
retryOnRateLimit bool
expectedRecoverable bool
expectedRetryAfter model.Duration
}{
{"TooManyRequests - No Retry", http.StatusTooManyRequests, false, false, 0},
{"TooManyRequests - With Retry", http.StatusTooManyRequests, true, true, 5 * model.Duration(time.Second)},
{"InternalServerError", http.StatusInternalServerError, false, true, 5 * model.Duration(time.Second)}, // HTTP 5xx errors do not depend on retryOnRateLimit.
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server := setupServer(tc.statusCode)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
var recErr RecoverableError
_, err = c.Store(context.Background(), []byte{}, 0)
require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
if tc.expectedRecoverable {
require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter)
}
})
}
}
func TestRetryAfterDuration(t *testing.T) {
tc := []struct {
name string
tInput string
expected model.Duration
}{
{
name: "seconds",
tInput: "120",
expected: model.Duration(time.Second * 120),
},
{
name: "date-time default",
tInput: time.RFC1123, // Expected layout is http.TimeFormat, hence an error.
expected: defaultBackoff,
},
{
name: "retry-after not provided",
tInput: "", // Expected layout is http.TimeFormat, hence an error.
expected: defaultBackoff,
},
}
for _, c := range tc {
require.Equal(t, c.expected, retryAfterDuration(c.tInput), c.name)
}
}
func TestClientCustomHeaders(t *testing.T) {
headersToSend := map[string]string{"Foo": "Bar", "Baz": "qux"}
var called bool
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
receivedHeaders := r.Header
for name, value := range headersToSend {
require.Equal(
t,
[]string{value},
receivedHeaders.Values(name),
"expected %v to be part of the received headers %v",
headersToSend,
receivedHeaders,
)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
Headers: headersToSend,
}
c, err := NewWriteClient("c", conf)
require.NoError(t, err)
_, err = c.Store(context.Background(), []byte{}, 0)
require.NoError(t, err)
require.True(t, called, "The remote server wasn't called")
}
func TestReadClient(t *testing.T) {
tests := []struct {
name string
query *prompb.Query
httpHandler http.HandlerFunc
expectedLabels []map[string]string
expectedSamples [][]model.SamplePair
expectedErrorContains string
sortSeries bool
}{
{
name: "sorted sampled response",
httpHandler: sampledResponseHTTPHandler(t),
expectedLabels: []map[string]string{
{"foo1": "bar"},
{"foo2": "bar"},
},
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(0), Value: model.SampleValue(3)},
{Timestamp: model.Time(5), Value: model.SampleValue(4)},
},
{
{Timestamp: model.Time(0), Value: model.SampleValue(1)},
{Timestamp: model.Time(5), Value: model.SampleValue(2)},
},
},
expectedErrorContains: "",
sortSeries: true,
},
{
name: "unsorted sampled response",
httpHandler: sampledResponseHTTPHandler(t),
expectedLabels: []map[string]string{
{"foo2": "bar"},
{"foo1": "bar"},
},
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(0), Value: model.SampleValue(1)},
{Timestamp: model.Time(5), Value: model.SampleValue(2)},
},
{
{Timestamp: model.Time(0), Value: model.SampleValue(3)},
{Timestamp: model.Time(5), Value: model.SampleValue(4)},
},
},
expectedErrorContains: "",
sortSeries: false,
},
{
name: "chunked response",
query: &prompb.Query{
StartTimestampMs: 4000,
EndTimestampMs: 12000,
},
httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
flusher, ok := w.(http.Flusher)
require.True(t, ok)
cw := NewChunkedWriter(w, flusher)
l := []prompb.Label{
{Name: "foo", Value: "bar"},
}
chunks := buildTestChunks(t)
for i, c := range chunks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
_, err = cw.Write(b)
require.NoError(t, err)
}
}),
expectedLabels: []map[string]string{
{"foo": "bar"},
{"foo": "bar"},
{"foo": "bar"},
},
// This is the output of buildTestChunks minus the samples outside the query range.
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(4000), Value: model.SampleValue(4)},
},
{
{Timestamp: model.Time(5000), Value: model.SampleValue(1)},
{Timestamp: model.Time(6000), Value: model.SampleValue(2)},
{Timestamp: model.Time(7000), Value: model.SampleValue(3)},
{Timestamp: model.Time(8000), Value: model.SampleValue(4)},
{Timestamp: model.Time(9000), Value: model.SampleValue(5)},
},
{
{Timestamp: model.Time(10000), Value: model.SampleValue(2)},
{Timestamp: model.Time(11000), Value: model.SampleValue(3)},
{Timestamp: model.Time(12000), Value: model.SampleValue(4)},
},
},
expectedErrorContains: "",
},
{
name: "unsupported content type",
httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "foobar")
}),
expectedErrorContains: "unsupported content type",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(test.httpHandler)
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: u},
Timeout: model.Duration(5 * time.Second),
ChunkedReadLimit: config.DefaultChunkedReadLimit,
}
c, err := NewReadClient("test", conf)
require.NoError(t, err)
query := &prompb.Query{}
if test.query != nil {
query = test.query
}
ss, err := c.Read(context.Background(), query, test.sortSeries)
if test.expectedErrorContains != "" {
require.ErrorContains(t, err, test.expectedErrorContains)
return
}
require.NoError(t, err)
i := 0
for ss.Next() {
require.NoError(t, ss.Err())
s := ss.At()
l := s.Labels()
require.Len(t, test.expectedLabels[i], l.Len())
for k, v := range test.expectedLabels[i] {
require.True(t, l.Has(k))
require.Equal(t, v, l.Get(k))
}
it := s.Iterator(nil)
j := 0
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
require.NoError(t, it.Err())
ts, v := it.At()
expectedSample := test.expectedSamples[i][j]
require.Equal(t, int64(expectedSample.Timestamp), ts)
require.Equal(t, float64(expectedSample.Value), v)
j++
}
require.Len(t, test.expectedSamples[i], j)
i++
}
require.NoError(t, ss.Err())
})
}
}
func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-protobuf")
resp := prompb.ReadResponse{
Results: []*prompb.QueryResult{
{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "foo2", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: float64(1), Timestamp: int64(0)},
{Value: float64(2), Timestamp: int64(5)},
},
Exemplars: []prompb.Exemplar{},
},
{
Labels: []prompb.Label{
{Name: "foo1", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: float64(3), Timestamp: int64(0)},
{Value: float64(4), Timestamp: int64(5)},
},
Exemplars: []prompb.Exemplar{},
},
},
},
},
}
b, err := proto.Marshal(&resp)
require.NoError(t, err)
_, err = w.Write(snappy.Encode(nil, b))
require.NoError(t, err)
}
}