Add streaming remote read to ReadClient (#11379)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

* Add streaming remote read to ReadClient

Signed-off-by: Justin Lei <justin.lei@grafana.com>

* Apply suggestions from code review

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Justin Lei <justin.lei@grafana.com>

* Remote read instrumentation tweaks

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Minor cleanups

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* In-line handleChunkedResponse

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Fix lints

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Explicitly call cancel() when needed

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Update chunkedSeries, chunkedSeriesIterator for new interfaces

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Adapt remote.chunkedSeries to use prompb.ChunkedSeries

Signed-off-by: Justin Lei <lei.justin@gmail.com>

* Fix lint

Signed-off-by: Justin Lei <lei.justin@gmail.com>

---------

Signed-off-by: Justin Lei <justin.lei@grafana.com>
Signed-off-by: Justin Lei <lei.justin@gmail.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Justin Lei 2024-08-27 23:23:54 -07:00 committed by GitHub
parent 7757794bb3
commit 3a82cd5a7e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 819 additions and 61 deletions

View file

@ -221,6 +221,7 @@ var (
// DefaultRemoteReadConfig is the default remote read configuration. // DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{ DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute), RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
HTTPClientConfig: config.DefaultHTTPClientConfig, HTTPClientConfig: config.DefaultHTTPClientConfig,
FilterExternalLabels: true, FilterExternalLabels: true,
} }
@ -1279,10 +1280,17 @@ type MetadataConfig struct {
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
} }
const (
// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.
// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.
DefaultChunkedReadLimit = 5e+7
)
// RemoteReadConfig is the configuration for reading from remote storage. // RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct { type RemoteReadConfig struct {
URL *config.URL `yaml:"url"` URL *config.URL `yaml:"url"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
ChunkedReadLimit uint64 `yaml:"chunked_read_limit,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"` Headers map[string]string `yaml:"headers,omitempty"`
ReadRecent bool `yaml:"read_recent,omitempty"` ReadRecent bool `yaml:"read_recent,omitempty"`
Name string `yaml:"name,omitempty"` Name string `yaml:"name,omitempty"`

View file

@ -167,6 +167,7 @@ var expectedConf = &Config{
{ {
URL: mustParseURL("http://remote1/read"), URL: mustParseURL("http://remote1/read"),
RemoteTimeout: model.Duration(1 * time.Minute), RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
ReadRecent: true, ReadRecent: true,
Name: "default", Name: "default",
HTTPClientConfig: config.HTTPClientConfig{ HTTPClientConfig: config.HTTPClientConfig{
@ -178,6 +179,7 @@ var expectedConf = &Config{
{ {
URL: mustParseURL("http://remote3/read"), URL: mustParseURL("http://remote3/read"),
RemoteTimeout: model.Duration(1 * time.Minute), RemoteTimeout: model.Duration(1 * time.Minute),
ChunkedReadLimit: DefaultChunkedReadLimit,
ReadRecent: false, ReadRecent: false,
Name: "read_special", Name: "read_special",
RequiredMatchers: model.LabelSet{"job": "special"}, RequiredMatchers: model.LabelSet{"job": "special"},

View file

@ -26,10 +26,6 @@ import (
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
) )
// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows.
// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset.
const DefaultChunkedReadLimit = 5e+7
// The table gets initialized with sync.Once but may still cause a race // The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it // with any other use of the crc32 package anywhere. Thus we initialize it
// before. // before.

View file

@ -16,6 +16,7 @@ package remote
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -36,13 +37,14 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote/azuread" "github.com/prometheus/prometheus/storage/remote/azuread"
"github.com/prometheus/prometheus/storage/remote/googleiam" "github.com/prometheus/prometheus/storage/remote/googleiam"
) )
const maxErrMsgLen = 1024
const ( const (
maxErrMsgLen = 1024
RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version"
RemoteWriteVersion1HeaderValue = "0.1.0" RemoteWriteVersion1HeaderValue = "0.1.0"
RemoteWriteVersion20HeaderValue = "2.0.0" RemoteWriteVersion20HeaderValue = "2.0.0"
@ -68,9 +70,12 @@ var (
config.RemoteWriteProtoMsgV1: appProtoContentType, // Also application/x-protobuf;proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec. config.RemoteWriteProtoMsgV1: appProtoContentType, // Also application/x-protobuf;proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec.
config.RemoteWriteProtoMsgV2: appProtoContentType + ";proto=io.prometheus.write.v2.Request", config.RemoteWriteProtoMsgV2: appProtoContentType + ";proto=io.prometheus.write.v2.Request",
} }
)
var ( AcceptedResponseTypes = []prompb.ReadRequest_ResponseType{
prompb.ReadRequest_STREAMED_XOR_CHUNKS,
prompb.ReadRequest_SAMPLES,
}
remoteReadQueriesTotal = prometheus.NewCounterVec( remoteReadQueriesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
@ -78,7 +83,7 @@ var (
Name: "read_queries_total", Name: "read_queries_total",
Help: "The total number of remote read queries.", Help: "The total number of remote read queries.",
}, },
[]string{remoteName, endpoint, "code"}, []string{remoteName, endpoint, "response_type", "code"},
) )
remoteReadQueries = prometheus.NewGaugeVec( remoteReadQueries = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
@ -94,13 +99,13 @@ var (
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "read_request_duration_seconds", Name: "read_request_duration_seconds",
Help: "Histogram of the latency for remote read requests.", Help: "Histogram of the latency for remote read requests. Note that for streamed responses this is only the duration of the initial call and does not include the processing of the stream.",
Buckets: append(prometheus.DefBuckets, 25, 60), Buckets: append(prometheus.DefBuckets, 25, 60),
NativeHistogramBucketFactor: 1.1, NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100, NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour, NativeHistogramMinResetDuration: 1 * time.Hour,
}, },
[]string{remoteName, endpoint}, []string{remoteName, endpoint, "response_type"},
) )
) )
@ -116,10 +121,11 @@ type Client struct {
timeout time.Duration timeout time.Duration
retryOnRateLimit bool retryOnRateLimit bool
chunkedReadLimit uint64
readQueries prometheus.Gauge readQueries prometheus.Gauge
readQueriesTotal *prometheus.CounterVec readQueriesTotal *prometheus.CounterVec
readQueriesDuration prometheus.Observer readQueriesDuration prometheus.ObserverVec
writeProtoMsg config.RemoteWriteProtoMsg writeProtoMsg config.RemoteWriteProtoMsg
writeCompression Compression // Not exposed by ClientConfig for now. writeCompression Compression // Not exposed by ClientConfig for now.
@ -136,12 +142,13 @@ type ClientConfig struct {
Headers map[string]string Headers map[string]string
RetryOnRateLimit bool RetryOnRateLimit bool
WriteProtoMsg config.RemoteWriteProtoMsg WriteProtoMsg config.RemoteWriteProtoMsg
ChunkedReadLimit uint64
} }
// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. // ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can
// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). // also fall back to the SAMPLES method if necessary.
type ReadClient interface { type ReadClient interface {
Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
} }
// NewReadClient creates a new client for remote read. // NewReadClient creates a new client for remote read.
@ -162,9 +169,10 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
urlString: conf.URL.String(), urlString: conf.URL.String(),
Client: httpClient, Client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
chunkedReadLimit: conf.ChunkedReadLimit,
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
readQueriesDuration: remoteReadQueryDuration.WithLabelValues(name, conf.URL.String()), readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
}, nil }, nil
} }
@ -278,8 +286,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo
return WriteResponseStats{}, RecoverableError{err, defaultBackoff} return WriteResponseStats{}, RecoverableError{err, defaultBackoff}
} }
defer func() { defer func() {
io.Copy(io.Discard, httpResp.Body) _, _ = io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close() _ = httpResp.Body.Close()
}() }()
// TODO(bwplotka): Pass logger and emit debug on error? // TODO(bwplotka): Pass logger and emit debug on error?
@ -329,17 +337,17 @@ func (c *Client) Endpoint() string {
return c.urlString return c.urlString
} }
// Read reads from a remote endpoint. // Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a sampled response;
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { // chunked responses arrive already sorted by the server.
func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
c.readQueries.Inc() c.readQueries.Inc()
defer c.readQueries.Dec() defer c.readQueries.Dec()
req := &prompb.ReadRequest{ req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request, // TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it. // as the protobuf interface allows for it.
Queries: []*prompb.Query{ Queries: []*prompb.Query{query},
query, AcceptedResponseTypes: AcceptedResponseTypes,
},
} }
data, err := proto.Marshal(req) data, err := proto.Marshal(req)
if err != nil { if err != nil {
@ -358,7 +366,6 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
ctx, cancel := context.WithTimeout(ctx, c.timeout) ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient)) ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient))
defer span.End() defer span.End()
@ -366,24 +373,58 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
start := time.Now() start := time.Now()
httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) httpResp, err := c.Client.Do(httpReq.WithContext(ctx))
if err != nil { if err != nil {
cancel()
return nil, fmt.Errorf("error sending request: %w", err) return nil, fmt.Errorf("error sending request: %w", err)
} }
defer func() {
io.Copy(io.Discard, httpResp.Body)
httpResp.Body.Close()
}()
c.readQueriesDuration.Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc()
compressed, err = io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
}
if httpResp.StatusCode/100 != 2 { if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.urlString, httpResp.Status, strings.TrimSpace(string(compressed))) // Make an attempt at getting an error message.
body, _ := io.ReadAll(httpResp.Body)
_ = httpResp.Body.Close()
cancel()
return nil, fmt.Errorf("remote server %s returned http status %s: %s", c.urlString, httpResp.Status, string(body))
} }
contentType := httpResp.Header.Get("Content-Type")
switch {
case strings.HasPrefix(contentType, "application/x-protobuf"):
c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc()
ss, err := c.handleSampledResponse(req, httpResp, sortSeries)
cancel()
return ss, err
case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"):
c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds())
s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil)
return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) {
code := strconv.Itoa(httpResp.StatusCode)
if !errors.Is(err, io.EOF) {
code = "aborted_stream"
}
c.readQueriesTotal.WithLabelValues("chunked", code).Inc()
cancel()
}), nil
default:
c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds())
c.readQueriesTotal.WithLabelValues("unsupported", strconv.Itoa(httpResp.StatusCode)).Inc()
cancel()
return nil, fmt.Errorf("unsupported content type: %s", contentType)
}
}
func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) {
compressed, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err)
}
defer func() {
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()
uncompressed, err := snappy.Decode(nil, compressed) uncompressed, err := snappy.Decode(nil, compressed)
if err != nil { if err != nil {
return nil, fmt.Errorf("error reading response: %w", err) return nil, fmt.Errorf("error reading response: %w", err)
@ -399,5 +440,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
} }
return resp.Results[0], nil // This client does not batch queries so there's always only 1 result.
res := resp.Results[0]
return FromQueryResult(sortSeries, res), nil
} }

View file

@ -23,9 +23,15 @@ import (
"testing" "testing"
"time" "time"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
) )
var longErrMessage = strings.Repeat("error message", maxErrMsgLen) var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
@ -208,3 +214,226 @@ func TestClientCustomHeaders(t *testing.T) {
require.True(t, called, "The remote server wasn't called") require.True(t, called, "The remote server wasn't called")
} }
func TestReadClient(t *testing.T) {
tests := []struct {
name string
query *prompb.Query
httpHandler http.HandlerFunc
expectedLabels []map[string]string
expectedSamples [][]model.SamplePair
expectedErrorContains string
sortSeries bool
}{
{
name: "sorted sampled response",
httpHandler: sampledResponseHTTPHandler(t),
expectedLabels: []map[string]string{
{"foo1": "bar"},
{"foo2": "bar"},
},
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(0), Value: model.SampleValue(3)},
{Timestamp: model.Time(5), Value: model.SampleValue(4)},
},
{
{Timestamp: model.Time(0), Value: model.SampleValue(1)},
{Timestamp: model.Time(5), Value: model.SampleValue(2)},
},
},
expectedErrorContains: "",
sortSeries: true,
},
{
name: "unsorted sampled response",
httpHandler: sampledResponseHTTPHandler(t),
expectedLabels: []map[string]string{
{"foo2": "bar"},
{"foo1": "bar"},
},
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(0), Value: model.SampleValue(1)},
{Timestamp: model.Time(5), Value: model.SampleValue(2)},
},
{
{Timestamp: model.Time(0), Value: model.SampleValue(3)},
{Timestamp: model.Time(5), Value: model.SampleValue(4)},
},
},
expectedErrorContains: "",
sortSeries: false,
},
{
name: "chunked response",
query: &prompb.Query{
StartTimestampMs: 4000,
EndTimestampMs: 12000,
},
httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
flusher, ok := w.(http.Flusher)
require.True(t, ok)
cw := NewChunkedWriter(w, flusher)
l := []prompb.Label{
{Name: "foo", Value: "bar"},
}
chunks := buildTestChunks(t)
for i, c := range chunks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
_, err = cw.Write(b)
require.NoError(t, err)
}
}),
expectedLabels: []map[string]string{
{"foo": "bar"},
{"foo": "bar"},
{"foo": "bar"},
},
// This is the output of buildTestChunks minus the samples outside the query range.
expectedSamples: [][]model.SamplePair{
{
{Timestamp: model.Time(4000), Value: model.SampleValue(4)},
},
{
{Timestamp: model.Time(5000), Value: model.SampleValue(1)},
{Timestamp: model.Time(6000), Value: model.SampleValue(2)},
{Timestamp: model.Time(7000), Value: model.SampleValue(3)},
{Timestamp: model.Time(8000), Value: model.SampleValue(4)},
{Timestamp: model.Time(9000), Value: model.SampleValue(5)},
},
{
{Timestamp: model.Time(10000), Value: model.SampleValue(2)},
{Timestamp: model.Time(11000), Value: model.SampleValue(3)},
{Timestamp: model.Time(12000), Value: model.SampleValue(4)},
},
},
expectedErrorContains: "",
},
{
name: "unsupported content type",
httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "foobar")
}),
expectedErrorContains: "unsupported content type",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(test.httpHandler)
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
conf := &ClientConfig{
URL: &config_util.URL{URL: u},
Timeout: model.Duration(5 * time.Second),
ChunkedReadLimit: config.DefaultChunkedReadLimit,
}
c, err := NewReadClient("test", conf)
require.NoError(t, err)
query := &prompb.Query{}
if test.query != nil {
query = test.query
}
ss, err := c.Read(context.Background(), query, test.sortSeries)
if test.expectedErrorContains != "" {
require.ErrorContains(t, err, test.expectedErrorContains)
return
}
require.NoError(t, err)
i := 0
for ss.Next() {
require.NoError(t, ss.Err())
s := ss.At()
l := s.Labels()
require.Len(t, test.expectedLabels[i], l.Len())
for k, v := range test.expectedLabels[i] {
require.True(t, l.Has(k))
require.Equal(t, v, l.Get(k))
}
it := s.Iterator(nil)
j := 0
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
require.NoError(t, it.Err())
ts, v := it.At()
expectedSample := test.expectedSamples[i][j]
require.Equal(t, int64(expectedSample.Timestamp), ts)
require.Equal(t, float64(expectedSample.Value), v)
j++
}
require.Len(t, test.expectedSamples[i], j)
i++
}
require.NoError(t, ss.Err())
})
}
}
func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/x-protobuf")
resp := prompb.ReadResponse{
Results: []*prompb.QueryResult{
{
Timeseries: []*prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "foo2", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: float64(1), Timestamp: int64(0)},
{Value: float64(2), Timestamp: int64(5)},
},
Exemplars: []prompb.Exemplar{},
},
{
Labels: []prompb.Label{
{Name: "foo1", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: float64(3), Timestamp: int64(0)},
{Value: float64(4), Timestamp: int64(5)},
},
Exemplars: []prompb.Exemplar{},
},
},
},
},
}
b, err := proto.Marshal(&resp)
require.NoError(t, err)
_, err = w.Write(snappy.Encode(nil, b))
require.NoError(t, err)
}
}

View file

@ -540,6 +540,220 @@ func (c *concreteSeriesIterator) Err() error {
return nil return nil
} }
// chunkedSeriesSet implements storage.SeriesSet.
type chunkedSeriesSet struct {
chunkedReader *ChunkedReader
respBody io.ReadCloser
mint, maxt int64
cancel func(error)
current storage.Series
err error
}
func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet {
return &chunkedSeriesSet{
chunkedReader: chunkedReader,
respBody: respBody,
mint: mint,
maxt: maxt,
cancel: cancel,
}
}
// Next return true if there is a next series and false otherwise. It will
// block until the next series is available.
func (s *chunkedSeriesSet) Next() bool {
res := &prompb.ChunkedReadResponse{}
err := s.chunkedReader.NextProto(res)
if err != nil {
if !errors.Is(err, io.EOF) {
s.err = err
_, _ = io.Copy(io.Discard, s.respBody)
}
_ = s.respBody.Close()
s.cancel(err)
return false
}
s.current = &chunkedSeries{
ChunkedSeries: prompb.ChunkedSeries{
Labels: res.ChunkedSeries[0].Labels,
Chunks: res.ChunkedSeries[0].Chunks,
},
mint: s.mint,
maxt: s.maxt,
}
return true
}
func (s *chunkedSeriesSet) At() storage.Series {
return s.current
}
func (s *chunkedSeriesSet) Err() error {
return s.err
}
func (s *chunkedSeriesSet) Warnings() annotations.Annotations {
return nil
}
type chunkedSeries struct {
prompb.ChunkedSeries
mint, maxt int64
}
var _ storage.Series = &chunkedSeries{}
func (s *chunkedSeries) Labels() labels.Labels {
b := labels.NewScratchBuilder(0)
return s.ToLabels(&b, nil)
}
func (s *chunkedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
csIt, ok := it.(*chunkedSeriesIterator)
if ok {
csIt.reset(s.Chunks, s.mint, s.maxt)
return csIt
}
return newChunkedSeriesIterator(s.Chunks, s.mint, s.maxt)
}
type chunkedSeriesIterator struct {
chunks []prompb.Chunk
idx int
cur chunkenc.Iterator
valType chunkenc.ValueType
mint, maxt int64
err error
}
var _ chunkenc.Iterator = &chunkedSeriesIterator{}
func newChunkedSeriesIterator(chunks []prompb.Chunk, mint, maxt int64) *chunkedSeriesIterator {
it := &chunkedSeriesIterator{}
it.reset(chunks, mint, maxt)
return it
}
func (it *chunkedSeriesIterator) Next() chunkenc.ValueType {
if it.err != nil {
return chunkenc.ValNone
}
if len(it.chunks) == 0 {
return chunkenc.ValNone
}
for it.valType = it.cur.Next(); it.valType != chunkenc.ValNone; it.valType = it.cur.Next() {
atT := it.AtT()
if atT > it.maxt {
it.chunks = nil // Exhaust this iterator so follow-up calls to Next or Seek return fast.
return chunkenc.ValNone
}
if atT >= it.mint {
return it.valType
}
}
if it.idx >= len(it.chunks)-1 {
it.valType = chunkenc.ValNone
} else {
it.idx++
it.resetIterator()
it.valType = it.Next()
}
return it.valType
}
func (it *chunkedSeriesIterator) Seek(t int64) chunkenc.ValueType {
if it.err != nil {
return chunkenc.ValNone
}
if len(it.chunks) == 0 {
return chunkenc.ValNone
}
startIdx := it.idx
it.idx += sort.Search(len(it.chunks)-startIdx, func(i int) bool {
return it.chunks[startIdx+i].MaxTimeMs >= t
})
if it.idx > startIdx {
it.resetIterator()
} else {
ts := it.cur.AtT()
if ts >= t {
return it.valType
}
}
for it.valType = it.cur.Next(); it.valType != chunkenc.ValNone; it.valType = it.cur.Next() {
ts := it.cur.AtT()
if ts > it.maxt {
it.chunks = nil // Exhaust this iterator so follow-up calls to Next or Seek return fast.
return chunkenc.ValNone
}
if ts >= t && ts >= it.mint {
return it.valType
}
}
it.valType = chunkenc.ValNone
return it.valType
}
func (it *chunkedSeriesIterator) resetIterator() {
if it.idx < len(it.chunks) {
chunk := it.chunks[it.idx]
decodedChunk, err := chunkenc.FromData(chunkenc.Encoding(chunk.Type), chunk.Data)
if err != nil {
it.err = err
return
}
it.cur = decodedChunk.Iterator(nil)
} else {
it.cur = chunkenc.NewNopIterator()
}
}
func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64) {
it.chunks = chunks
it.mint = mint
it.maxt = maxt
it.idx = 0
if len(chunks) > 0 {
it.resetIterator()
}
}
func (it *chunkedSeriesIterator) At() (ts int64, v float64) {
return it.cur.At()
}
func (it *chunkedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
return it.cur.AtHistogram(h)
}
func (it *chunkedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return it.cur.AtFloatHistogram(fh)
}
func (it *chunkedSeriesIterator) AtT() int64 {
return it.cur.AtT()
}
func (it *chunkedSeriesIterator) Err() error {
return it.err
}
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read, // validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
// also making sure that there are no labels with duplicate names. // also making sure that there are no labels with duplicate names.
func validateLabelsAndMetricName(ls []prompb.Label) error { func validateLabelsAndMetricName(ls []prompb.Label) error {
@ -612,15 +826,6 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
return result, nil return result, nil
} }
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric.
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
metric := make(model.Metric, len(labelPairs))
for _, l := range labelPairs {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
// snappy decompression. // snappy decompression.
// Used also by documentation/examples/remote_storage. // Used also by documentation/examples/remote_storage.

View file

@ -16,6 +16,7 @@ package remote
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"sync" "sync"
"testing" "testing"
@ -24,6 +25,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/metadata"
@ -705,3 +707,270 @@ func (c *mockChunkIterator) Next() bool {
func (c *mockChunkIterator) Err() error { func (c *mockChunkIterator) Err() error {
return nil return nil
} }
func TestChunkedSeriesIterator(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
chks := buildTestChunks(t)
it := newChunkedSeriesIterator(chks, 2000, 12000)
require.NoError(t, it.err)
require.NotNil(t, it.cur)
// Initial next; advance to first valid sample of first chunk.
res := it.Next()
require.Equal(t, chunkenc.ValFloat, res)
require.NoError(t, it.Err())
ts, v := it.At()
require.Equal(t, int64(2000), ts)
require.Equal(t, float64(2), v)
// Next to the second sample of the first chunk.
res = it.Next()
require.Equal(t, chunkenc.ValFloat, res)
require.NoError(t, it.Err())
ts, v = it.At()
require.Equal(t, int64(3000), ts)
require.Equal(t, float64(3), v)
// Attempt to seek to the first sample of the first chunk (should return current sample).
res = it.Seek(0)
require.Equal(t, chunkenc.ValFloat, res)
ts, v = it.At()
require.Equal(t, int64(3000), ts)
require.Equal(t, float64(3), v)
// Seek to the end of the first chunk.
res = it.Seek(4000)
require.Equal(t, chunkenc.ValFloat, res)
ts, v = it.At()
require.Equal(t, int64(4000), ts)
require.Equal(t, float64(4), v)
// Next to the first sample of the second chunk.
res = it.Next()
require.Equal(t, chunkenc.ValFloat, res)
require.NoError(t, it.Err())
ts, v = it.At()
require.Equal(t, int64(5000), ts)
require.Equal(t, float64(1), v)
// Seek to the second sample of the third chunk.
res = it.Seek(10999)
require.Equal(t, chunkenc.ValFloat, res)
require.NoError(t, it.Err())
ts, v = it.At()
require.Equal(t, int64(11000), ts)
require.Equal(t, float64(3), v)
// Attempt to seek to something past the last sample (should return false and exhaust the iterator).
res = it.Seek(99999)
require.Equal(t, chunkenc.ValNone, res)
require.NoError(t, it.Err())
// Attempt to next past the last sample (should return false as the iterator is exhausted).
res = it.Next()
require.Equal(t, chunkenc.ValNone, res)
require.NoError(t, it.Err())
})
t.Run("invalid chunk encoding error", func(t *testing.T) {
chks := buildTestChunks(t)
// Set chunk type to an invalid value.
chks[0].Type = 8
it := newChunkedSeriesIterator(chks, 0, 14000)
res := it.Next()
require.Equal(t, chunkenc.ValNone, res)
res = it.Seek(1000)
require.Equal(t, chunkenc.ValNone, res)
require.ErrorContains(t, it.err, "invalid chunk encoding")
require.Nil(t, it.cur)
})
t.Run("empty chunks", func(t *testing.T) {
emptyChunks := make([]prompb.Chunk, 0)
it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000)
require.Equal(t, chunkenc.ValNone, it1.Next())
require.Equal(t, chunkenc.ValNone, it1.Seek(1000))
require.NoError(t, it1.Err())
var nilChunks []prompb.Chunk
it2 := newChunkedSeriesIterator(nilChunks, 0, 1000)
require.Equal(t, chunkenc.ValNone, it2.Next())
require.Equal(t, chunkenc.ValNone, it2.Seek(1000))
require.NoError(t, it2.Err())
})
}
func TestChunkedSeries(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
chks := buildTestChunks(t)
s := chunkedSeries{
ChunkedSeries: prompb.ChunkedSeries{
Labels: []prompb.Label{
{Name: "foo", Value: "bar"},
{Name: "asdf", Value: "zxcv"},
},
Chunks: chks,
},
}
require.Equal(t, labels.FromStrings("asdf", "zxcv", "foo", "bar"), s.Labels())
it := s.Iterator(nil)
res := it.Next() // Behavior is undefined w/o the initial call to Next.
require.Equal(t, chunkenc.ValFloat, res)
require.NoError(t, it.Err())
ts, v := it.At()
require.Equal(t, int64(0), ts)
require.Equal(t, float64(0), v)
})
}
func TestChunkedSeriesSet(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
buf := &bytes.Buffer{}
flusher := &mockFlusher{}
w := NewChunkedWriter(buf, flusher)
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
chks := buildTestChunks(t)
l := []prompb.Label{
{Name: "foo", Value: "bar"},
}
for i, c := range chks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
_, err = w.Write(b)
require.NoError(t, err)
}
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
require.NoError(t, ss.Err())
require.Nil(t, ss.Warnings())
res := ss.Next()
require.True(t, res)
require.NoError(t, ss.Err())
s := ss.At()
require.Equal(t, 1, s.Labels().Len())
require.True(t, s.Labels().Has("foo"))
require.Equal(t, "bar", s.Labels().Get("foo"))
it := s.Iterator(nil)
it.Next()
ts, v := it.At()
require.Equal(t, int64(0), ts)
require.Equal(t, float64(0), v)
numResponses := 1
for ss.Next() {
numResponses++
}
require.Equal(t, numTestChunks, numResponses)
require.NoError(t, ss.Err())
})
t.Run("chunked reader error", func(t *testing.T) {
buf := &bytes.Buffer{}
flusher := &mockFlusher{}
w := NewChunkedWriter(buf, flusher)
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil)
chks := buildTestChunks(t)
l := []prompb.Label{
{Name: "foo", Value: "bar"},
}
for i, c := range chks {
cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}}
readResp := prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{&cSeries},
QueryIndex: int64(i),
}
b, err := proto.Marshal(&readResp)
require.NoError(t, err)
b[0] = 0xFF // Corruption!
_, err = w.Write(b)
require.NoError(t, err)
}
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {})
require.NoError(t, ss.Err())
require.Nil(t, ss.Warnings())
res := ss.Next()
require.False(t, res)
require.ErrorContains(t, ss.Err(), "proto: illegal wireType 7")
})
}
// mockFlusher implements http.Flusher.
type mockFlusher struct{}
func (f *mockFlusher) Flush() {}
const (
numTestChunks = 3
numSamplesPerTestChunk = 5
)
func buildTestChunks(t *testing.T) []prompb.Chunk {
startTime := int64(0)
chks := make([]prompb.Chunk, 0, numTestChunks)
time := startTime
for i := 0; i < numTestChunks; i++ {
c := chunkenc.NewXORChunk()
a, err := c.Appender()
require.NoError(t, err)
minTimeMs := time
for j := 0; j < numSamplesPerTestChunk; j++ {
a.Append(time, float64(i+j))
time += int64(1000)
}
chks = append(chks, prompb.Chunk{
MinTimeMs: minTimeMs,
MaxTimeMs: time,
Type: prompb.Chunk_XOR,
Data: c.Bytes(),
})
}
return chks
}

View file

@ -165,11 +165,11 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se
return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err))
} }
res, err := q.client.Read(ctx, query) res, err := q.client.Read(ctx, query, sortSeries)
if err != nil { if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err))
} }
return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) return newSeriesSetFilter(res, added)
} }
// addExternalLabels adds matchers for each external label. External labels // addExternalLabels adds matchers for each external label. External labels

View file

@ -179,7 +179,7 @@ func BenchmarkStreamReadEndpoint(b *testing.B) {
require.Equal(b, 2, recorder.Code/100) require.Equal(b, 2, recorder.Code/100)
var results []*prompb.ChunkedReadResponse var results []*prompb.ChunkedReadResponse
stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) stream := NewChunkedReader(recorder.Result().Body, config.DefaultChunkedReadLimit, nil)
for { for {
res := &prompb.ChunkedReadResponse{} res := &prompb.ChunkedReadResponse{}
@ -280,7 +280,7 @@ func TestStreamReadEndpoint(t *testing.T) {
require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding")) require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding"))
var results []*prompb.ChunkedReadResponse var results []*prompb.ChunkedReadResponse
stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) stream := NewChunkedReader(recorder.Result().Body, config.DefaultChunkedReadLimit, nil)
for { for {
res := &prompb.ChunkedReadResponse{} res := &prompb.ChunkedReadResponse{}
err := stream.NextProto(res) err := stream.NextProto(res)

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -198,7 +199,7 @@ type mockedRemoteClient struct {
b labels.ScratchBuilder b labels.ScratchBuilder
} }
func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
if c.got != nil { if c.got != nil {
return nil, fmt.Errorf("expected only one call to remote client got: %v", query) return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
} }
@ -227,7 +228,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels}) q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels})
} }
} }
return q, nil return FromQueryResult(sortSeries, q), nil
} }
func (c *mockedRemoteClient) reset() { func (c *mockedRemoteClient) reset() {

View file

@ -115,6 +115,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
c, err := NewReadClient(name, &ClientConfig{ c, err := NewReadClient(name, &ClientConfig{
URL: rrConf.URL, URL: rrConf.URL,
Timeout: rrConf.RemoteTimeout, Timeout: rrConf.RemoteTimeout,
ChunkedReadLimit: rrConf.ChunkedReadLimit,
HTTPClientConfig: rrConf.HTTPClientConfig, HTTPClientConfig: rrConf.HTTPClientConfig,
Headers: rrConf.Headers, Headers: rrConf.Headers,
}) })

View file

@ -1074,6 +1074,9 @@ func setupRemote(s storage.Storage) *httptest.Server {
} }
} }
w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")
if err := remote.EncodeReadResponse(&resp, w); err != nil { if err := remote.EncodeReadResponse(&resp, w); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return