2017-10-23 08:56:47 -07:00
// Copyright 2017 The Prometheus Authors
2017-10-23 06:44:57 -07:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"fmt"
2018-06-08 00:19:20 -07:00
"io"
2017-10-23 06:44:57 -07:00
"io/ioutil"
"net/http"
2017-10-23 08:56:47 -07:00
"sort"
2020-11-19 07:23:03 -08:00
"strings"
2017-10-23 06:44:57 -07:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2019-03-25 16:01:12 -07:00
"github.com/pkg/errors"
2017-10-23 06:44:57 -07:00
"github.com/prometheus/common/model"
2020-10-22 02:00:08 -07:00
2021-11-08 06:23:17 -08:00
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
2017-10-23 06:44:57 -07:00
"github.com/prometheus/prometheus/prompb"
2017-10-23 13:28:17 -07:00
"github.com/prometheus/prometheus/storage"
2019-08-19 13:16:10 -07:00
"github.com/prometheus/prometheus/tsdb/chunkenc"
2017-10-23 06:44:57 -07:00
)
2018-06-08 00:19:20 -07:00
// decodeReadLimit caps the number of bytes consumed from a read request
// body before it is decompressed and decoded (32 MiB).
const decodeReadLimit = 32 << 20
2018-09-05 06:50:50 -07:00
// HTTPError is an error value that pairs a message with the HTTP status
// code that should be reported for it.
type HTTPError struct {
	msg    string
	status int
}

// Error returns the error message, satisfying the error interface.
func (he HTTPError) Error() string { return he.msg }

