prometheus/storage/remote/codec.go

544 lines
14 KiB
Go
Raw Normal View History

// Copyright 2017 The Prometheus Authors
2017-10-23 06:44:57 -07:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"fmt"
"io"
2017-10-23 06:44:57 -07:00
"io/ioutil"
"net/http"
"sort"
2017-10-23 06:44:57 -07:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/pkg/errors"
2017-10-23 06:44:57 -07:00
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
2017-10-23 13:28:17 -07:00
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
2017-10-23 06:44:57 -07:00
)
// decodeReadLimit is the maximum size of a read request body in bytes.
const decodeReadLimit = 32 * 1024 * 1024
type HTTPError struct {
msg string
status int
}
func (e HTTPError) Error() string {
return e.msg
}
func (e HTTPError) Status() int {
return e.status
}
2017-10-23 06:44:57 -07:00
// DecodeReadRequest reads a remote.Request from a http.Request.
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) {
compressed, err := ioutil.ReadAll(io.LimitReader(r.Body, decodeReadLimit))
2017-10-23 06:44:57 -07:00
if err != nil {
return nil, err
}
reqBuf, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, err
}
var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}
return &req, nil
}
// EncodeReadResponse writes a remote.Response to a http.ResponseWriter.
func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error {
data, err := proto.Marshal(resp)
if err != nil {
return err
}
compressed := snappy.Encode(nil, data)
_, err = w.Write(compressed)
return err
}
// ToQuery builds a Query proto.
func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) {
2017-10-23 06:44:57 -07:00
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
}
var rp *prompb.ReadHints
if p != nil {
rp = &prompb.ReadHints{
StepMs: p.Step,
Func: p.Func,
StartMs: p.Start,
EndMs: p.End,
Grouping: p.Grouping,
By: p.By,
RangeMs: p.Range,
}
}
2017-10-23 06:44:57 -07:00
return &prompb.Query{
StartTimestampMs: from,
EndTimestampMs: to,
Matchers: ms,
Hints: rp,
2017-10-23 06:44:57 -07:00
}, nil
}
// ToQueryResult builds a QueryResult proto.
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) {
numSamples := 0
2017-10-23 06:44:57 -07:00
resp := &prompb.QueryResult{}
2017-10-23 13:28:17 -07:00
for ss.Next() {
series := ss.At()
iter := series.Iterator()
samples := []prompb.Sample{}
2017-10-23 13:28:17 -07:00
for iter.Next() {
numSamples++
if sampleLimit > 0 && numSamples > sampleLimit {
return nil, HTTPError{
msg: fmt.Sprintf("exceeded sample limit (%d)", sampleLimit),
status: http.StatusBadRequest,
}
}
2017-10-23 13:28:17 -07:00
ts, val := iter.At()
samples = append(samples, prompb.Sample{
2017-10-23 13:28:17 -07:00
Timestamp: ts,
Value: val,
2017-10-23 06:44:57 -07:00
})
}
2017-10-23 13:28:17 -07:00
if err := iter.Err(); err != nil {
return nil, err
}
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
Labels: labelsToLabelsProto(series.Labels(), nil),
2017-10-23 13:28:17 -07:00
Samples: samples,
})
}
if err := ss.Err(); err != nil {
return nil, err
2017-10-23 06:44:57 -07:00
}
2017-10-23 13:28:17 -07:00
return resp, nil
2017-10-23 06:44:57 -07:00
}
Don't sort postings if we only have one block. Sorting the heads postings can be quite slow. We only need sorted series when merging with another querier, so only sort then. This will make big queries that only touch the head faster, though queries that touch both the head and a block will still be the same speed. This probably won't help much with graphing unless the range is under an hour, however it should make most recording rules faster. Add gaurantee that remote read streaming produces sorted series. PromQL benchmarks for histograms show only 2-3% improvement, but they're only over 1k series. benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-4 1375486282 507657736 -63.09% BenchmarkQuerierSelect/Head/10of1000000-4 1387859004 507769850 -63.41% BenchmarkQuerierSelect/Head/100of1000000-4 1387087935 506029110 -63.52% BenchmarkQuerierSelect/Head/1000of1000000-4 1386869064 504521986 -63.62% BenchmarkQuerierSelect/Head/10000of1000000-4 1386213685 505210422 -63.55% BenchmarkQuerierSelect/Head/100000of1000000-4 1392754988 529842406 -61.96% BenchmarkQuerierSelect/Head/1000000of1000000-4 1569414722 725059506 -53.80% BenchmarkQuerierSelect/SortedHead/1of1000000-4 1381019902 1370495863 -0.76% BenchmarkQuerierSelect/SortedHead/10of1000000-4 1375696209 1366789468 -0.65% BenchmarkQuerierSelect/SortedHead/100of1000000-4 1386009422 1364519297 -1.55% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 1377700532 1364486191 -0.96% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 1383539536 1369545314 -1.01% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 1410089163 1394731339 -1.09% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 1634744148 1581554956 -3.25% BenchmarkQuerierSelect/Block/1of1000000-4 881741242 879839470 -0.22% BenchmarkQuerierSelect/Block/10of1000000-4 880381562 882846038 +0.28% BenchmarkQuerierSelect/Block/100of1000000-4 887519357 881016916 -0.73% BenchmarkQuerierSelect/Block/1000of1000000-4 902194205 883433524 -2.08% BenchmarkQuerierSelect/Block/10000of1000000-4 892321964 885130170 -0.81% BenchmarkQuerierSelect/Block/100000of1000000-4 938604466 933527150 -0.54% BenchmarkQuerierSelect/Block/1000000of1000000-4 1313510845 1295881124 -1.34% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-4 4000056 4000018 -0.00% BenchmarkQuerierSelect/Head/10of1000000-4 4000074 4000036 -0.00% BenchmarkQuerierSelect/Head/100of1000000-4 4000254 4000216 -0.00% BenchmarkQuerierSelect/Head/1000of1000000-4 4002054 4002016 -0.00% BenchmarkQuerierSelect/Head/10000of1000000-4 4020054 4020016 -0.00% BenchmarkQuerierSelect/Head/100000of1000000-4 4200054 4200016 -0.00% BenchmarkQuerierSelect/Head/1000000of1000000-4 6000054 6000016 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-4 4000071 4000071 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 4000089 4000089 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 4000269 4000269 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 4002069 4002069 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 4020069 4020069 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 4200069 4200069 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 6000069 6000069 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 6000023 6000022 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 6000059 6000058 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 6000419 6000418 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 6004019 6004018 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 6040019 6040018 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 6400019 6400018 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 10000020 10000019 -0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-4 229192200 176001176 -23.21% BenchmarkQuerierSelect/Head/10of1000000-4 229193352 176002328 -23.21% BenchmarkQuerierSelect/Head/100of1000000-4 229204872 176013848 -23.21% BenchmarkQuerierSelect/Head/1000of1000000-4 229320072 176129048 -23.20% BenchmarkQuerierSelect/Head/10000of1000000-4 230472072 177281048 -23.08% BenchmarkQuerierSelect/Head/100000of1000000-4 241992072 188801048 -21.98% BenchmarkQuerierSelect/Head/1000000of1000000-4 357192072 304001048 -14.89% BenchmarkQuerierSelect/SortedHead/1of1000000-4 229193928 229193928 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 229195080 229195080 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 229206600 229206600 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 229321800 229321800 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 230473800 230473800 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 241993800 241993800 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 357193800 357193800 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 227201516 227201500 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 227202924 227202908 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 227217036 227217020 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 227358156 227358140 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 228769356 228769340 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 242881356 242881340 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 384001616 384001600 -0.00% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
2020-01-17 03:21:44 -08:00
// FromQueryResult unpacks and sorts a QueryResult proto.
2017-10-23 13:28:17 -07:00
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
series := make([]storage.Series, 0, len(res.Timeseries))
for _, ts := range res.Timeseries {
labels := labelProtosToLabels(ts.Labels)
if err := validateLabelsAndMetricName(labels); err != nil {
return errSeriesSet{err: err}
2017-10-23 06:44:57 -07:00
}
2017-10-23 13:28:17 -07:00
series = append(series, &concreteSeries{
labels: labels,
samples: ts.Samples,
})
2017-10-23 06:44:57 -07:00
}
2017-10-23 13:28:17 -07:00
sort.Sort(byLabel(series))
return &concreteSeriesSet{
series: series,
}
}
// NegotiateResponseType returns first accepted response type that this server supports.
// On the empty accepted list we assume that the SAMPLES response type was requested. This is to maintain backward compatibility.
func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error) {
if len(accepted) == 0 {
accepted = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}
}
supported := map[prompb.ReadRequest_ResponseType]struct{}{
prompb.ReadRequest_SAMPLES: {},
prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
}
for _, resType := range accepted {
if _, ok := supported[resType]; ok {
return resType, nil
}
}
return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported)
}
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of re-encoding everything.
func StreamChunkedReadResponses(
stream io.Writer,
queryIndex int64,
ss storage.SeriesSet,
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
) error {
var (
chks []prompb.Chunk
lbls []prompb.Label
err error
lblsSize int
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
lblsSize = 0
for _, lbl := range lbls {
lblsSize += lbl.Size()
}
// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for {
// TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/prometheus/pull/5882
chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize)
if err != nil {
return err
}
if len(chks) == 0 {
break
}
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{
{
Labels: lbls,
Chunks: chks,
},
},
QueryIndex: queryIndex,
})
if err != nil {
return errors.Wrap(err, "marshal ChunkedReadResponse")
}
if _, err := stream.Write(b); err != nil {
return errors.Wrap(err, "write to stream")
}
chks = chks[:0]
}
if err := iter.Err(); err != nil {
return err
}
}
if err := ss.Err(); err != nil {
return err
}
return nil
}
// encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking).
func encodeChunks(iter storage.SeriesIterator, chks []prompb.Chunk, frameBytesLeft int) ([]prompb.Chunk, error) {
const maxSamplesInChunk = 120
var (
chkMint int64
chkMaxt int64
chk *chunkenc.XORChunk
app chunkenc.Appender
err error
)
for iter.Next() {
if chk == nil {
chk = chunkenc.NewXORChunk()
app, err = chk.Appender()
if err != nil {
return nil, err
}
chkMint, _ = iter.At()
}
app.Append(iter.At())
chkMaxt, _ = iter.At()
if chk.NumSamples() < maxSamplesInChunk {
continue
}
// Cut the chunk.
chks = append(chks, prompb.Chunk{
MinTimeMs: chkMint,
MaxTimeMs: chkMaxt,
Type: prompb.Chunk_Encoding(chk.Encoding()),
Data: chk.Bytes(),
})
chk = nil
frameBytesLeft -= chks[len(chks)-1].Size()
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
if frameBytesLeft <= 0 {
break
}
}
if iter.Err() != nil {
return nil, errors.Wrap(iter.Err(), "iter TSDB series")
}
if chk != nil {
// Cut the chunk if exists.
chks = append(chks, prompb.Chunk{
MinTimeMs: chkMint,
MaxTimeMs: chkMaxt,
Type: prompb.Chunk_Encoding(chk.Encoding()),
Data: chk.Bytes(),
})
}
return chks, nil
}
// MergeLabels merges two sets of sorted proto labels, preferring those in
// primary to those in secondary when there is an overlap.
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label {
result := make([]prompb.Label, 0, len(primary)+len(secondary))
i, j := 0, 0
for i < len(primary) && j < len(secondary) {
if primary[i].Name < secondary[j].Name {
result = append(result, primary[i])
i++
} else if primary[i].Name > secondary[j].Name {
result = append(result, secondary[j])
j++
} else {
result = append(result, primary[i])
i++
j++
}
}
for ; i < len(primary); i++ {
result = append(result, primary[i])
}
for ; j < len(secondary); j++ {
result = append(result, secondary[j])
}
return result
}
type byLabel []storage.Series
func (a byLabel) Len() int { return len(a) }
func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
2017-10-23 13:28:17 -07:00
// errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct {
err error
}
2017-10-23 06:44:57 -07:00
2017-10-23 13:28:17 -07:00
func (errSeriesSet) Next() bool {
return false
}
func (errSeriesSet) At() storage.Series {
return nil
}
func (e errSeriesSet) Err() error {
return e.err
}
// concreteSeriesSet implements storage.SeriesSet.
type concreteSeriesSet struct {
cur int
series []storage.Series
}
func (c *concreteSeriesSet) Next() bool {
c.cur++
return c.cur-1 < len(c.series)
}
func (c *concreteSeriesSet) At() storage.Series {
return c.series[c.cur-1]
}
func (c *concreteSeriesSet) Err() error {
return nil
}
2018-04-08 02:51:54 -07:00
// concreteSeries implements storage.Series.
2017-10-23 13:28:17 -07:00
type concreteSeries struct {
labels labels.Labels
samples []prompb.Sample
2017-10-23 13:28:17 -07:00
}
func (c *concreteSeries) Labels() labels.Labels {
return labels.New(c.labels...)
2017-10-23 13:28:17 -07:00
}
func (c *concreteSeries) Iterator() storage.SeriesIterator {
return newConcreteSeriersIterator(c)
}
// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
cur int
series *concreteSeries
}
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
return &concreteSeriesIterator{
cur: -1,
series: series,
}
}
// Seek implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Seek(t int64) bool {
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
return c.series.samples[n].Timestamp >= t
})
return c.cur < len(c.series.samples)
}
// At implements storage.SeriesIterator.
func (c *concreteSeriesIterator) At() (t int64, v float64) {
s := c.series.samples[c.cur]
return s.Timestamp, s.Value
}
// Next implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Next() bool {
c.cur++
return c.cur < len(c.series.samples)
}
// Err implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Err() error {
return nil
}
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
// also making sure that there are no labels with duplicate names
2017-10-23 13:28:17 -07:00
func validateLabelsAndMetricName(ls labels.Labels) error {
for i, l := range ls {
2017-10-23 13:28:17 -07:00
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
return errors.Errorf("invalid metric name: %v", l.Value)
2017-10-23 13:28:17 -07:00
}
if !model.LabelName(l.Name).IsValid() {
return errors.Errorf("invalid label name: %v", l.Name)
2017-10-23 13:28:17 -07:00
}
if !model.LabelValue(l.Value).IsValid() {
return errors.Errorf("invalid label value: %v", l.Value)
2017-10-23 13:28:17 -07:00
}
if i > 0 && l.Name == ls[i-1].Name {
return errors.Errorf("duplicate label with name: %v", l.Name)
}
2017-10-23 13:28:17 -07:00
}
return nil
2017-10-23 06:44:57 -07:00
}
func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType prompb.LabelMatcher_Type
switch m.Type {
case labels.MatchEqual:
mType = prompb.LabelMatcher_EQ
case labels.MatchNotEqual:
mType = prompb.LabelMatcher_NEQ
case labels.MatchRegexp:
mType = prompb.LabelMatcher_RE
case labels.MatchNotRegexp:
mType = prompb.LabelMatcher_NRE
default:
return nil, errors.New("invalid matcher type")
2017-10-23 06:44:57 -07:00
}
pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
Type: mType,
Name: m.Name,
Value: m.Value,
})
}
return pbMatchers, nil
}
// FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
2017-10-23 06:44:57 -07:00
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
var mtype labels.MatchType
switch matcher.Type {
case prompb.LabelMatcher_EQ:
mtype = labels.MatchEqual
case prompb.LabelMatcher_NEQ:
mtype = labels.MatchNotEqual
case prompb.LabelMatcher_RE:
mtype = labels.MatchRegexp
case prompb.LabelMatcher_NRE:
mtype = labels.MatchNotRegexp
default:
return nil, errors.New("invalid matcher type")
2017-10-23 06:44:57 -07:00
}
matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value)
if err != nil {
return nil, err
}
result = append(result, matcher)
}
return result, nil
}
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
2017-10-23 06:44:57 -07:00
metric := make(model.Metric, len(labelPairs))
for _, l := range labelPairs {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
}
func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels {
result := make(labels.Labels, 0, len(labelPairs))
for _, l := range labelPairs {
result = append(result, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(result)
return result
}
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelsProto(labels labels.Labels, buf []prompb.Label) []prompb.Label {
result := buf[:0]
if cap(buf) < len(labels) {
result = make([]prompb.Label, 0, len(labels))
}
2017-10-23 13:28:17 -07:00
for _, l := range labels {
result = append(result, prompb.Label{
Name: l.Name,
Value: l.Value,
2017-10-23 13:28:17 -07:00
})
}
return result
}