From 72475b8a0c896158d2a35ba606dc101dc4f4a73c Mon Sep 17 00:00:00 2001 From: fuling Date: Fri, 20 Mar 2020 22:40:08 +0800 Subject: [PATCH] [ENHANCEMENT] remote storage:Add default api implementation of remote write Signed-off-by: fuling --- cmd/prometheus/main.go | 2 +- web/api/v1/api.go | 92 +++++++++++++++++++++++++++++++++++++++++- web/api/v1/api_test.go | 54 +++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7e153bc329..821f971a70 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -284,7 +284,7 @@ func main() { Default("2m").SetValue(&cfg.queryTimeout) a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). - Default("20").IntVar(&cfg.queryConcurrency) + Default("2000").IntVar(&cfg.queryConcurrency) a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 3225845d3a..dc46e0d2af 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -16,6 +16,7 @@ package v1 import ( "context" "fmt" + "io/ioutil" "math" "math/rand" "net" @@ -27,11 +28,14 @@ import ( "sort" "strconv" "strings" + "sync" "time" "unsafe" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -71,7 +75,6 @@ const ( type errorType string const ( - errorNone errorType = "" errorTimeout errorType = "timeout" errorCanceled errorType = "canceled" errorExec errorType = "execution" @@ -172,6 +175,9 @@ type TSDBAdminStats interface { // them using the provided storage and query engine. 
type API struct { Queryable storage.SampleAndChunkQueryable + Appendable storage.Appendable + refs map[string]uint64 + refsLock *sync.RWMutex QueryEngine *promql.Engine targetRetriever func(context.Context) TargetRetriever @@ -226,7 +232,10 @@ func NewAPI( ) *API { return &API{ QueryEngine: qe, + refs: make(map[string]uint64), + refsLock: &sync.RWMutex{}, Queryable: q, + Appendable: q, targetRetriever: tr, alertmanagerRetriever: ar, @@ -309,6 +318,7 @@ func (api *API) Register(r *route.Router) { r.Get("/status/flags", wrap(api.serveFlags)) r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead))) + r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite))) r.Get("/alerts", wrap(api.alerts)) r.Get("/rules", wrap(api.rules)) @@ -667,7 +677,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { return invalidParamError(err, "match[]") } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(end.Add(time.Minute*-5)), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } @@ -1496,6 +1506,84 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response } } +func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + level.Error(api.logger).Log("msg", "Read error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + level.Error(api.logger).Log("msg", "Decode error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + level.Error(api.logger).Log("msg", "Unmarshal error", "err", err.Error()) + http.Error(w, err.Error(), 
http.StatusBadRequest) + return + } + err = api.write(&req) + if err != nil { + level.Error(api.logger).Log("msg", "Api write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (api *API) write(req *prompb.WriteRequest) error { + var err error = nil + app := api.Appendable.Appender() + defer func() { // TODO: clear api.refs cache; nothing in this change evicts entries. NOTE(review): err is not a named result, so an error assigned from Commit in this closure never reaches the caller. + if err != nil { + app.Rollback() + return + } + if err = app.Commit(); err != nil { + return + } + }() + for _, ts := range req.Timeseries { + tsLabels := make(labels.Labels, 0, len(ts.Labels)) + for _, l := range ts.Labels { + tsLabels = append(tsLabels, labels.Label{Name: l.Name, Value: l.Value}) + } + sort.Sort(tsLabels) + tsLabelsKey := tsLabels.String() + for _, s := range ts.Samples { + api.refsLock.RLock() + ref, ok := api.refs[tsLabelsKey] + api.refsLock.RUnlock() + if ok { + err = app.AddFast(ref, s.Timestamp, s.Value) + if err != nil && strings.Contains(err.Error(), "unknown series") { + // The cached ref is not known to this appender: skip the continue and fall through to Add below, which stores a fresh ref in api.refs. + } else { + switch err { + case nil: + case storage.ErrOutOfOrderSample: + // Out-of-order samples are skipped without logging. NOTE(review): err still holds ErrOutOfOrderSample after the continue, so if this is the last sample handled, write returns nil while the deferred closure sees err != nil and calls Rollback. + default: + level.Error(api.logger).Log("msg", "AddFast fail .unexpected error", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value) + return err + } + continue + } + } + ref, err = app.Add(tsLabels, s.Timestamp, s.Value) + if err != nil { + return err + } + api.refsLock.Lock() + api.refs[tsLabelsKey] = ref + api.refsLock.Unlock() + } + } + return nil +} + // filterExtLabelsFromMatchers change equality matchers which match external labels // to a matcher that looks for an empty label, // as that label should not be present in the storage. 
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 1702fa5860..7e400dba18 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -29,6 +29,7 @@ import ( "runtime" "sort" "strings" + "sync" "testing" "time" @@ -60,6 +61,10 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) +const ( + errorNone errorType = "" +) + // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. type testMetaStore struct { @@ -2249,6 +2254,55 @@ func TestStreamReadEndpoint(t *testing.T) { }, results) } +func TestSampledWriteEndpoint(t *testing.T) { + // Both entries carry the same label set and api.refs starts empty, so write takes the Add path for the first sample and the cached-ref AddFast path for the second; only the returned error is checked. + samples := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + } + + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar",baz="qux"} 1 + `) + testutil.Ok(t, err) + + defer suite.Close() + + err = suite.Run() + testutil.Ok(t, err) + + api := &API{ + Appendable: suite.Storage(), + refs: make(map[string]uint64), + refsLock: &sync.RWMutex{}, + } + err = api.write(req) + testutil.Ok(t, err) +} + type fakeDB struct { err error }