// Status returns the HTTP status code attached to the error.
func (he HTTPError) Status() int { return he.status }
2017-10-23 06:44:57 -07:00
// DecodeReadRequest reads a remote.Request from a http.Request.
func DecodeReadRequest ( r * http . Request ) ( * prompb . ReadRequest , error ) {
2018-06-08 00:19:20 -07:00
compressed , err := ioutil . ReadAll ( io . LimitReader ( r . Body , decodeReadLimit ) )
2017-10-23 06:44:57 -07:00
if err != nil {
return nil , err
}
reqBuf , err := snappy . Decode ( nil , compressed )
if err != nil {
return nil , err
}
var req prompb . ReadRequest
if err := proto . Unmarshal ( reqBuf , & req ) ; err != nil {
return nil , err
}
return & req , nil
}
// EncodeReadResponse marshals resp to protobuf, snappy-compresses the bytes
// and writes the result to w. It returns the marshalling error or the error
// reported by w.Write, if any.
func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error {
	payload, err := proto.Marshal(resp)
	if err != nil {
		return err
	}

	if _, err := w.Write(snappy.Encode(nil, payload)); err != nil {
		return err
	}
	return nil
}
// ToQuery builds a Query proto.
2020-03-12 02:36:09 -07:00
func ToQuery ( from , to int64 , matchers [ ] * labels . Matcher , hints * storage . SelectHints ) ( * prompb . Query , error ) {
2017-10-23 06:44:57 -07:00
ms , err := toLabelMatchers ( matchers )
if err != nil {
return nil , err
}
2018-06-18 08:33:04 -07:00
var rp * prompb . ReadHints
2020-03-12 02:36:09 -07:00
if hints != nil {
2018-06-13 00:19:17 -07:00
rp = & prompb . ReadHints {
2020-03-12 02:36:09 -07:00
StartMs : hints . Start ,
EndMs : hints . End ,
StepMs : hints . Step ,
Func : hints . Func ,
Grouping : hints . Grouping ,
By : hints . By ,
RangeMs : hints . Range ,
2018-06-13 00:19:17 -07:00
}
2018-05-08 01:48:13 -07:00
}
2017-10-23 06:44:57 -07:00
return & prompb . Query {
StartTimestampMs : from ,
EndTimestampMs : to ,
Matchers : ms ,
2018-05-08 01:48:13 -07:00
Hints : rp ,
2017-10-23 06:44:57 -07:00
} , nil
}
// ToQueryResult builds a QueryResult proto.
2020-06-09 09:57:31 -07:00
func ToQueryResult ( ss storage . SeriesSet , sampleLimit int ) ( * prompb . QueryResult , storage . Warnings , error ) {
2018-09-05 06:50:50 -07:00
numSamples := 0
2017-10-23 06:44:57 -07:00
resp := & prompb . QueryResult { }
2017-10-23 13:28:17 -07:00
for ss . Next ( ) {
series := ss . At ( )
iter := series . Iterator ( )
2018-09-25 11:14:00 -07:00
samples := [ ] prompb . Sample { }
2017-10-23 13:28:17 -07:00
for iter . Next ( ) {
2018-09-05 06:50:50 -07:00
numSamples ++
if sampleLimit > 0 && numSamples > sampleLimit {
2020-06-09 09:57:31 -07:00
return nil , ss . Warnings ( ) , HTTPError {
2018-09-05 06:50:50 -07:00
msg : fmt . Sprintf ( "exceeded sample limit (%d)" , sampleLimit ) ,
status : http . StatusBadRequest ,
}
}
2017-10-23 13:28:17 -07:00
ts , val := iter . At ( )
2018-09-25 11:14:00 -07:00
samples = append ( samples , prompb . Sample {
2017-10-23 13:28:17 -07:00
Timestamp : ts ,
Value : val ,
2017-10-23 06:44:57 -07:00
} )
}
2017-10-23 13:28:17 -07:00
if err := iter . Err ( ) ; err != nil {
2020-06-09 09:57:31 -07:00
return nil , ss . Warnings ( ) , err
2017-10-23 13:28:17 -07:00
}
resp . Timeseries = append ( resp . Timeseries , & prompb . TimeSeries {
2019-08-12 09:22:02 -07:00
Labels : labelsToLabelsProto ( series . Labels ( ) , nil ) ,
2017-10-23 13:28:17 -07:00
Samples : samples ,
} )
}
2020-06-24 06:41:52 -07:00
return resp , ss . Warnings ( ) , ss . Err ( )
2017-10-23 06:44:57 -07:00
}
2020-01-17 03:21:44 -08:00
// FromQueryResult unpacks and sorts a QueryResult proto.
2020-03-12 02:36:09 -07:00
func FromQueryResult ( sortSeries bool , res * prompb . QueryResult ) storage . SeriesSet {
2017-10-23 13:28:17 -07:00
series := make ( [ ] storage . Series , 0 , len ( res . Timeseries ) )
for _ , ts := range res . Timeseries {
2020-06-24 06:41:52 -07:00
lbls := labelProtosToLabels ( ts . Labels )
if err := validateLabelsAndMetricName ( lbls ) ; err != nil {
2017-10-23 13:28:17 -07:00
return errSeriesSet { err : err }
2017-10-23 06:44:57 -07:00
}
2020-06-24 06:41:52 -07:00
series = append ( series , & concreteSeries { labels : lbls , samples : ts . Samples } )
2017-10-23 06:44:57 -07:00
}
2020-03-12 02:36:09 -07:00
if sortSeries {
sort . Sort ( byLabel ( series ) )
}
2017-10-23 13:28:17 -07:00
return & concreteSeriesSet {
series : series ,
}
}
2019-08-19 13:16:10 -07:00
// NegotiateResponseType picks the first entry of accepted that this server
// supports (SAMPLES or STREAMED_XOR_CHUNKS).
//
// An empty accepted list is treated as a request for SAMPLES, which keeps
// backward compatibility with clients that do not send the list. An error is
// returned when none of the requested types is supported.
func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.ReadRequest_ResponseType, error) {
	supported := map[prompb.ReadRequest_ResponseType]struct{}{
		prompb.ReadRequest_SAMPLES:             {},
		prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
	}

	if len(accepted) == 0 {
		accepted = []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES}
	}

	for i := range accepted {
		candidate := accepted[i]
		if _, found := supported[candidate]; !found {
			continue
		}
		return candidate, nil
	}
	return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported)
}
2020-06-24 06:41:52 -07:00
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
// It expects Series set with populated chunks.
//
// Each series is written to stream as one or more marshalled
// prompb.ChunkedReadResponse frames tagged with queryIndex. Every frame
// carries the series labels merged with sortedExternalLabels (series labels
// win on conflict) and holds exactly one series. A frame is flushed once the
// byte budget derived from maxBytesInFrame is used up or the series has no
// more chunks.
//
// The warnings reported by ss are returned in every case. An error is
// returned if a chunk is not populated, if marshalling or writing fails, or
// if the chunk iterator or the series set reports an error.
func StreamChunkedReadResponses(
	stream io.Writer,
	queryIndex int64,
	ss storage.ChunkSeriesSet,
	sortedExternalLabels []prompb.Label,
	maxBytesInFrame int,
) (storage.Warnings, error) {
	var (
		chks []prompb.Chunk
		lbls []prompb.Label
	)

	for ss.Next() {
		series := ss.At()
		iter := series.Iterator()
		lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)

		// The labels are repeated in every frame of this series, so the
		// room left for chunk data is the same for each of its frames.
		maxDataLength := maxBytesInFrame
		for _, lbl := range lbls {
			maxDataLength -= lbl.Size()
		}
		frameBytesLeft := maxDataLength

		isNext := iter.Next()

		// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
		for isNext {
			chk := iter.At()

			if chk.Chunk == nil {
				return ss.Warnings(), errors.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
			}

			// Cut the chunk.
			chks = append(chks, prompb.Chunk{
				MinTimeMs: chk.MinTime,
				MaxTimeMs: chk.MaxTime,
				Type:      prompb.Chunk_Encoding(chk.Chunk.Encoding()),
				Data:      chk.Chunk.Bytes(),
			})
			frameBytesLeft -= chks[len(chks)-1].Size()

			// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
			isNext = iter.Next()
			if frameBytesLeft > 0 && isNext {
				continue
			}

			b, err := proto.Marshal(&prompb.ChunkedReadResponse{
				ChunkedSeries: []*prompb.ChunkedSeries{
					{Labels: lbls, Chunks: chks},
				},
				QueryIndex: queryIndex,
			})
			if err != nil {
				return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse")
			}

			if _, err := stream.Write(b); err != nil {
				return ss.Warnings(), errors.Wrap(err, "write to stream")
			}

			// Start the next frame of this series with an empty chunk list
			// and a full budget. Without restoring the budget, every chunk
			// after the first flush would be sent in a frame of its own.
			chks = chks[:0]
			frameBytesLeft = maxDataLength
		}
		if err := iter.Err(); err != nil {
			return ss.Warnings(), err
		}
	}
	return ss.Warnings(), ss.Err()
}
2019-08-19 13:16:10 -07:00
// MergeLabels merges two label slices that are each sorted by name into a
// single sorted slice. When both inputs contain a label with the same name,
// the one from primary is kept and the one from secondary is dropped.
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label {
	merged := make([]prompb.Label, 0, len(primary)+len(secondary))

	p, s := 0, 0
	for p < len(primary) && s < len(secondary) {
		switch {
		case primary[p].Name < secondary[s].Name:
			merged = append(merged, primary[p])
			p++
		case primary[p].Name > secondary[s].Name:
			merged = append(merged, secondary[s])
			s++
		default:
			// Same name on both sides: primary wins.
			merged = append(merged, primary[p])
			p++
			s++
		}
	}

	// At most one of the two tails is non-empty here.
	merged = append(merged, primary[p:]...)
	merged = append(merged, secondary[s:]...)
	return merged
}
2017-11-11 17:15:27 -08:00
// byLabel implements sort.Interface, ordering series by comparing their
// label sets with labels.Compare.
type byLabel []storage.Series

// Len returns the number of series.
func (s byLabel) Len() int {
	return len(s)
}

// Swap exchanges the series at positions i and j.
func (s byLabel) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the labels of series i sort before those of series j.
func (s byLabel) Less(i, j int) bool {
	return labels.Compare(s[i].Labels(), s[j].Labels()) < 0
}
2017-10-23 13:28:17 -07:00
// errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct {
err error
}
2017-10-23 06:44:57 -07:00
2017-10-23 13:28:17 -07:00
func ( errSeriesSet ) Next ( ) bool {
return false
}
func ( errSeriesSet ) At ( ) storage . Series {
return nil
}
func ( e errSeriesSet ) Err ( ) error {
return e . err
}
2020-06-09 09:57:31 -07:00
func ( e errSeriesSet ) Warnings ( ) storage . Warnings { return nil }
2017-10-23 13:28:17 -07:00
// concreteSeriesSet implements storage.SeriesSet.
type concreteSeriesSet struct {
cur int
series [ ] storage . Series
}
func ( c * concreteSeriesSet ) Next ( ) bool {
c . cur ++
return c . cur - 1 < len ( c . series )
}
func ( c * concreteSeriesSet ) At ( ) storage . Series {
return c . series [ c . cur - 1 ]
}
func ( c * concreteSeriesSet ) Err ( ) error {
return nil
}
2020-06-09 09:57:31 -07:00
func ( c * concreteSeriesSet ) Warnings ( ) storage . Warnings { return nil }
2018-04-08 02:51:54 -07:00
// concreteSeries implements storage.Series.
2017-10-23 13:28:17 -07:00
type concreteSeries struct {
labels labels . Labels
2018-09-25 11:14:00 -07:00
samples [ ] prompb . Sample
2017-10-23 13:28:17 -07:00
}
func ( c * concreteSeries ) Labels ( ) labels . Labels {
2017-11-11 15:47:47 -08:00
return labels . New ( c . labels ... )
2017-10-23 13:28:17 -07:00
}
2020-02-06 07:58:38 -08:00
func ( c * concreteSeries ) Iterator ( ) chunkenc . Iterator {
2017-10-23 13:28:17 -07:00
return newConcreteSeriersIterator ( c )
}
// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
cur int
series * concreteSeries
}
2020-02-06 07:58:38 -08:00
func newConcreteSeriersIterator ( series * concreteSeries ) chunkenc . Iterator {
2017-10-23 13:28:17 -07:00
return & concreteSeriesIterator {
cur : - 1 ,
series : series ,
}
}
// Seek implements storage.SeriesIterator.
func ( c * concreteSeriesIterator ) Seek ( t int64 ) bool {
2021-11-29 01:47:56 -08:00
if c . cur == - 1 {
c . cur = 0
}
// No-op check.
if s := c . series . samples [ c . cur ] ; s . Timestamp >= t {
return true
}
// Do binary search between current position and end.
c . cur += sort . Search ( len ( c . series . samples ) - c . cur , func ( n int ) bool {
return c . series . samples [ n + c . cur ] . Timestamp >= t
2017-10-23 13:28:17 -07:00
} )
return c . cur < len ( c . series . samples )
}
// At returns the timestamp and value of the current sample.
func (c *concreteSeriesIterator) At() (t int64, v float64) {
	sample := &c.series.samples[c.cur]
	return sample.Timestamp, sample.Value
}

// Next moves to the following sample and reports whether one exists.
func (c *concreteSeriesIterator) Next() bool {
	next := c.cur + 1
	c.cur = next
	return next < len(c.series.samples)
}

// Err always returns nil.
func (c *concreteSeriesIterator) Err() error { return nil }
2019-08-07 08:13:10 -07:00
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
// also making sure that there are no labels with duplicate names
2017-10-23 13:28:17 -07:00
func validateLabelsAndMetricName ( ls labels . Labels ) error {
2019-08-07 08:13:10 -07:00
for i , l := range ls {
2017-10-23 13:28:17 -07:00
if l . Name == labels . MetricName && ! model . IsValidMetricName ( model . LabelValue ( l . Value ) ) {
2019-03-25 16:01:12 -07:00
return errors . Errorf ( "invalid metric name: %v" , l . Value )
2017-10-23 13:28:17 -07:00
}
if ! model . LabelName ( l . Name ) . IsValid ( ) {
2019-03-25 16:01:12 -07:00
return errors . Errorf ( "invalid label name: %v" , l . Name )
2017-10-23 13:28:17 -07:00
}
if ! model . LabelValue ( l . Value ) . IsValid ( ) {
2019-03-25 16:01:12 -07:00
return errors . Errorf ( "invalid label value: %v" , l . Value )
2017-10-23 13:28:17 -07:00
}
2019-08-07 08:13:10 -07:00
if i > 0 && l . Name == ls [ i - 1 ] . Name {
return errors . Errorf ( "duplicate label with name: %v" , l . Name )
}
2017-10-23 13:28:17 -07:00
}
return nil
2017-10-23 06:44:57 -07:00
}
func toLabelMatchers ( matchers [ ] * labels . Matcher ) ( [ ] * prompb . LabelMatcher , error ) {
pbMatchers := make ( [ ] * prompb . LabelMatcher , 0 , len ( matchers ) )
for _ , m := range matchers {
var mType prompb . LabelMatcher_Type
switch m . Type {
case labels . MatchEqual :
mType = prompb . LabelMatcher_EQ
case labels . MatchNotEqual :
mType = prompb . LabelMatcher_NEQ
case labels . MatchRegexp :
mType = prompb . LabelMatcher_RE
case labels . MatchNotRegexp :
mType = prompb . LabelMatcher_NRE
default :
2019-03-25 16:01:12 -07:00
return nil , errors . New ( "invalid matcher type" )
2017-10-23 06:44:57 -07:00
}
pbMatchers = append ( pbMatchers , & prompb . LabelMatcher {
Type : mType ,
Name : m . Name ,
Value : m . Value ,
} )
}
return pbMatchers , nil
}
2019-08-19 13:16:10 -07:00
// FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
func FromLabelMatchers ( matchers [ ] * prompb . LabelMatcher ) ( [ ] * labels . Matcher , error ) {
2017-10-23 06:44:57 -07:00
result := make ( [ ] * labels . Matcher , 0 , len ( matchers ) )
for _ , matcher := range matchers {
var mtype labels . MatchType
switch matcher . Type {
case prompb . LabelMatcher_EQ :
mtype = labels . MatchEqual
case prompb . LabelMatcher_NEQ :
mtype = labels . MatchNotEqual
case prompb . LabelMatcher_RE :
mtype = labels . MatchRegexp
case prompb . LabelMatcher_NRE :
mtype = labels . MatchNotRegexp
default :
2019-03-25 16:01:12 -07:00
return nil , errors . New ( "invalid matcher type" )
2017-10-23 06:44:57 -07:00
}
matcher , err := labels . NewMatcher ( mtype , matcher . Name , matcher . Value )
if err != nil {
return nil , err
}
result = append ( result , matcher )
}
return result , nil
}
2021-09-21 13:53:27 -07:00
// exemplarProtoToExemplar converts a protobuf exemplar into an
// exemplar.Exemplar. HasTs is set only when the proto timestamp is non-zero.
func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
	out := exemplar.Exemplar{
		Labels: labelProtosToLabels(ep.Labels),
		Value:  ep.Value,
		Ts:     ep.Timestamp,
	}
	out.HasTs = ep.Timestamp != 0
	return out
}
2017-10-23 08:56:47 -07:00
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
func LabelProtosToMetric ( labelPairs [ ] * prompb . Label ) model . Metric {
2017-10-23 06:44:57 -07:00
metric := make ( model . Metric , len ( labelPairs ) )
for _ , l := range labelPairs {
metric [ model . LabelName ( l . Name ) ] = model . LabelValue ( l . Value )
}
return metric
}
2017-10-23 08:56:47 -07:00
2019-01-15 11:13:39 -08:00
func labelProtosToLabels ( labelPairs [ ] prompb . Label ) labels . Labels {
2017-10-23 08:56:47 -07:00
result := make ( labels . Labels , 0 , len ( labelPairs ) )
for _ , l := range labelPairs {
result = append ( result , labels . Label {
Name : l . Name ,
Value : l . Value ,
} )
}
sort . Sort ( result )
return result
}
2019-08-12 09:22:02 -07:00
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelsProto ( labels labels . Labels , buf [ ] prompb . Label ) [ ] prompb . Label {
result := buf [ : 0 ]
if cap ( buf ) < len ( labels ) {
result = make ( [ ] prompb . Label , 0 , len ( labels ) )
}
2017-10-23 13:28:17 -07:00
for _ , l := range labels {
2019-01-15 11:13:39 -08:00
result = append ( result , prompb . Label {
2019-08-07 12:39:07 -07:00
Name : l . Name ,
Value : l . Value ,
2017-10-23 13:28:17 -07:00
} )
}
return result
}
2020-11-19 07:23:03 -08:00
// metricTypeToMetricTypeProto maps a Prometheus metric type, which is a
// string, to the corresponding prompb enum value. The upper-cased string is
// looked up among the enum names; an unrecognised type maps to UNKNOWN.
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType {
	name := strings.ToUpper(string(t))
	if v, ok := prompb.MetricMetadata_MetricType_value[name]; ok {
		return prompb.MetricMetadata_MetricType(v)
	}
	return prompb.MetricMetadata_UNKNOWN
}
2021-01-30 03:04:48 -08:00
// DecodeWriteRequest reads all of r, snappy-decompresses the bytes and
// unmarshals them into a prompb.WriteRequest. No size limit is applied to
// the reader here. Any read, decompression or unmarshalling error is
// returned unchanged.
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
	compressed, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}

	raw, err := snappy.Decode(nil, compressed)
	if err != nil {
		return nil, err
	}

	req := &prompb.WriteRequest{}
	if err = proto.Unmarshal(raw, req); err != nil {
		return nil, err
	}
	return req, nil
}