mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
stash WIP for latest content negotiation changes
Signed-off-by: Alex Greenbank <alex.greenbank@grafana.com>
This commit is contained in:
parent
7b88101cf5
commit
1651f8fab2
|
@ -56,7 +56,7 @@ var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||||
// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver.
|
// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver.
|
||||||
|
|
||||||
// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version)
|
// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version)
|
||||||
// result in an HTTP 406 status code to indicate that it does not accept the protocol or
|
// result in an HTTP 415 status code to indicate that it does not accept the protocol or
|
||||||
// encoding of that request and that the sender should retry with a more suitable protocol
|
// encoding of that request and that the sender should retry with a more suitable protocol
|
||||||
// version or encoding.
|
// version or encoding.
|
||||||
|
|
||||||
|
@ -226,55 +226,6 @@ type RecoverableError struct {
|
||||||
retryAfter model.Duration
|
retryAfter model.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt a HEAD request against a remote write endpoint to see what it supports.
|
|
||||||
func (c *Client) probeRemoteVersions(ctx context.Context) error {
|
|
||||||
// We assume we are in Version2 mode otherwise we shouldn't be calling this.
|
|
||||||
|
|
||||||
httpReq, err := http.NewRequest(http.MethodHead, c.urlString, nil)
|
|
||||||
if err != nil {
|
|
||||||
// Errors from NewRequest are from unparsable URLs, so are not
|
|
||||||
// recoverable.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the version header to be nice.
|
|
||||||
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
|
||||||
httpReq.Header.Set("User-Agent", UserAgent)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
|
|
||||||
if err != nil {
|
|
||||||
// We don't attempt a retry here.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if we got a header anyway.
|
|
||||||
promHeader := httpResp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
|
|
||||||
// Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank.
|
|
||||||
if promHeader != "" {
|
|
||||||
c.lastRWHeader = promHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for an error.
|
|
||||||
if httpResp.StatusCode != http.StatusOK {
|
|
||||||
if httpResp.StatusCode == http.StatusMethodNotAllowed {
|
|
||||||
// If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't
|
|
||||||
// understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten
|
|
||||||
// even if it is blank.
|
|
||||||
// This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives
|
|
||||||
// a response that confirms it can speak 2.0.
|
|
||||||
c.lastRWHeader = promHeader
|
|
||||||
}
|
|
||||||
return fmt.Errorf(httpResp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// All ok, return no error.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
||||||
// and encoded bytes from codec.go.
|
// and encoded bytes from codec.go.
|
||||||
func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat config.RemoteWriteFormat, compression string) error {
|
func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat config.RemoteWriteFormat, compression string) error {
|
||||||
|
@ -335,8 +286,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
|
||||||
// Return an unrecoverable error to indicate the 400.
|
// Return an unrecoverable error to indicate the 400.
|
||||||
// This then gets passed up the chain so we can react to it properly.
|
// This then gets passed up the chain so we can react to it properly.
|
||||||
return &ErrRenegotiate{line, httpResp.StatusCode}
|
return &ErrRenegotiate{line, httpResp.StatusCode}
|
||||||
case http.StatusNotAcceptable:
|
case http.StatusUnsupportedMediaType:
|
||||||
// Return an unrecoverable error to indicate the 406.
|
// Return an unrecoverable error to indicate the 415.
|
||||||
// This then gets passed up the chain so we can react to it properly.
|
// This then gets passed up the chain so we can react to it properly.
|
||||||
return &ErrRenegotiate{line, httpResp.StatusCode}
|
return &ErrRenegotiate{line, httpResp.StatusCode}
|
||||||
default:
|
default:
|
||||||
|
@ -377,10 +328,6 @@ func (c Client) Endpoint() string {
|
||||||
return c.urlString
|
return c.urlString
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GetLastRWHeader() string {
|
|
||||||
return c.lastRWHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read reads from a remote endpoint.
|
// Read reads from a remote endpoint.
|
||||||
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
c.readQueries.Inc()
|
c.readQueries.Inc()
|
||||||
|
@ -463,10 +410,6 @@ func NewTestClient(name, url string) WriteClient {
|
||||||
return &TestClient{name: name, url: url}
|
return &TestClient{name: name, url: url}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error {
|
func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error {
|
||||||
r := rand.Intn(200-100) + 100
|
r := rand.Intn(200-100) + 100
|
||||||
time.Sleep(time.Duration(r) * time.Millisecond)
|
time.Sleep(time.Duration(r) * time.Millisecond)
|
||||||
|
@ -480,7 +423,3 @@ func (c *TestClient) Name() string {
|
||||||
func (c *TestClient) Endpoint() string {
|
func (c *TestClient) Endpoint() string {
|
||||||
return c.url
|
return c.url
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestClient) GetLastRWHeader() string {
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -397,10 +396,6 @@ type WriteClient interface {
|
||||||
Name() string
|
Name() string
|
||||||
// Endpoint is the remote read or write endpoint for the storage client.
|
// Endpoint is the remote read or write endpoint for the storage client.
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// Get the protocol versions supported by the endpoint.
|
|
||||||
probeRemoteVersions(ctx context.Context) error
|
|
||||||
// Get the last RW header received from the endpoint.
|
|
||||||
GetLastRWHeader() string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -582,7 +577,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
|
||||||
// Build the WriteRequest with no samples.
|
// Build the WriteRequest with no samples.
|
||||||
|
|
||||||
// Get compression to use from content negotiation based on last header seen (defaults to snappy).
|
// Get compression to use from content negotiation based on last header seen (defaults to snappy).
|
||||||
compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader())
|
compression := "snappy"
|
||||||
|
|
||||||
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression)
|
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1510,40 +1505,6 @@ func (q *queue) newBatch(capacity int) []timeSeries {
|
||||||
return make([]timeSeries, 0, capacity)
|
return make([]timeSeries, 0, capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) {
|
|
||||||
if rwFormat == Version1 {
|
|
||||||
// If we're only handling Version1 then all we can do is that with snappy compression.
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
if rwFormat != Version2 {
|
|
||||||
// If we get here then someone has added a new RemoteWriteFormat value but hasn't
|
|
||||||
// fixed this function to handle it. Panic!
|
|
||||||
panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat))
|
|
||||||
}
|
|
||||||
if lastHeaderSeen == "" {
|
|
||||||
// We haven't had a valid header, so we just default to "0.1.0/snappy".
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
// We can currently handle:
|
|
||||||
// "2.0;snappy"
|
|
||||||
// "0.1.0" - implicit compression of snappy
|
|
||||||
// lastHeaderSeen should contain a list of tuples.
|
|
||||||
// If we find a match to something we can handle then we can return that.
|
|
||||||
for _, tuple := range strings.Split(lastHeaderSeen, ",") {
|
|
||||||
// Remove spaces from the tuple.
|
|
||||||
curr := strings.ReplaceAll(tuple, " ", "")
|
|
||||||
switch curr {
|
|
||||||
case "2.0;snappy":
|
|
||||||
return "snappy", Version2
|
|
||||||
case "0.1.0":
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise we have to default to "0.1.0".
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.running.Dec() == 0 {
|
if s.running.Dec() == 0 {
|
||||||
|
@ -1637,17 +1598,13 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Work out what version to send based on the last header seen and the QM's rwFormat setting.
|
compression := "snappy"
|
||||||
for attemptNos := 1; attemptNos <= 3; attemptNos++ {
|
rwFormat := s.qm.rwFormat
|
||||||
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
|
// TODO(alexg): Need to get rwFormat from somewhere
|
||||||
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
|
sendErr := attemptBatchSend(batch, rwFormat, compression, false)
|
||||||
sendErr := attemptBatchSend(batch, rwFormat, compression, false)
|
pErr := &ErrRenegotiate{}
|
||||||
pErr := &ErrRenegotiate{}
|
if sendErr != nil && errors.As(sendErr, &pErr) {
|
||||||
if sendErr == nil || !errors.As(sendErr, &pErr) {
|
// If we get either of the two errors (415, 400) bundled in ErrRenegotiate we want to log and metric
|
||||||
// No error, or error wasn't a 406 or 400, so we can stop trying.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate.
|
|
||||||
// TODO(alexg) - add retry/renegotiate metrics here
|
// TODO(alexg) - add retry/renegotiate metrics here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1659,19 +1616,15 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
batch := queue.Batch()
|
batch := queue.Batch()
|
||||||
if len(batch) > 0 {
|
if len(batch) > 0 {
|
||||||
for attemptNos := 1; attemptNos <= 3; attemptNos++ {
|
compression := "snappy"
|
||||||
// Work out what version to send based on the last header seen and the QM's rwFormat setting.
|
// TODO(alexg): Need to get rwFormat from somewhere
|
||||||
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
|
rwFormat := s.qm.rwFormat
|
||||||
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
|
sendErr := attemptBatchSend(batch, rwFormat, compression, false)
|
||||||
sendErr := attemptBatchSend(batch, rwFormat, compression, true)
|
pErr := &ErrRenegotiate{}
|
||||||
pErr := &ErrRenegotiate{}
|
if sendErr != nil && errors.As(sendErr, &pErr) {
|
||||||
if sendErr == nil || !errors.As(sendErr, &pErr) {
|
// If we get either of the two errors (415, 400) bundled in ErrRenegotiate we want to log and metric
|
||||||
// No error, or error wasn't a 406 or 400, so we can stop trying.
|
// TODO(alexg) - add retry/renegotiate metrics here
|
||||||
break
|
|
||||||
}
|
|
||||||
// If we get either of the two errors (406, 400) bundled in ErrRenegotiate we loop and re-negotiate.
|
|
||||||
}
|
}
|
||||||
// TODO(alexg) - add retry/renegotiate metrics here
|
|
||||||
}
|
}
|
||||||
queue.ReturnForReuse(batch)
|
queue.ReturnForReuse(batch)
|
||||||
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
|
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
|
||||||
|
@ -1725,7 +1678,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
|
||||||
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression)
|
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression)
|
||||||
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
|
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
|
||||||
|
|
||||||
// Return the error in case it is a 406 and we need to reformat the data.
|
// Return the error in case it is a 415 and we need to reformat the data.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1734,7 +1687,7 @@ func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries
|
||||||
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression)
|
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression)
|
||||||
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
|
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
|
||||||
|
|
||||||
// Return the error in case it is a 406 and we need to reformat the data.
|
// Return the error in case it is a 415 and we need to reformat the data.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,146 +63,6 @@ func newHighestTimestampMetric() *maxTimestamp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type contentNegotiationStep struct {
|
|
||||||
lastRWHeader string
|
|
||||||
compression string
|
|
||||||
behaviour error // or nil
|
|
||||||
attemptString string
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContentNegotiation(t *testing.T) {
|
|
||||||
testcases := []struct {
|
|
||||||
name string
|
|
||||||
success bool
|
|
||||||
qmRwFormat config.RemoteWriteFormat
|
|
||||||
rwFormat config.RemoteWriteFormat
|
|
||||||
steps []contentNegotiationStep
|
|
||||||
}{
|
|
||||||
// Test a simple case where the v2 request we send is processed first time.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Test a simple case where the v1 request we send is processed first time.
|
|
||||||
{
|
|
||||||
success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Test a case where the v1 request has a temporary delay but goes through on retry.
|
|
||||||
// There is no content re-negotiation between first and retry attempts.
|
|
||||||
{
|
|
||||||
success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Repeat the above test but with v2. The request has a temporary delay but goes through on retry.
|
|
||||||
// There is no content re-negotiation between first and retry attempts.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"},
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 400}, attemptString: "0,1,snappy,HTTP 400: msg: "},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server flip flops between "2.0;snappy" and "0.1.0" only.
|
|
||||||
{
|
|
||||||
success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,0,snappy,HTTP 406: msg: "},
|
|
||||||
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: &ErrRenegotiate{"", 406}, attemptString: "0,1,snappy,HTTP 406: msg: "},
|
|
||||||
// There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries).
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
queueConfig := config.DefaultQueueConfig
|
|
||||||
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
||||||
queueConfig.MaxShards = 1
|
|
||||||
|
|
||||||
// We need to set URL's so that metric creation doesn't panic.
|
|
||||||
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
||||||
writeConfig.QueueConfig = queueConfig
|
|
||||||
|
|
||||||
conf := &config.Config{
|
|
||||||
GlobalConfig: config.DefaultGlobalConfig,
|
|
||||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
||||||
writeConfig,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testcases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
dir := t.TempDir()
|
|
||||||
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
var (
|
|
||||||
series []record.RefSeries
|
|
||||||
metadata []record.RefMetadata
|
|
||||||
samples []record.RefSample
|
|
||||||
)
|
|
||||||
|
|
||||||
// Generates same series in both cases.
|
|
||||||
samples, series = createTimeseries(1, 1)
|
|
||||||
metadata = createSeriesMetadata(series)
|
|
||||||
|
|
||||||
// Apply new config.
|
|
||||||
queueConfig.Capacity = len(samples)
|
|
||||||
queueConfig.MaxSamplesPerSend = len(samples)
|
|
||||||
// For now we only ever have a single rw config in this test.
|
|
||||||
conf.RemoteWriteConfigs[0].ProtocolVersion = tc.qmRwFormat
|
|
||||||
require.NoError(t, s.ApplyConfig(conf))
|
|
||||||
hash, err := toHash(writeConfig)
|
|
||||||
require.NoError(t, err)
|
|
||||||
qm := s.rws.queues[hash]
|
|
||||||
|
|
||||||
c := NewTestWriteClient(tc.rwFormat)
|
|
||||||
c.setSteps(tc.steps) // set expected behaviour.
|
|
||||||
qm.SetClient(c)
|
|
||||||
|
|
||||||
qm.StoreSeries(series, 0)
|
|
||||||
qm.StoreMetadata(metadata)
|
|
||||||
|
|
||||||
// Did we expect some data back?
|
|
||||||
if tc.success {
|
|
||||||
c.expectSamples(samples, series)
|
|
||||||
}
|
|
||||||
qm.Append(samples)
|
|
||||||
|
|
||||||
if !tc.success {
|
|
||||||
// We just need to sleep for a bit to give it time to run.
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
// But we still need to check for data with no delay to avoid race.
|
|
||||||
c.waitForExpectedData(t, 0*time.Second)
|
|
||||||
} else {
|
|
||||||
// We expected data so wait for it.
|
|
||||||
c.waitForExpectedData(t, 5*time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, len(c.sendAttempts), len(tc.steps))
|
|
||||||
for i, s := range c.sendAttempts {
|
|
||||||
require.Equal(t, s, tc.steps[i].attemptString)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSampleDelivery(t *testing.T) {
|
func TestSampleDelivery(t *testing.T) {
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -968,9 +828,6 @@ type TestWriteClient struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
buf []byte
|
buf []byte
|
||||||
rwFormat config.RemoteWriteFormat
|
rwFormat config.RemoteWriteFormat
|
||||||
sendAttempts []string
|
|
||||||
steps []contentNegotiationStep
|
|
||||||
currstep int
|
|
||||||
retry bool
|
retry bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -983,12 +840,6 @@ func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) setSteps(steps []contentNegotiationStep) {
|
|
||||||
c.steps = steps
|
|
||||||
c.currstep = -1 // incremented by GetLastRWHeader()
|
|
||||||
c.retry = false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
@ -1108,21 +959,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression)
|
|
||||||
|
|
||||||
if attemptNos > 0 {
|
|
||||||
// If this is a second attempt then we need to bump to the next step otherwise we loop.
|
|
||||||
c.currstep++
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we've been told to return something for this config.
|
|
||||||
if len(c.steps) > 0 {
|
|
||||||
if err = c.steps[c.currstep].behaviour; err != nil {
|
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var reqProto *prompb.WriteRequest
|
var reqProto *prompb.WriteRequest
|
||||||
switch rwFormat {
|
switch rwFormat {
|
||||||
case Version1:
|
case Version1:
|
||||||
|
@ -1136,10 +972,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
builder := labels.NewScratchBuilder(0)
|
builder := labels.NewScratchBuilder(0)
|
||||||
count := 0
|
count := 0
|
||||||
for _, ts := range reqProto.Timeseries {
|
for _, ts := range reqProto.Timeseries {
|
||||||
|
@ -1165,7 +997,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
}
|
}
|
||||||
|
|
||||||
c.writesReceived++
|
c.writesReceived++
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+",ok")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1177,20 +1008,6 @@ func (c *TestWriteClient) Endpoint() string {
|
||||||
return "http://test-remote.com/1234"
|
return "http://test-remote.com/1234"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestWriteClient) GetLastRWHeader() string {
|
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
c.currstep++
|
|
||||||
if len(c.steps) > 0 {
|
|
||||||
return c.steps[c.currstep].lastRWHeader
|
|
||||||
}
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
||||||
// on any calls to Store(), until the request's Context is cancelled, at which
|
// on any calls to Store(), until the request's Context is cancelled, at which
|
||||||
// point the `numCalls` property will contain a count of how many times Store()
|
// point the `numCalls` property will contain a count of how many times Store()
|
||||||
|
@ -1221,14 +1038,6 @@ func (c *TestBlockingWriteClient) Endpoint() string {
|
||||||
return "http://test-remote-blocking.com/1234"
|
return "http://test-remote-blocking.com/1234"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) GetLastRWHeader() string {
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
||||||
// For benchmarking the send and not the receive side.
|
// For benchmarking the send and not the receive side.
|
||||||
type NopWriteClient struct{}
|
type NopWriteClient struct{}
|
||||||
|
|
||||||
|
@ -1238,10 +1047,6 @@ func (c *NopWriteClient) Store(context.Context, []byte, int, config.RemoteWriteF
|
||||||
}
|
}
|
||||||
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
||||||
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
||||||
func (c *NopWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (c *NopWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" }
|
|
||||||
|
|
||||||
type MockWriteClient struct {
|
type MockWriteClient struct {
|
||||||
StoreFunc func(context.Context, []byte, int) error
|
StoreFunc func(context.Context, []byte, int) error
|
||||||
|
@ -1255,14 +1060,6 @@ func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int, _ config.
|
||||||
func (c *MockWriteClient) Name() string { return c.NameFunc() }
|
func (c *MockWriteClient) Name() string { return c.NameFunc() }
|
||||||
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
||||||
|
|
||||||
// TODO(bwplotka): Mock it if needed.
|
|
||||||
func (c *MockWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" }
|
|
||||||
|
|
||||||
// TODO(bwplotka): Mock it if needed.
|
|
||||||
func (c *MockWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
||||||
var extraLabels []labels.Label = []labels.Label{
|
var extraLabels []labels.Label = []labels.Label{
|
||||||
{Name: "kubernetes_io_arch", Value: "amd64"},
|
{Name: "kubernetes_io_arch", Value: "amd64"},
|
||||||
|
|
|
@ -191,21 +191,8 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
delete(rws.queues, hash)
|
delete(rws.queues, hash)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// TODO(alexg): Remote Write version for this endpoint to come from config
|
||||||
// Work out what protocol and compression to use for this endpoint.
|
|
||||||
// Default to Remote Write Version1.
|
|
||||||
rwFormat := Version1
|
rwFormat := Version1
|
||||||
switch rwConf.ProtocolVersion {
|
|
||||||
case Version1:
|
|
||||||
// We use the standard value as there's no negotiation to be had.
|
|
||||||
case Version2:
|
|
||||||
rwFormat = Version2
|
|
||||||
// If this newer remote write format is enabled then we need to probe the remote server
|
|
||||||
// to work out the desired protocol version and compressions.
|
|
||||||
// The value of the header is kept in the client so no need to see it here.
|
|
||||||
_ = c.probeRemoteVersions(context.Background())
|
|
||||||
// We ignore any error here, at some point we may choose to log it.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Redacted to remove any passwords in the URL (that are
|
// Redacted to remove any passwords in the URL (that are
|
||||||
// technically accepted but not recommended) since this is
|
// technically accepted but not recommended) since this is
|
||||||
|
|
|
@ -63,46 +63,6 @@ func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
type writeHeadHandler struct {
|
|
||||||
logger log.Logger
|
|
||||||
|
|
||||||
remoteWriteHeadRequests prometheus.Counter
|
|
||||||
|
|
||||||
// Experimental feature, new remote write proto format.
|
|
||||||
// The handler will accept the new format, but it can still accept the old one.
|
|
||||||
rwFormat config.RemoteWriteFormat
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler {
|
|
||||||
h := &writeHeadHandler{
|
|
||||||
logger: logger,
|
|
||||||
rwFormat: rwFormat,
|
|
||||||
remoteWriteHeadRequests: prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Namespace: "prometheus",
|
|
||||||
Subsystem: "api",
|
|
||||||
Name: "remote_write_head_requests",
|
|
||||||
Help: "The number of remote write HEAD requests.",
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
if reg != nil {
|
|
||||||
reg.MustRegister(h.remoteWriteHeadRequests)
|
|
||||||
}
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send a response to the HEAD request based on the format supported.
|
|
||||||
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// Add appropriate header values for the specific rwFormat.
|
|
||||||
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
|
|
||||||
w.Header().Set(hName, hValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Increment counter
|
|
||||||
h.remoteWriteHeadRequests.Inc()
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
}
|
|
||||||
|
|
||||||
type writeHandler struct {
|
type writeHandler struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendable storage.Appendable
|
appendable storage.Appendable
|
||||||
|
@ -155,8 +115,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
default:
|
default:
|
||||||
// We have a version in the header but it is not one we recognise.
|
// We have a version in the header but it is not one we recognise.
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer)
|
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer)
|
||||||
// Return a 406 so that the client can choose a more appropriate protocol to use.
|
// Return a 415 so that the client can choose a more appropriate protocol to use.
|
||||||
http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable)
|
http.Error(w, "Unknown remote write version in headers", http.StatusUnsupportedMediaType)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,8 +146,8 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding)
|
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding)
|
||||||
// Return a 406 so that the client can choose a more appropriate protocol to use.
|
// Return a 415 so that the client can choose a more appropriate protocol to use.
|
||||||
http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable)
|
http.Error(w, "Unsupported Content-Encoding", http.StatusUnsupportedMediaType)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,25 +39,8 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRemoteWriteHeadHandler(t *testing.T) {
|
|
||||||
handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2)
|
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodHead, "", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
handler.ServeHTTP(recorder, req)
|
|
||||||
|
|
||||||
resp := recorder.Result()
|
|
||||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
||||||
|
|
||||||
// Check header is expected value.
|
|
||||||
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
require.Equal(t, "2.0;snappy,0.1.0", protHeader)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
|
func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
|
||||||
// Send a v2 request without a "Content-Encoding:" header -> 406.
|
// Send a v2 request without a "Content-Encoding:" header -> 415.
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -74,12 +57,12 @@ func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
// Should give us a 406.
|
// Should give us a 415.
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
|
func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
|
||||||
// Send a v2 request without an unhandled compression scheme -> 406.
|
// Send a v2 request without an unhandled compression scheme -> 415.
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -95,12 +78,12 @@ func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
// Expect a 406.
|
// Expect a 415.
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
|
func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
|
||||||
// Send a protocol version number that isn't recognised/supported -> 406.
|
// Send a protocol version number that isn't recognised/supported -> 415.
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -115,8 +98,8 @@ func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
// Expect a 406.
|
// Expect a 415.
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
require.Equal(t, http.StatusUnsupportedMediaType, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoteWriteHandler(t *testing.T) {
|
func TestRemoteWriteHandler(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue