mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Revert "storage/remote: remote write 2.0 content negotiation"
This commit is contained in:
parent
ad77987bdc
commit
c250a86942
|
@ -116,7 +116,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s
|
||||||
|
|
||||||
// Encode the request body into snappy encoding.
|
// Encode the request body into snappy encoding.
|
||||||
compressed := snappy.Encode(nil, raw)
|
compressed := snappy.Encode(nil, raw)
|
||||||
err = client.Store(context.Background(), compressed, 0, remote.Version1, "snappy")
|
err = client.Store(context.Background(), compressed, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintln(os.Stderr, " FAILED:", err)
|
fmt.Fprintln(os.Stderr, " FAILED:", err)
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -17,7 +17,6 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -47,22 +46,6 @@ const maxErrMsgLen = 1024
|
||||||
|
|
||||||
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)
|
||||||
|
|
||||||
// If we send a Remote Write 2.0 request to a Remote Write endpoint that only understands
|
|
||||||
// Remote Write 1.0 it will respond with an error. We need to handle these errors
|
|
||||||
// accordingly. Any 5xx errors will just need to be retried as they are considered
|
|
||||||
// transient/recoverable errors. A 4xx error will need to be passed back to the queue
|
|
||||||
// manager in order to be re-encoded in a suitable format.
|
|
||||||
|
|
||||||
// A Remote Write 2.0 request sent to, for example, a Prometheus 2.50 receiver (which does
|
|
||||||
// not understand Remote Write 2.0) will result in an HTTP 400 status code from the receiver.
|
|
||||||
var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400
|
|
||||||
|
|
||||||
// A Remote Write 2.0 request sent to a remote write receiver may (depending on receiver version)
|
|
||||||
// result in an HTTP 406 status code to indicate that it does not accept the protocol or
|
|
||||||
// encoding of that request and that the sender should retry with a more suitable protocol
|
|
||||||
// version or encoding.
|
|
||||||
var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
remoteReadQueriesTotal = prometheus.NewCounterVec(
|
remoteReadQueriesTotal = prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
|
@ -100,11 +83,11 @@ func init() {
|
||||||
|
|
||||||
// Client allows reading and writing from/to a remote HTTP endpoint.
|
// Client allows reading and writing from/to a remote HTTP endpoint.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
remoteName string // Used to differentiate clients in metrics.
|
remoteName string // Used to differentiate clients in metrics.
|
||||||
urlString string // url.String()
|
urlString string // url.String()
|
||||||
lastRWHeader string
|
rwFormat config.RemoteWriteFormat // For write clients, ignored for read clients.
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
||||||
retryOnRateLimit bool
|
retryOnRateLimit bool
|
||||||
|
|
||||||
|
@ -184,6 +167,7 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
|
||||||
httpClient.Transport = otelhttp.NewTransport(t)
|
httpClient.Transport = otelhttp.NewTransport(t)
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
|
rwFormat: conf.RemoteWriteFormat,
|
||||||
remoteName: name,
|
remoteName: name,
|
||||||
urlString: conf.URL.String(),
|
urlString: conf.URL.String(),
|
||||||
Client: httpClient,
|
Client: httpClient,
|
||||||
|
@ -215,58 +199,9 @@ type RecoverableError struct {
|
||||||
retryAfter model.Duration
|
retryAfter model.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt a HEAD request against a remote write endpoint to see what it supports.
|
|
||||||
func (c *Client) probeRemoteVersions(ctx context.Context) error {
|
|
||||||
// We assume we are in Version2 mode otherwise we shouldn't be calling this.
|
|
||||||
|
|
||||||
httpReq, err := http.NewRequest("HEAD", c.urlString, nil)
|
|
||||||
if err != nil {
|
|
||||||
// Errors from NewRequest are from unparsable URLs, so are not
|
|
||||||
// recoverable.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the version header to be nice.
|
|
||||||
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
|
||||||
httpReq.Header.Set("User-Agent", UserAgent)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
|
|
||||||
if err != nil {
|
|
||||||
// We don't attempt a retry here.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if we got a header anyway.
|
|
||||||
promHeader := httpResp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
|
|
||||||
// Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank.
|
|
||||||
if promHeader != "" {
|
|
||||||
c.lastRWHeader = promHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for an error.
|
|
||||||
if httpResp.StatusCode != 200 {
|
|
||||||
if httpResp.StatusCode == 405 {
|
|
||||||
// If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't
|
|
||||||
// understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten
|
|
||||||
// even if it is blank.
|
|
||||||
// This will make subsequent sends use RemoteWrite 1.0 until the endpoint gives
|
|
||||||
// a response that confirms it can speak 2.0.
|
|
||||||
c.lastRWHeader = promHeader
|
|
||||||
}
|
|
||||||
return fmt.Errorf(httpResp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// All ok, return no error.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
|
||||||
// and encoded bytes from codec.go.
|
// and encoded bytes from codec.go.
|
||||||
func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat config.RemoteWriteFormat, compression string) error {
|
func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
|
||||||
httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req))
|
httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Errors from NewRequest are from unparsable URLs, so are not
|
// Errors from NewRequest are from unparsable URLs, so are not
|
||||||
|
@ -274,15 +209,15 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
httpReq.Header.Add("Content-Encoding", compression)
|
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||||
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
||||||
httpReq.Header.Set("User-Agent", UserAgent)
|
httpReq.Header.Set("User-Agent", UserAgent)
|
||||||
|
|
||||||
if rwFormat == Version1 {
|
if c.rwFormat == Version1 {
|
||||||
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue)
|
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion1HeaderValue)
|
||||||
} else {
|
} else {
|
||||||
// Set the right header if we're using v2.0 remote write protocol.
|
// Set the right header if we're using v1.1 remote write protocol
|
||||||
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
httpReq.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
if attempt > 0 {
|
if attempt > 0 {
|
||||||
|
@ -306,12 +241,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
|
||||||
httpResp.Body.Close()
|
httpResp.Body.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// See if we got a X-Prometheus-Remote-Write header in the response.
|
// TODO-RW11: Here is where we need to handle version downgrade on error
|
||||||
if promHeader := httpResp.Header.Get(RemoteWriteVersionHeader); promHeader != "" {
|
|
||||||
// Only update lastRWHeader if the X-Prometheus-Remote-Write header is not blank.
|
|
||||||
// (It's blank if it wasn't present, we don't care about that distinction.)
|
|
||||||
c.lastRWHeader = promHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
if httpResp.StatusCode/100 != 2 {
|
if httpResp.StatusCode/100 != 2 {
|
||||||
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
|
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
|
||||||
|
@ -319,22 +249,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co
|
||||||
if scanner.Scan() {
|
if scanner.Scan() {
|
||||||
line = scanner.Text()
|
line = scanner.Text()
|
||||||
}
|
}
|
||||||
switch httpResp.StatusCode {
|
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
||||||
case 400:
|
|
||||||
// Return an unrecoverable error to indicate the 400.
|
|
||||||
// This then gets passed up the chain so we can react to it properly.
|
|
||||||
// TODO(alexg) Do we want to include the first line of the message?
|
|
||||||
return ErrStatusBadRequest
|
|
||||||
case 406:
|
|
||||||
// Return an unrecoverable error to indicate the 406.
|
|
||||||
// This then gets passed up the chain so we can react to it properly.
|
|
||||||
// TODO(alexg) Do we want to include the first line of the message?
|
|
||||||
// TODO(alexg) Do we want to combine these two errors as one, with the statuscode and first line of message in the error?
|
|
||||||
return ErrStatusNotAcceptable
|
|
||||||
default:
|
|
||||||
// We want to end up returning a non-specific error.
|
|
||||||
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if httpResp.StatusCode/100 == 5 ||
|
if httpResp.StatusCode/100 == 5 ||
|
||||||
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
|
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
|
||||||
|
@ -369,10 +284,6 @@ func (c Client) Endpoint() string {
|
||||||
return c.urlString
|
return c.urlString
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GetLastRWHeader() string {
|
|
||||||
return c.lastRWHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read reads from a remote endpoint.
|
// Read reads from a remote endpoint.
|
||||||
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
c.readQueries.Inc()
|
c.readQueries.Inc()
|
||||||
|
@ -455,11 +366,7 @@ func NewTestClient(name, url string) WriteClient {
|
||||||
return &TestClient{name: name, url: url}
|
return &TestClient{name: name, url: url}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestClient) probeRemoteVersions(_ context.Context) error {
|
func (c *TestClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestClient) Store(_ context.Context, req []byte, _ int, _ config.RemoteWriteFormat, _ string) error {
|
|
||||||
r := rand.Intn(200-100) + 100
|
r := rand.Intn(200-100) + 100
|
||||||
time.Sleep(time.Duration(r) * time.Millisecond)
|
time.Sleep(time.Duration(r) * time.Millisecond)
|
||||||
return nil
|
return nil
|
||||||
|
@ -472,7 +379,3 @@ func (c *TestClient) Name() string {
|
||||||
func (c *TestClient) Endpoint() string {
|
func (c *TestClient) Endpoint() string {
|
||||||
return c.url
|
return c.url
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestClient) GetLastRWHeader() string {
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||||
c, err := NewWriteClient(hash, conf)
|
c, err := NewWriteClient(hash, conf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy")
|
err = c.Store(context.Background(), []byte{}, 0)
|
||||||
if test.err != nil {
|
if test.err != nil {
|
||||||
require.EqualError(t, err, test.err.Error())
|
require.EqualError(t, err, test.err.Error())
|
||||||
} else {
|
} else {
|
||||||
|
@ -133,7 +133,7 @@ func TestClientRetryAfter(t *testing.T) {
|
||||||
c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
|
c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
|
||||||
|
|
||||||
var recErr RecoverableError
|
var recErr RecoverableError
|
||||||
err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy")
|
err = c.Store(context.Background(), []byte{}, 0)
|
||||||
require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
|
require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
|
||||||
if tc.expectedRecoverable {
|
if tc.expectedRecoverable {
|
||||||
require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter)
|
require.Equal(t, tc.expectedRetryAfter, recErr.retryAfter)
|
||||||
|
@ -203,7 +203,7 @@ func TestClientHeaders(t *testing.T) {
|
||||||
c, err := NewWriteClient("c", conf)
|
c, err := NewWriteClient("c", conf)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = c.Store(context.Background(), []byte{}, 0, Version1, "snappy")
|
err = c.Store(context.Background(), []byte{}, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.True(t, called, "The remote server wasn't called")
|
require.True(t, called, "The remote server wasn't called")
|
||||||
|
|
|
@ -559,7 +559,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeWriteRequest(t *testing.T) {
|
func TestDecodeWriteRequest(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
|
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
|
||||||
|
@ -568,7 +568,7 @@ func TestDecodeWriteRequest(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeMinWriteRequest(t *testing.T) {
|
func TestDecodeMinWriteRequest(t *testing.T) {
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
actual, err := DecodeMinimizedWriteRequestStr(bytes.NewReader(buf))
|
actual, err := DecodeMinimizedWriteRequestStr(bytes.NewReader(buf))
|
||||||
|
|
|
@ -16,10 +16,8 @@ package remote
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -389,20 +387,16 @@ func (m *queueManagerMetrics) unregister() {
|
||||||
// external timeseries database.
|
// external timeseries database.
|
||||||
type WriteClient interface {
|
type WriteClient interface {
|
||||||
// Store stores the given samples in the remote storage.
|
// Store stores the given samples in the remote storage.
|
||||||
Store(ctx context.Context, req []byte, retryAttempt int, rwFormat config.RemoteWriteFormat, compression string) error
|
Store(ctx context.Context, req []byte, retryAttempt int) error
|
||||||
// Name uniquely identifies the remote storage.
|
// Name uniquely identifies the remote storage.
|
||||||
Name() string
|
Name() string
|
||||||
// Endpoint is the remote read or write endpoint for the storage client.
|
// Endpoint is the remote read or write endpoint for the storage client.
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// Get the protocol versions supported by the endpoint.
|
|
||||||
probeRemoteVersions(ctx context.Context) error
|
|
||||||
// Get the last RW header received from the endpoint.
|
|
||||||
GetLastRWHeader() string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc.
|
Version1 config.RemoteWriteFormat = iota // 1.0, 0.1, etc.
|
||||||
Version2 // symbols are indices into an array of strings.
|
Version2 // symbols are indices into an array of strings
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueueManager manages a queue of samples to be sent to the Storage
|
// QueueManager manages a queue of samples to be sent to the Storage
|
||||||
|
@ -422,7 +416,8 @@ type QueueManager struct {
|
||||||
sendNativeHistograms bool
|
sendNativeHistograms bool
|
||||||
watcher *wlog.Watcher
|
watcher *wlog.Watcher
|
||||||
metadataWatcher *MetadataWatcher
|
metadataWatcher *MetadataWatcher
|
||||||
rwFormat config.RemoteWriteFormat
|
// experimental feature, new remote write proto format
|
||||||
|
rwFormat config.RemoteWriteFormat
|
||||||
|
|
||||||
clientMtx sync.RWMutex
|
clientMtx sync.RWMutex
|
||||||
storeClient WriteClient
|
storeClient WriteClient
|
||||||
|
@ -495,7 +490,9 @@ func NewQueueManager(
|
||||||
storeClient: client,
|
storeClient: client,
|
||||||
sendExemplars: enableExemplarRemoteWrite,
|
sendExemplars: enableExemplarRemoteWrite,
|
||||||
sendNativeHistograms: enableNativeHistogramRemoteWrite,
|
sendNativeHistograms: enableNativeHistogramRemoteWrite,
|
||||||
rwFormat: rwFormat,
|
// TODO: we should eventually set the format via content negotiation,
|
||||||
|
// so this field would be the desired format, maybe with a fallback?
|
||||||
|
rwFormat: rwFormat,
|
||||||
|
|
||||||
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
|
seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
|
||||||
seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata),
|
seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata),
|
||||||
|
@ -575,11 +572,7 @@ func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scr
|
||||||
|
|
||||||
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
|
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
|
||||||
// Build the WriteRequest with no samples.
|
// Build the WriteRequest with no samples.
|
||||||
|
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil)
|
||||||
// Get compression to use from content negotiation based on last header seen (defaults to snappy).
|
|
||||||
compression, _ := negotiateRWProto(t.rwFormat, t.storeClient.GetLastRWHeader())
|
|
||||||
|
|
||||||
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil, compression)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -602,7 +595,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
|
||||||
}
|
}
|
||||||
|
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
err := t.storeClient.Store(ctx, req, try, Version1, compression)
|
err := t.storeClient.Store(ctx, req, try)
|
||||||
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1498,40 +1491,6 @@ func (q *queue) newBatch(capacity int) []timeSeries {
|
||||||
return make([]timeSeries, 0, capacity)
|
return make([]timeSeries, 0, capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
func negotiateRWProto(rwFormat config.RemoteWriteFormat, lastHeaderSeen string) (string, config.RemoteWriteFormat) {
|
|
||||||
if rwFormat == Version1 {
|
|
||||||
// If we're only handling Version1 then all we can do is that with snappy compression.
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
if rwFormat != Version2 {
|
|
||||||
// If we get here then someone has added a new RemoteWriteFormat value but hasn't
|
|
||||||
// fixed this function to handle it. Panic!
|
|
||||||
panic(fmt.Sprintf("Unhandled RemoteWriteFormat %q", rwFormat))
|
|
||||||
}
|
|
||||||
if lastHeaderSeen == "" {
|
|
||||||
// We haven't had a valid header, so we just default to "0.1.0/snappy".
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
// We can currently handle:
|
|
||||||
// "2.0;snappy"
|
|
||||||
// "0.1.0" - implicit compression of snappy
|
|
||||||
// lastHeaderSeen should contain a list of tuples.
|
|
||||||
// If we find a match to something we can handle then we can return that.
|
|
||||||
for _, tuple := range strings.Split(lastHeaderSeen, ",") {
|
|
||||||
// Remove spaces from the tuple.
|
|
||||||
curr := strings.ReplaceAll(tuple, " ", "")
|
|
||||||
switch curr {
|
|
||||||
case "2.0;snappy":
|
|
||||||
return "snappy", Version2
|
|
||||||
case "0.1.0":
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise we have to default to "0.1.0".
|
|
||||||
return "snappy", Version1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.running.Dec() == 0 {
|
if s.running.Dec() == 0 {
|
||||||
|
@ -1582,26 +1541,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
}
|
}
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
attemptBatchSend := func(batch []timeSeries, rwFormat config.RemoteWriteFormat, compression string, timer bool) error {
|
|
||||||
switch rwFormat {
|
|
||||||
case Version1:
|
|
||||||
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
|
||||||
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
|
||||||
if timer {
|
|
||||||
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
|
|
||||||
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
|
|
||||||
}
|
|
||||||
return s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf, compression)
|
|
||||||
case Version2:
|
|
||||||
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
|
||||||
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
|
||||||
err := s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf, compression)
|
|
||||||
symbolTable.clear()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -1625,20 +1564,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Resend logic on 406.
|
switch s.qm.rwFormat {
|
||||||
// ErrStatusNotAcceptable is a new error defined in client.go.
|
case Version1:
|
||||||
|
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
// Work out what version to send based on the last header seen and the QM's rwFormat setting.
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
// TODO(alexg) - see comments below about retry/renegotiate design.
|
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
|
||||||
for attemptNos := 1; attemptNos <= 3; attemptNos++ {
|
case Version2:
|
||||||
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
|
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
sendErr := attemptBatchSend(batch, rwFormat, compression, false)
|
s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf)
|
||||||
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) {
|
symbolTable.clear()
|
||||||
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// If we get either of the two errors (406, 400) we loop and re-negotiate.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.ReturnForReuse(batch)
|
queue.ReturnForReuse(batch)
|
||||||
|
@ -1649,23 +1584,19 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
batch := queue.Batch()
|
batch := queue.Batch()
|
||||||
if len(batch) > 0 {
|
if len(batch) > 0 {
|
||||||
for attemptNos := 1; attemptNos <= 3; attemptNos++ {
|
switch s.qm.rwFormat {
|
||||||
// Work out what version to send based on the last header seen and the QM's rwFormat setting.
|
case Version1:
|
||||||
lastHeaderSeen := s.qm.storeClient.GetLastRWHeader()
|
nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen)
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
sendErr := attemptBatchSend(batch, rwFormat, compression, true)
|
s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf)
|
||||||
if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) {
|
level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples,
|
||||||
// It wasn't an error, or wasn't a 406 or 400 so we can just stop trying.
|
"exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms)
|
||||||
break
|
case Version2:
|
||||||
}
|
nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata := populateV2TimeSeries(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms)
|
||||||
// If we get either of the two errors (406, 400) we loop and re-negotiate.
|
n := nPendingSamples + nPendingExemplars + nPendingHistograms
|
||||||
|
s.sendV2Samples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata, &pBufRaw, &buf)
|
||||||
|
symbolTable.clear()
|
||||||
}
|
}
|
||||||
// TODO(alexg) - the question here is whether we use the 3rd attempt to ensure we
|
|
||||||
// Consider a server that erroneously reports it can handle "0.2.0/snappy" even in the 406/400 errors when that data is sent in that format
|
|
||||||
// Q: Do we always try downgrading to 1.0 at least once?
|
|
||||||
// Q: Do we limit our attempts to only try a particular protocol/encoding tuple once?
|
|
||||||
// Q: Is 3 a suitable limit?
|
|
||||||
// TODO(alexg) - add retry/renegotiate metrics here
|
|
||||||
}
|
}
|
||||||
queue.ReturnForReuse(batch)
|
queue.ReturnForReuse(batch)
|
||||||
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
|
timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
|
||||||
|
@ -1714,22 +1645,16 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen
|
||||||
return nPendingSamples, nPendingExemplars, nPendingHistograms
|
return nPendingSamples, nPendingExemplars, nPendingHistograms
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte, compression string) error {
|
func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf, compression)
|
err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, 0, pBuf, buf)
|
||||||
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
|
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, 0, time.Since(begin))
|
||||||
|
|
||||||
// Return the error in case it is a 406 and we need to reformat the data.
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, compression string) error {
|
func (s *shards) sendV2Samples(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf, compression)
|
err := s.sendV2SamplesWithBackoff(ctx, samples, labels, sampleCount, exemplarCount, histogramCount, metadataCount, pBuf, buf)
|
||||||
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
|
s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, metadataCount, time.Since(begin))
|
||||||
|
|
||||||
// Return the error in case it is a 406 and we need to reformat the data.
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
|
func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
|
||||||
|
@ -1756,9 +1681,9 @@ func (s *shards) updateMetrics(ctx context.Context, err error, sampleCount, exem
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendSamples to the remote storage with backoff for recoverable errors.
|
// sendSamples to the remote storage with backoff for recoverable errors.
|
||||||
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte, compression string) error {
|
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf *proto.Buffer, buf *[]byte) error {
|
||||||
// Build the WriteRequest with no metadata.
|
// Build the WriteRequest with no metadata.
|
||||||
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil, compression)
|
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, buf, nil)
|
||||||
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Failing to build the write request is non-recoverable, since it will
|
// Failing to build the write request is non-recoverable, since it will
|
||||||
|
@ -1784,7 +1709,6 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||||
pBuf,
|
pBuf,
|
||||||
buf,
|
buf,
|
||||||
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
|
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
|
||||||
compression,
|
|
||||||
)
|
)
|
||||||
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1816,7 +1740,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||||
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
|
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
|
||||||
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
|
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
|
||||||
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
|
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
|
||||||
err := s.qm.client().Store(ctx, *buf, try, Version1, compression)
|
err := s.qm.client().Store(ctx, *buf, try)
|
||||||
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1847,9 +1771,9 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendV2Samples to the remote storage with backoff for recoverable errors.
|
// sendV2Samples to the remote storage with backoff for recoverable errors.
|
||||||
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, compression string) error {
|
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte) error {
|
||||||
// Build the WriteRequest with no metadata.
|
// Build the WriteRequest with no metadata.
|
||||||
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, compression)
|
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil)
|
||||||
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Failing to build the write request is non-recoverable, since it will
|
// Failing to build the write request is non-recoverable, since it will
|
||||||
|
@ -1875,7 +1799,6 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
|
||||||
pBuf,
|
pBuf,
|
||||||
buf,
|
buf,
|
||||||
isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
|
isV2TimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
|
||||||
compression,
|
|
||||||
)
|
)
|
||||||
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
s.qm.buildRequestLimitTimestamp.Store(lowest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1907,7 +1830,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
|
||||||
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
|
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
|
||||||
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
|
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
|
||||||
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
|
s.qm.metrics.metadataTotal.Add(float64(metadataCount))
|
||||||
err := s.qm.client().Store(ctx, *buf, try, Version2, compression)
|
err := s.qm.client().Store(ctx, *buf, try)
|
||||||
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2094,23 +2017,7 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
|
||||||
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
|
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
|
||||||
}
|
}
|
||||||
|
|
||||||
func compressPayload(tmpbuf *[]byte, inp []byte, compression string) ([]byte, error) {
|
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
|
||||||
var compressed []byte
|
|
||||||
|
|
||||||
switch compression {
|
|
||||||
case "snappy":
|
|
||||||
compressed = snappy.Encode(*tmpbuf, inp)
|
|
||||||
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
|
|
||||||
// grow the buffer for the next time
|
|
||||||
*tmpbuf = make([]byte, n)
|
|
||||||
}
|
|
||||||
return compressed, nil
|
|
||||||
default:
|
|
||||||
return compressed, fmt.Errorf("Unknown compression scheme [%s]", compression)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf *[]byte, filter func(prompb.TimeSeries) bool, compression string) ([]byte, int64, int64, error) {
|
|
||||||
highest, lowest, timeSeries,
|
highest, lowest, timeSeries,
|
||||||
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
|
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)
|
||||||
|
|
||||||
|
@ -2140,14 +2047,11 @@ func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metada
|
||||||
} else {
|
} else {
|
||||||
buf = &[]byte{}
|
buf = &[]byte{}
|
||||||
}
|
}
|
||||||
|
compressed := snappy.Encode(*buf, pBuf.Bytes())
|
||||||
var compressed []byte
|
if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) {
|
||||||
|
// grow the buffer for the next time
|
||||||
compressed, err = compressPayload(buf, pBuf.Bytes(), compression)
|
*buf = make([]byte, n)
|
||||||
if err != nil {
|
|
||||||
return nil, highest, lowest, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return compressed, highest, lowest, nil
|
return compressed, highest, lowest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2183,7 +2087,7 @@ func (r *rwSymbolTable) clear() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool, compression string) ([]byte, int64, int64, error) {
|
func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels []string, pBuf, buf *[]byte, filter func(writev2.TimeSeries) bool) ([]byte, int64, int64, error) {
|
||||||
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
|
highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms := buildV2TimeSeries(samples, filter)
|
||||||
|
|
||||||
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
|
if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
|
||||||
|
@ -2213,11 +2117,10 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels
|
||||||
buf = &[]byte{}
|
buf = &[]byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var compressed []byte
|
compressed := snappy.Encode(*buf, data)
|
||||||
|
if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) {
|
||||||
compressed, err = compressPayload(buf, data, compression)
|
// grow the buffer for the next time
|
||||||
if err != nil {
|
*buf = make([]byte, n)
|
||||||
return nil, highest, lowest, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return compressed, highest, lowest, nil
|
return compressed, highest, lowest, nil
|
||||||
|
|
|
@ -63,146 +63,6 @@ func newHighestTimestampMetric() *maxTimestamp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type contentNegotiationStep struct {
|
|
||||||
lastRWHeader string
|
|
||||||
compression string
|
|
||||||
behaviour error // or nil
|
|
||||||
attemptString string
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContentNegotiation(t *testing.T) {
|
|
||||||
testcases := []struct {
|
|
||||||
name string
|
|
||||||
success bool
|
|
||||||
qmRwFormat config.RemoteWriteFormat
|
|
||||||
rwFormat config.RemoteWriteFormat
|
|
||||||
steps []contentNegotiationStep
|
|
||||||
}{
|
|
||||||
// Test a simple case where the v2 request we send is processed first time.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 happy path", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,1,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Test a simple case where the v1 request we send is processed first time.
|
|
||||||
{
|
|
||||||
success: true, name: "v1 happy path", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Test a case where the v1 request has a temporary delay but goes through on retry.
|
|
||||||
// There is no content re-negotiation between first and retry attempts.
|
|
||||||
{
|
|
||||||
success: true, name: "v1 happy path with one 5xx retry", qmRwFormat: Version1, rwFormat: Version1, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,0,snappy,Pretend 500"},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Repeat the above test but with v2. The request has a temporary delay but goes through on retry.
|
|
||||||
// There is no content re-negotiation between first and retry attempts.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 happy path with one 5xx retry", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: RecoverableError{fmt.Errorf("Pretend 500"), 1}, attemptString: "0,1,snappy,Pretend 500"},
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: nil, attemptString: "1,1,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 request to v2 server that has downgraded via 406", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server suddenly stops speaking 2.0 and we need to downgrade because it returns a 400.
|
|
||||||
{
|
|
||||||
success: true, name: "v2 request to v2 server that has downgraded via 400", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy,0.1.0", compression: "snappy", behaviour: ErrStatusBadRequest, attemptString: "0,1,snappy,HTTP StatusBadRequest"},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: nil, attemptString: "0,0,snappy,ok"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
// Now test where the server flip flops between "2.0;snappy" and "0.1.0" only.
|
|
||||||
{
|
|
||||||
success: false, name: "flip flopping", qmRwFormat: Version2, rwFormat: Version2, steps: []contentNegotiationStep{
|
|
||||||
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
|
|
||||||
{lastRWHeader: "0.1.0", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,0,snappy,HTTP StatusNotAcceptable"},
|
|
||||||
{lastRWHeader: "2.0;snappy", compression: "snappy", behaviour: ErrStatusNotAcceptable, attemptString: "0,1,snappy,HTTP StatusNotAcceptable"},
|
|
||||||
// There's no 4th attempt as we do a maximum of 3 sending attempts (not counting retries).
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
queueConfig := config.DefaultQueueConfig
|
|
||||||
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
||||||
queueConfig.MaxShards = 1
|
|
||||||
|
|
||||||
// We need to set URL's so that metric creation doesn't panic.
|
|
||||||
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
||||||
writeConfig.QueueConfig = queueConfig
|
|
||||||
|
|
||||||
conf := &config.Config{
|
|
||||||
GlobalConfig: config.DefaultGlobalConfig,
|
|
||||||
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
||||||
writeConfig,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testcases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
dir := t.TempDir()
|
|
||||||
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
var (
|
|
||||||
series []record.RefSeries
|
|
||||||
metadata []record.RefMetadata
|
|
||||||
samples []record.RefSample
|
|
||||||
)
|
|
||||||
|
|
||||||
// Generates same series in both cases.
|
|
||||||
samples, series = createTimeseries(1, 1)
|
|
||||||
metadata = createSeriesMetadata(series)
|
|
||||||
|
|
||||||
// Apply new config.
|
|
||||||
queueConfig.Capacity = len(samples)
|
|
||||||
queueConfig.MaxSamplesPerSend = len(samples)
|
|
||||||
// For now we only ever have a single rw config in this test.
|
|
||||||
conf.RemoteWriteConfigs[0].ProtocolVersion = tc.qmRwFormat
|
|
||||||
require.NoError(t, s.ApplyConfig(conf))
|
|
||||||
hash, err := toHash(writeConfig)
|
|
||||||
require.NoError(t, err)
|
|
||||||
qm := s.rws.queues[hash]
|
|
||||||
|
|
||||||
c := NewTestWriteClient(tc.rwFormat)
|
|
||||||
c.setSteps(tc.steps) // set expected behaviour.
|
|
||||||
qm.SetClient(c)
|
|
||||||
|
|
||||||
qm.StoreSeries(series, 0)
|
|
||||||
qm.StoreMetadata(metadata)
|
|
||||||
|
|
||||||
// Did we expect some data back?
|
|
||||||
if tc.success {
|
|
||||||
c.expectSamples(samples, series)
|
|
||||||
}
|
|
||||||
qm.Append(samples)
|
|
||||||
|
|
||||||
if !tc.success {
|
|
||||||
// We just need to sleep for a bit to give it time to run.
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
// But we still need to check for data with no delay to avoid race.
|
|
||||||
c.waitForExpectedData(t, 0*time.Second)
|
|
||||||
} else {
|
|
||||||
// We expected data so wait for it.
|
|
||||||
c.waitForExpectedData(t, 5*time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.Equal(t, len(c.sendAttempts), len(tc.steps))
|
|
||||||
for i, s := range c.sendAttempts {
|
|
||||||
require.Equal(t, s, tc.steps[i].attemptString)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSampleDelivery(t *testing.T) {
|
func TestSampleDelivery(t *testing.T) {
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -218,7 +78,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
||||||
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
||||||
|
|
||||||
// TODO(alexg): update some portion of this test to check for the 2.0 metadata
|
// TODO: update some portion of this test to check for the 2.0 metadata
|
||||||
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only", rwFormat: Version2},
|
{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only", rwFormat: Version2},
|
||||||
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms", rwFormat: Version2},
|
{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms", rwFormat: Version2},
|
||||||
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only", rwFormat: Version2},
|
{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only", rwFormat: Version2},
|
||||||
|
@ -280,6 +140,8 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
// Apply new config.
|
// Apply new config.
|
||||||
queueConfig.Capacity = len(samples)
|
queueConfig.Capacity = len(samples)
|
||||||
queueConfig.MaxSamplesPerSend = len(samples) / 2
|
queueConfig.MaxSamplesPerSend = len(samples) / 2
|
||||||
|
// For now we only ever have a single rw config in this test.
|
||||||
|
conf.RemoteWriteConfigs[0].ProtocolVersion = tc.rwFormat
|
||||||
require.NoError(t, s.ApplyConfig(conf))
|
require.NoError(t, s.ApplyConfig(conf))
|
||||||
hash, err := toHash(writeConfig)
|
hash, err := toHash(writeConfig)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -371,7 +233,7 @@ func (c *perRequestWriteClient) expectedData(t testing.TB) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rwFormat config.RemoteWriteFormat, compression string) error {
|
func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) error {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
defer func() { c.i++ }()
|
defer func() { c.i++ }()
|
||||||
|
@ -379,7 +241,7 @@ func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rw
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.TestWriteClient.Store(ctx, req, r, rwFormat, compression); err != nil {
|
if err := c.TestWriteClient.Store(ctx, req, r); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -409,7 +271,7 @@ func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rw
|
||||||
}
|
}
|
||||||
c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries)
|
c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries)
|
||||||
c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...)
|
c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...)
|
||||||
return c.requests[c.i].Store(ctx, req, r, rwFormat, compression)
|
return c.requests[c.i].Store(ctx, req, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testDefaultQueueConfig() config.QueueConfig {
|
func testDefaultQueueConfig() config.QueueConfig {
|
||||||
|
@ -500,7 +362,7 @@ func TestMetadataDelivery(t *testing.T) {
|
||||||
mcfg := config.DefaultMetadataConfig
|
mcfg := config.DefaultMetadataConfig
|
||||||
|
|
||||||
metrics := newQueueManagerMetrics(nil, "", "")
|
metrics := newQueueManagerMetrics(nil, "", "")
|
||||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version1)
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, 0)
|
||||||
m.Start()
|
m.Start()
|
||||||
defer m.Stop()
|
defer m.Stop()
|
||||||
|
|
||||||
|
@ -1108,10 +970,6 @@ type TestWriteClient struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
buf []byte
|
buf []byte
|
||||||
rwFormat config.RemoteWriteFormat
|
rwFormat config.RemoteWriteFormat
|
||||||
sendAttempts []string
|
|
||||||
steps []contentNegotiationStep
|
|
||||||
currstep int
|
|
||||||
retry bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient {
|
func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient {
|
||||||
|
@ -1123,12 +981,6 @@ func NewTestWriteClient(rwFormat config.RemoteWriteFormat) *TestWriteClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) setSteps(steps []contentNegotiationStep) {
|
|
||||||
c.steps = steps
|
|
||||||
c.currstep = -1 // incremented by GetLastRWHeader()
|
|
||||||
c.retry = false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
@ -1235,7 +1087,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Durati
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, rwFormat config.RemoteWriteFormat, compression string) error {
|
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
// nil buffers are ok for snappy, ignore cast error.
|
// nil buffers are ok for snappy, ignore cast error.
|
||||||
|
@ -1248,23 +1100,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
attemptString := fmt.Sprintf("%d,%d,%s", attemptNos, rwFormat, compression)
|
|
||||||
|
|
||||||
if attemptNos > 0 {
|
|
||||||
// If this is a second attempt then we need to bump to the next step otherwise we loop.
|
|
||||||
c.currstep++
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we've been told to return something for this config.
|
|
||||||
if len(c.steps) > 0 {
|
|
||||||
if err = c.steps[c.currstep].behaviour; err != nil {
|
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var reqProto *prompb.WriteRequest
|
var reqProto *prompb.WriteRequest
|
||||||
switch rwFormat {
|
switch c.rwFormat {
|
||||||
case Version1:
|
case Version1:
|
||||||
reqProto = &prompb.WriteRequest{}
|
reqProto = &prompb.WriteRequest{}
|
||||||
err = proto.Unmarshal(reqBuf, reqProto)
|
err = proto.Unmarshal(reqBuf, reqProto)
|
||||||
|
@ -1277,7 +1114,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+","+fmt.Sprintf("%s", err))
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1303,7 +1139,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
|
||||||
}
|
}
|
||||||
|
|
||||||
c.writesReceived++
|
c.writesReceived++
|
||||||
c.sendAttempts = append(c.sendAttempts, attemptString+",ok")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1315,20 +1150,6 @@ func (c *TestWriteClient) Endpoint() string {
|
||||||
return "http://test-remote.com/1234"
|
return "http://test-remote.com/1234"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestWriteClient) GetLastRWHeader() string {
|
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
c.currstep++
|
|
||||||
if len(c.steps) > 0 {
|
|
||||||
return c.steps[c.currstep].lastRWHeader
|
|
||||||
}
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
||||||
// on any calls to Store(), until the request's Context is cancelled, at which
|
// on any calls to Store(), until the request's Context is cancelled, at which
|
||||||
// point the `numCalls` property will contain a count of how many times Store()
|
// point the `numCalls` property will contain a count of how many times Store()
|
||||||
|
@ -1341,7 +1162,7 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
||||||
return &TestBlockingWriteClient{}
|
return &TestBlockingWriteClient{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int, _ config.RemoteWriteFormat, _ string) error {
|
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error {
|
||||||
c.numCalls.Inc()
|
c.numCalls.Inc()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
|
@ -1359,27 +1180,13 @@ func (c *TestBlockingWriteClient) Endpoint() string {
|
||||||
return "http://test-remote-blocking.com/1234"
|
return "http://test-remote-blocking.com/1234"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TestBlockingWriteClient) GetLastRWHeader() string {
|
|
||||||
return "2.0;snappy,0.1.0"
|
|
||||||
}
|
|
||||||
|
|
||||||
// For benchmarking the send and not the receive side.
|
// For benchmarking the send and not the receive side.
|
||||||
type NopWriteClient struct{}
|
type NopWriteClient struct{}
|
||||||
|
|
||||||
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
|
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
|
||||||
func (c *NopWriteClient) Store(context.Context, []byte, int, config.RemoteWriteFormat, string) error {
|
func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil }
|
||||||
return nil
|
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
||||||
}
|
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
||||||
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
|
||||||
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
|
||||||
func (c *NopWriteClient) probeRemoteVersions(_ context.Context) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (c *NopWriteClient) GetLastRWHeader() string { return "2.0;snappy,0.1.0" }
|
|
||||||
|
|
||||||
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
||||||
var extraLabels []labels.Label = []labels.Label{
|
var extraLabels []labels.Label = []labels.Label{
|
||||||
|
@ -1970,14 +1777,14 @@ func BenchmarkBuildWriteRequest(b *testing.B) {
|
||||||
// Warmup buffers
|
// Warmup buffers
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
populateTimeSeries(batch, seriesBuff, true, true)
|
populateTimeSeries(batch, seriesBuff, true, true)
|
||||||
buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
|
buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
totalSize := 0
|
totalSize := 0
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
populateTimeSeries(batch, seriesBuff, true, true)
|
populateTimeSeries(batch, seriesBuff, true, true)
|
||||||
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
|
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -2026,7 +1833,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
|
||||||
// Warmup buffers
|
// Warmup buffers
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
|
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
|
||||||
buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil, "snappy")
|
buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
|
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
|
||||||
|
@ -2034,7 +1841,7 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
|
||||||
for j := 0; j < b.N; j++ {
|
for j := 0; j < b.N; j++ {
|
||||||
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
|
populateV2TimeSeries(&symbolTable, tc.batch, seriesBuff, true, true)
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil, "snappy")
|
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -2052,7 +1859,7 @@ func TestDropOldTimeSeries(t *testing.T) {
|
||||||
nSamples := config.DefaultQueueConfig.Capacity * size
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
||||||
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
|
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
|
||||||
|
|
||||||
// TODO(alexg): test with new version
|
// TODO: test with new version
|
||||||
c := NewTestWriteClient(Version1)
|
c := NewTestWriteClient(Version1)
|
||||||
c.expectSamples(newSamples, series)
|
c.expectSamples(newSamples, series)
|
||||||
|
|
||||||
|
|
|
@ -192,21 +192,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Work out what protocol and compression to use for this endpoint.
|
|
||||||
// Default to Remote Write Version1.
|
|
||||||
rwFormat := Version1
|
|
||||||
switch rwConf.ProtocolVersion {
|
|
||||||
case Version1:
|
|
||||||
// We use the standard value as there's no negotiation to be had.
|
|
||||||
case Version2:
|
|
||||||
rwFormat = Version2
|
|
||||||
// If this newer remote write format is enabled then we need to probe the remote server
|
|
||||||
// to work out the desired protocol version and compressions.
|
|
||||||
// The value of the header is kept in the client so no need to see it here.
|
|
||||||
_ = c.probeRemoteVersions(context.Background())
|
|
||||||
// We ignore any error here, at some point we may choose to log it.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Redacted to remove any passwords in the URL (that are
|
// Redacted to remove any passwords in the URL (that are
|
||||||
// technically accepted but not recommended) since this is
|
// technically accepted but not recommended) since this is
|
||||||
// only used for metric labels.
|
// only used for metric labels.
|
||||||
|
@ -229,7 +214,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||||
rws.scraper,
|
rws.scraper,
|
||||||
rwConf.SendExemplars,
|
rwConf.SendExemplars,
|
||||||
rwConf.SendNativeHistograms,
|
rwConf.SendNativeHistograms,
|
||||||
rwFormat,
|
rwConf.ProtocolVersion,
|
||||||
)
|
)
|
||||||
// Keep track of which queues are new so we know which to start.
|
// Keep track of which queues are new so we know which to start.
|
||||||
newHashes = append(newHashes, hash)
|
newHashes = append(newHashes, hash)
|
||||||
|
|
|
@ -17,16 +17,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
|
||||||
"github.com/golang/snappy"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
@ -39,76 +34,20 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
|
RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
|
||||||
RemoteWriteVersion1HeaderValue = "0.1.0"
|
RemoteWriteVersion1HeaderValue = "0.1.0"
|
||||||
RemoteWriteVersion20HeaderValue = "2.0"
|
RemoteWriteVersion2HeaderValue = "2.0"
|
||||||
)
|
)
|
||||||
|
|
||||||
func rwHeaderNameValues(rwFormat config.RemoteWriteFormat) map[string]string {
|
|
||||||
// Return the correct remote write header name/values based on provided rwFormat.
|
|
||||||
ret := make(map[string]string, 1)
|
|
||||||
|
|
||||||
switch rwFormat {
|
|
||||||
case Version1:
|
|
||||||
ret[RemoteWriteVersionHeader] = RemoteWriteVersion1HeaderValue
|
|
||||||
case Version2:
|
|
||||||
// We need to add the supported protocol definitions in order:
|
|
||||||
tuples := make([]string, 0, 2)
|
|
||||||
// Add "2.0;snappy".
|
|
||||||
tuples = append(tuples, RemoteWriteVersion20HeaderValue+";snappy")
|
|
||||||
// Add default "0.1.0".
|
|
||||||
tuples = append(tuples, RemoteWriteVersion1HeaderValue)
|
|
||||||
ret[RemoteWriteVersionHeader] = strings.Join(tuples, ",")
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
type writeHeadHandler struct {
|
|
||||||
logger log.Logger
|
|
||||||
|
|
||||||
remoteWrite20HeadRequests prometheus.Counter
|
|
||||||
|
|
||||||
// Experimental feature, new remote write proto format.
|
|
||||||
// The handler will accept the new format, but it can still accept the old one.
|
|
||||||
rwFormat config.RemoteWriteFormat
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWriteHeadHandler(logger log.Logger, reg prometheus.Registerer, rwFormat config.RemoteWriteFormat) http.Handler {
|
|
||||||
h := &writeHeadHandler{
|
|
||||||
logger: logger,
|
|
||||||
rwFormat: rwFormat,
|
|
||||||
remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Namespace: "prometheus",
|
|
||||||
Subsystem: "api",
|
|
||||||
Name: "remote_write_20_head_requests",
|
|
||||||
Help: "The number of remote write 2.0 head requests.",
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
if reg != nil {
|
|
||||||
reg.MustRegister(h.remoteWrite20HeadRequests)
|
|
||||||
}
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// Send a response to the HEAD request based on the format supported.
|
|
||||||
|
|
||||||
// Add appropriate header values for the specific rwFormat.
|
|
||||||
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
|
|
||||||
w.Header().Set(hName, hValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
}
|
|
||||||
|
|
||||||
type writeHandler struct {
|
type writeHandler struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendable storage.Appendable
|
appendable storage.Appendable
|
||||||
|
|
||||||
samplesWithInvalidLabelsTotal prometheus.Counter
|
samplesWithInvalidLabelsTotal prometheus.Counter
|
||||||
|
|
||||||
// Experimental feature, new remote write proto format.
|
// Experimental feature, new remote write proto format
|
||||||
// The handler will accept the new format, but it can still accept the old one.
|
// The handler will accept the new format, but it can still accept the old one
|
||||||
|
// TODO: this should eventually be via content negotiation?
|
||||||
rwFormat config.RemoteWriteFormat
|
rwFormat config.RemoteWriteFormat
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,81 +73,29 @@ func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable st
|
||||||
|
|
||||||
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
var err error
|
var err error
|
||||||
|
var req *prompb.WriteRequest
|
||||||
|
var reqMinStr *writev2.WriteRequest
|
||||||
|
|
||||||
// Set the header(s) in the response based on the rwFormat the server supports.
|
// TODO: this should eventually be done via content negotiation/looking at the header
|
||||||
for hName, hValue := range rwHeaderNameValues(h.rwFormat) {
|
switch h.rwFormat {
|
||||||
w.Header().Set(hName, hValue)
|
case Version1:
|
||||||
|
req, err = DecodeWriteRequest(r.Body)
|
||||||
|
case Version2:
|
||||||
|
reqMinStr, err = DecodeMinimizedWriteRequestStr(r.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the headers to work out how to handle this.
|
|
||||||
contentEncoding := r.Header.Get("Content-Encoding")
|
|
||||||
protoVer := r.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
|
|
||||||
switch protoVer {
|
|
||||||
case "":
|
|
||||||
// No header provided, assume 0.1.0 as everything that relies on later.
|
|
||||||
protoVer = RemoteWriteVersion1HeaderValue
|
|
||||||
case RemoteWriteVersion1HeaderValue, RemoteWriteVersion20HeaderValue:
|
|
||||||
// We know this header, woo.
|
|
||||||
default:
|
|
||||||
// We have a version in the header but it is not one we recognise.
|
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unknown remote write version in headers", "ver", protoVer)
|
|
||||||
// Return a 406 so that the client can choose a more appropriate protocol to use.
|
|
||||||
http.Error(w, "Unknown remote write version in headers", http.StatusNotAcceptable)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deal with 0.1.0 clients that forget to send Content-Encoding.
|
|
||||||
if protoVer == RemoteWriteVersion1HeaderValue && contentEncoding == "" {
|
|
||||||
contentEncoding = "snappy"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the request body.
|
|
||||||
body, err := io.ReadAll(r.Body)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deal with contentEncoding first.
|
// TODO: this should eventually be done detecting the format version above
|
||||||
var decompressed []byte
|
switch h.rwFormat {
|
||||||
|
case Version1:
|
||||||
switch contentEncoding {
|
err = h.write(r.Context(), req)
|
||||||
case "snappy":
|
case Version2:
|
||||||
decompressed, err = snappy.Decode(nil, body)
|
err = h.writeMinStr(r.Context(), reqMinStr)
|
||||||
if err != nil {
|
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", "Unsupported Content-Encoding", "contentEncoding", contentEncoding)
|
|
||||||
// Return a 406 so that the client can choose a more appropriate protocol to use.
|
|
||||||
http.Error(w, "Unsupported Content-Encoding", http.StatusNotAcceptable)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now we have a decompressed buffer we can unmarshal it.
|
|
||||||
// At this point we are happy with the version but need to check the encoding.
|
|
||||||
switch protoVer {
|
|
||||||
case RemoteWriteVersion1HeaderValue:
|
|
||||||
var req prompb.WriteRequest
|
|
||||||
if err := proto.Unmarshal(decompressed, &req); err != nil {
|
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = h.write(r.Context(), &req)
|
|
||||||
case RemoteWriteVersion20HeaderValue:
|
|
||||||
// 2.0 request.
|
|
||||||
var reqMinStr writev2.WriteRequest
|
|
||||||
if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil {
|
|
||||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = h.writeMinStr(r.Context(), &reqMinStr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
|
|
@ -39,94 +39,15 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRemoteWriteHeadHandler(t *testing.T) {
|
|
||||||
handler := NewWriteHeadHandler(log.NewNopLogger(), nil, Version2)
|
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodHead, "", nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
handler.ServeHTTP(recorder, req)
|
|
||||||
|
|
||||||
resp := recorder.Result()
|
|
||||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
||||||
|
|
||||||
// Check header is expected value.
|
|
||||||
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
require.Equal(t, "2.0;snappy,0.1.0", protHeader)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteWriteHandlerMinimizedMissingContentEncoding(t *testing.T) {
|
|
||||||
// Send a v2 request without a "Content-Encoding:" header -> 406.
|
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
|
||||||
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
|
||||||
// Do not provide "Content-Encoding: snappy" header.
|
|
||||||
// req.Header.Set("Content-Encoding", "snappy")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
handler.ServeHTTP(recorder, req)
|
|
||||||
|
|
||||||
resp := recorder.Result()
|
|
||||||
// Should give us a 406.
|
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteWriteHandlerInvalidCompression(t *testing.T) {
|
|
||||||
// Send a v2 request without an unhandled compression scheme -> 406.
|
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
|
||||||
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
|
||||||
req.Header.Set("Content-Encoding", "zstd")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
handler.ServeHTTP(recorder, req)
|
|
||||||
|
|
||||||
resp := recorder.Result()
|
|
||||||
// Expect a 406.
|
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteWriteHandlerInvalidVersion(t *testing.T) {
|
|
||||||
// Send a protocol version number that isn't recognised/supported -> 406.
|
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
|
||||||
req.Header.Set(RemoteWriteVersionHeader, "3.0")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
|
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
|
||||||
handler.ServeHTTP(recorder, req)
|
|
||||||
|
|
||||||
resp := recorder.Result()
|
|
||||||
// Expect a 406.
|
|
||||||
require.Equal(t, http.StatusNotAcceptable, resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteWriteHandler(t *testing.T) {
|
func TestRemoteWriteHandler(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
appendable := &mockAppendable{}
|
||||||
|
// TODO: test with other proto format(s)
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version1)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
|
@ -135,10 +56,6 @@ func TestRemoteWriteHandler(t *testing.T) {
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
// Check header is expected value.
|
|
||||||
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
require.Equal(t, "0.1.0", protHeader)
|
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
j := 0
|
j := 0
|
||||||
k := 0
|
k := 0
|
||||||
|
@ -170,17 +87,16 @@ func TestRemoteWriteHandler(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
|
func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
|
||||||
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil, "snappy")
|
buf, _, _, err := buildV2WriteRequest(log.NewNopLogger(), writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)
|
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion2HeaderValue)
|
||||||
// Must provide "Content-Encoding: snappy" header.
|
|
||||||
req.Header.Set("Content-Encoding", "snappy")
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
appendable := &mockAppendable{}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Version2)
|
// TODO: test with other proto format(s)
|
||||||
|
handler := NewWriteHandler(nil, nil, appendable, Version2)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -188,10 +104,6 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
|
||||||
resp := recorder.Result()
|
resp := recorder.Result()
|
||||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
// Check header is expected value.
|
|
||||||
protHeader := resp.Header.Get(RemoteWriteVersionHeader)
|
|
||||||
require.Equal(t, "2.0;snappy,0.1.0", protHeader)
|
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
j := 0
|
j := 0
|
||||||
k := 0
|
k := 0
|
||||||
|
@ -230,7 +142,7 @@ func TestOutOfOrderSample(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
||||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
@ -256,7 +168,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
||||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
@ -280,7 +192,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
|
||||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
|
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
@ -310,7 +222,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
|
||||||
{Name: "test_label_name_" + num, Value: labelValue + num},
|
{Name: "test_label_name_" + num, Value: labelValue + num},
|
||||||
},
|
},
|
||||||
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
||||||
}}, nil, nil, nil, nil, "snappy")
|
}}, nil, nil, nil, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
@ -329,7 +241,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCommitErr(t *testing.T) {
|
func TestCommitErr(t *testing.T) {
|
||||||
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy")
|
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
@ -368,7 +280,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
|
||||||
// TODO: test with other proto format(s)
|
// TODO: test with other proto format(s)
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Version1)
|
||||||
|
|
||||||
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
|
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|
||||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
@ -381,7 +293,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
|
||||||
|
|
||||||
var bufRequests [][]byte
|
var bufRequests [][]byte
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
|
buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
bufRequests = append(bufRequests, buf)
|
bufRequests = append(bufRequests, buf)
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,10 +217,9 @@ type API struct {
|
||||||
isAgent bool
|
isAgent bool
|
||||||
statsRenderer StatsRenderer
|
statsRenderer StatsRenderer
|
||||||
|
|
||||||
remoteWriteHeadHandler http.Handler
|
remoteWriteHandler http.Handler
|
||||||
remoteWriteHandler http.Handler
|
remoteReadHandler http.Handler
|
||||||
remoteReadHandler http.Handler
|
otlpWriteHandler http.Handler
|
||||||
otlpWriteHandler http.Handler
|
|
||||||
|
|
||||||
codecs []Codec
|
codecs []Codec
|
||||||
}
|
}
|
||||||
|
@ -297,20 +296,7 @@ func NewAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
if rwEnabled {
|
if rwEnabled {
|
||||||
// TODO(alexg) - Two phase rwFormat rollout needs to create handlers with flag for advertising.
|
|
||||||
// For rollout we do two phases:
|
|
||||||
// 0. (Before) no flags set
|
|
||||||
// 1. (During) support new protocols but don't advertise
|
|
||||||
// <wait until all servers have rolled out and now support RW2.0>
|
|
||||||
// 2. (After) support new protocols and advertise
|
|
||||||
//
|
|
||||||
// For rollback the two phases are:
|
|
||||||
// 0. (Before) support new protocols and advertise
|
|
||||||
// 1. (During) support new protocols but don't advertise
|
|
||||||
// <wait a suitable period for all sending clients to be aware that receiving servers no longer support 2.0>
|
|
||||||
// 2. (After) no flags set
|
|
||||||
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
|
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat)
|
||||||
a.remoteWriteHeadHandler = remote.NewWriteHeadHandler(logger, registerer, rwFormat)
|
|
||||||
}
|
}
|
||||||
if otlpEnabled {
|
if otlpEnabled {
|
||||||
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
|
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
|
||||||
|
@ -407,7 +393,6 @@ func (api *API) Register(r *route.Router) {
|
||||||
r.Get("/status/walreplay", api.serveWALReplayStatus)
|
r.Get("/status/walreplay", api.serveWALReplayStatus)
|
||||||
r.Post("/read", api.ready(api.remoteRead))
|
r.Post("/read", api.ready(api.remoteRead))
|
||||||
r.Post("/write", api.ready(api.remoteWrite))
|
r.Post("/write", api.ready(api.remoteWrite))
|
||||||
r.Head("/write", api.remoteWriteHead)
|
|
||||||
r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
|
r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
|
||||||
|
|
||||||
r.Get("/alerts", wrapAgent(api.alerts))
|
r.Get("/alerts", wrapAgent(api.alerts))
|
||||||
|
@ -1631,14 +1616,6 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) remoteWriteHead(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if api.remoteWriteHeadHandler != nil {
|
|
||||||
api.remoteWriteHeadHandler.ServeHTTP(w, r)
|
|
||||||
} else {
|
|
||||||
http.Error(w, "remote write receiver needs to be enabled with --web.enable-remote-write-receiver", http.StatusNotFound)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
||||||
if api.remoteWriteHandler != nil {
|
if api.remoteWriteHandler != nil {
|
||||||
api.remoteWriteHandler.ServeHTTP(w, r)
|
api.remoteWriteHandler.ServeHTTP(w, r)
|
||||||
|
|
|
@ -337,48 +337,6 @@ var sampleFlagMap = map[string]string{
|
||||||
"flag2": "value2",
|
"flag2": "value2",
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadEndpoint(t *testing.T) {
|
|
||||||
for _, tc := range []struct {
|
|
||||||
name string
|
|
||||||
rwFormat config.RemoteWriteFormat
|
|
||||||
expectedStatusCode int
|
|
||||||
expectedHeaderValue string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "HEAD Version 1",
|
|
||||||
rwFormat: remote.Version1,
|
|
||||||
expectedStatusCode: http.StatusOK,
|
|
||||||
expectedHeaderValue: "0.1.0",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "HEAD Version 2",
|
|
||||||
rwFormat: remote.Version2,
|
|
||||||
expectedStatusCode: http.StatusOK,
|
|
||||||
expectedHeaderValue: "2.0;snappy,0.1.0",
|
|
||||||
},
|
|
||||||
} {
|
|
||||||
r := route.New()
|
|
||||||
api := &API{
|
|
||||||
remoteWriteHeadHandler: remote.NewWriteHeadHandler(log.NewNopLogger(), nil, tc.rwFormat),
|
|
||||||
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
|
|
||||||
}
|
|
||||||
api.Register(r)
|
|
||||||
|
|
||||||
s := httptest.NewServer(r)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
req, err := http.NewRequest("HEAD", s.URL+"/write", nil)
|
|
||||||
require.NoError(t, err, "Error creating HEAD request")
|
|
||||||
client := &http.Client{}
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
require.NoError(t, err, "Error executing HEAD request")
|
|
||||||
require.Equal(t, tc.expectedStatusCode, resp.StatusCode)
|
|
||||||
|
|
||||||
promHeader := resp.Header.Get(remote.RemoteWriteVersionHeader)
|
|
||||||
require.Equal(t, tc.expectedHeaderValue, promHeader)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEndpoints(t *testing.T) {
|
func TestEndpoints(t *testing.T) {
|
||||||
storage := promql.LoadedStorage(t, `
|
storage := promql.LoadedStorage(t, `
|
||||||
load 1m
|
load 1m
|
||||||
|
|
Loading…
Reference in a new issue