From 72475b8a0c896158d2a35ba606dc101dc4f4a73c Mon Sep 17 00:00:00 2001 From: fuling Date: Fri, 20 Mar 2020 22:40:08 +0800 Subject: [PATCH 1/2] [ENHANCEMENT] remote storage:Add default api implementation of remote write Signed-off-by: fuling --- cmd/prometheus/main.go | 2 +- web/api/v1/api.go | 92 +++++++++++++++++++++++++++++++++++++++++- web/api/v1/api_test.go | 54 +++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7e153bc329..821f971a70 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -284,7 +284,7 @@ func main() { Default("2m").SetValue(&cfg.queryTimeout) a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently."). - Default("20").IntVar(&cfg.queryConcurrency) + Default("2000").IntVar(&cfg.queryConcurrency) a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 3225845d3a..dc46e0d2af 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -16,6 +16,7 @@ package v1 import ( "context" "fmt" + "io/ioutil" "math" "math/rand" "net" @@ -27,11 +28,14 @@ import ( "sort" "strconv" "strings" + "sync" "time" "unsafe" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -71,7 +75,6 @@ const ( type errorType string const ( - errorNone errorType = "" errorTimeout errorType = "timeout" errorCanceled errorType = "canceled" errorExec errorType = "execution" @@ -172,6 +175,9 @@ type TSDBAdminStats interface { // them using the provided storage and query engine. 
type API struct { Queryable storage.SampleAndChunkQueryable + Appendable storage.Appendable + refs map[string]uint64 + refsLock *sync.RWMutex QueryEngine *promql.Engine targetRetriever func(context.Context) TargetRetriever @@ -226,7 +232,10 @@ func NewAPI( ) *API { return &API{ QueryEngine: qe, + refs: make(map[string]uint64), + refsLock: &sync.RWMutex{}, Queryable: q, + Appendable: q, targetRetriever: tr, alertmanagerRetriever: ar, @@ -309,6 +318,7 @@ func (api *API) Register(r *route.Router) { r.Get("/status/flags", wrap(api.serveFlags)) r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead))) + r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite))) r.Get("/alerts", wrap(api.alerts)) r.Get("/rules", wrap(api.rules)) @@ -667,7 +677,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { return invalidParamError(err, "match[]") } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(end.Add(time.Minute*-5)), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } @@ -1496,6 +1506,84 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response } } +func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + level.Error(api.logger).Log("msg", "Read error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + level.Error(api.logger).Log("msg", "Decode error", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + level.Error(api.logger).Log("msg", "Unmarshal error", "err", err.Error()) + http.Error(w, err.Error(), 
http.StatusBadRequest) + return + } + err = api.write(&req) + if err != nil { + level.Error(api.logger).Log("msg", "Api write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (api *API) write(req *prompb.WriteRequest) error { + var err error = nil + app := api.Appendable.Appender() + defer func() { //TODO:clear api.refs cache + if err != nil { + app.Rollback() + return + } + if err = app.Commit(); err != nil { + return + } + }() + for _, ts := range req.Timeseries { + tsLabels := make(labels.Labels, 0, len(ts.Labels)) + for _, l := range ts.Labels { + tsLabels = append(tsLabels, labels.Label{Name: l.Name, Value: l.Value}) + } + sort.Sort(tsLabels) + tsLabelsKey := tsLabels.String() + for _, s := range ts.Samples { + api.refsLock.RLock() + ref, ok := api.refs[tsLabelsKey] + api.refsLock.RUnlock() + if ok { + err = app.AddFast(ref, s.Timestamp, s.Value) + if err != nil && strings.Contains(err.Error(), "unknown series") { + // + } else { + switch err { + case nil: + case storage.ErrOutOfOrderSample: + //level.Error(api.logger).Log("msg", "AddFast fail .Out of order sample", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value) + default: + level.Error(api.logger).Log("msg", "AddFast fail .unexpected error", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value) + return err + } + continue + } + } + ref, err = app.Add(tsLabels, s.Timestamp, s.Value) + if err != nil { + return err + } + api.refsLock.Lock() + api.refs[tsLabelsKey] = ref + api.refsLock.Unlock() + } + } + return nil +} + // filterExtLabelsFromMatchers change equality matchers which match external labels // to a matcher that looks for an empty label, // as that label should not be present in the storage. 
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 1702fa5860..7e400dba18 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -29,6 +29,7 @@ import ( "runtime" "sort" "strings" + "sync" "testing" "time" @@ -60,6 +61,10 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) +const ( + errorNone errorType = "" +) + // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. type testMetaStore struct { @@ -2249,6 +2254,55 @@ func TestStreamReadEndpoint(t *testing.T) { }, results) } +func TestSampledWriteEndpoint(t *testing.T) { + + samples := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + } + + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar",baz="qux"} 1 + `) + testutil.Ok(t, err) + + defer suite.Close() + + err = suite.Run() + testutil.Ok(t, err) + + api := &API{ + Appendable: suite.Storage(), + refs: make(map[string]uint64), + refsLock: &sync.RWMutex{}, + } + err = api.write(req) + testutil.Ok(t, err) +} + type fakeDB struct { err error } From d479151f1f96ece3f4b1330991ef1b54fed7b0a5 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 30 Jan 2021 11:04:48 +0000 Subject: [PATCH 2/2] Various enhancements and refactorings for remote write receiver: - Remove unrelated changes - Refactor code out of the API module - that is already getting pretty crowded. 
- Don't track reference for AddFast in remote write. This has the potential to consume unlimited server-side memory if a malicious client pushes a different label set for every series. For now, it's easier and safer to always use the 'slow' path. - Return 400 on out of order samples. - Use remote.DecodeWriteRequest in the remote write adapters. - Put this behind the 'remote-write-receiver' feature flag - Add some (very) basic docs. - Use named return & add test for commit error propagation Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 8 +- docs/disabled_features.md | 6 + docs/storage.md | 11 +- .../example_write_adapter/server.go | 19 +-- .../remote_storage_adapter/main.go | 19 +-- storage/remote/codec.go | 21 +++ storage/remote/codec_test.go | 35 +++++ storage/remote/write_hander.go | 86 +++++++++++ storage/remote/write_handler_test.go | 138 ++++++++++++++++++ web/api/v1/api.go | 116 +++------------ web/api/v1/api_test.go | 54 ------- web/web.go | 2 + 12 files changed, 329 insertions(+), 186 deletions(-) create mode 100644 storage/remote/write_hander.go create mode 100644 storage/remote/write_handler_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 821f971a70..4c59aa7b57 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -133,6 +133,10 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { switch o { case "promql-at-modifier": c.enablePromQLAtModifier = true + level.Info(logger).Log("msg", "Experimental promql-at-modifier enabled") + case "remote-write-receiver": + c.web.RemoteWriteReceiver = true + level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled") case "": continue default: @@ -284,12 +288,12 @@ func main() { Default("2m").SetValue(&cfg.queryTimeout) a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
- Default("2000").IntVar(&cfg.queryConcurrency) + Default("20").IntVar(&cfg.queryConcurrency) a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier, 'remote-write-receiver' to enable remote write receiver. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) diff --git a/docs/disabled_features.md b/docs/disabled_features.md index 2af789311d..b579b38342 100644 --- a/docs/disabled_features.md +++ b/docs/disabled_features.md @@ -17,3 +17,9 @@ They may be enabled by default in future versions. This feature lets you specify the evaluation time for instant vector selectors, range vector selectors, and subqueries. More details can be found [here](querying/basics.md#@-modifier). + +## Remote Write Receiver + +`--enable-feature=remote-write-receiver` + +The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview). 
diff --git a/docs/storage.md b/docs/storage.md index f10b51a7e9..6e395724e4 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -88,7 +88,7 @@ needed_disk_space = retention_time_seconds * ingested_samples_per_second * bytes To lower the rate of ingested samples, you can either reduce the number of time series you scrape (fewer targets or fewer series per target), or you can increase the scrape interval. However, reducing the number of series is likely more effective, due to compression of samples within a series. -If your local storage becomes corrupted for whatever reason, the best +If your local storage becomes corrupted for whatever reason, the best strategy to address the problem is to shut down Prometheus then remove the entire storage directory. You can also try removing individual block directories, or the WAL directory to resolve the problem. Note that this means losing @@ -111,9 +111,10 @@ a set of interfaces that allow integrating with remote storage systems. ### Overview -Prometheus integrates with remote storage systems in two ways: +Prometheus integrates with remote storage systems in three ways: * Prometheus can write samples that it ingests to a remote URL in a standardized format. +* Prometheus can receive samples from other Prometheus servers in a standardized format. * Prometheus can read (back) sample data from a remote URL in a standardized format. ![Remote read and write architecture](images/remote_integrations.png) @@ -122,6 +123,8 @@ The read and write protocols both use a snappy-compressed protocol buffer encodi For details on configuring remote storage integrations in Prometheus, see the [remote write](configuration/configuration.md#remote_write) and [remote read](configuration/configuration.md#remote_read) sections of the Prometheus configuration documentation. +The built-in remote write receiver can be enabled by setting the `--enable-feature=remote-write-receiver` command line flag. 
When enabled, the remote write receiver endpoint is `/api/v1/write`. + For details on the request and response messages, see the [remote storage protocol buffer definitions](https://github.com/prometheus/prometheus/blob/master/prompb/remote.proto). Note that on the read path, Prometheus only fetches raw series data for a set of label selectors and time ranges from the remote end. All PromQL evaluation on the raw data still happens in Prometheus itself. This means that remote read queries have some scalability limit, since all necessary data needs to be loaded into the querying Prometheus server first and then processed there. However, supporting fully distributed evaluation of PromQL was deemed infeasible for the time being. @@ -138,7 +141,7 @@ If a user wants to create blocks into the TSDB from data that is in [OpenMetrics A typical use case is to migrate metrics data from a different monitoring system or time-series database to Prometheus. To do so, the user must first convert the source data into [OpenMetrics](https://openmetrics.io/) format, which is the input format for the backfilling as described below. -### Usage +### Usage Backfilling can be used via the Promtool command line. Promtool will write the blocks to a directory. By default this output directory is ./data/, you can change it by using the name of the desired output directory as an optional argument in the sub-command. @@ -146,4 +149,4 @@ Backfilling can be used via the Promtool command line. Promtool will write the b promtool tsdb create-blocks-from openmetrics [] ``` -After the creation of the blocks, move it to the data directory of Prometheus. If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size). \ No newline at end of file +After the creation of the blocks, move it to the data directory of Prometheus. 
If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size). diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index b94ef8fc25..c61ed7b410 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -15,33 +15,18 @@ package main import ( "fmt" - "io/ioutil" "log" "net/http" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" ) func main() { http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + req, err := remote.DecodeWriteRequest(r.Body) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/documentation/examples/remote_storage/remote_storage_adapter/main.go b/documentation/examples/remote_storage/remote_storage_adapter/main.go index f445fb0240..de35ecf693 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/main.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/main.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb" 
"github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" ) type config struct { @@ -211,28 +212,14 @@ func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) { func serve(logger log.Logger, addr string, writers []writer, readers []reader) error { http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + req, err := remote.DecodeWriteRequest(r.Body) if err != nil { level.Error(logger).Log("msg", "Read error", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - level.Error(logger).Log("msg", "Decode error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - samples := protoToSamples(&req) + samples := protoToSamples(req) receivedSamples.Add(float64(len(samples))) var wg sync.WaitGroup diff --git a/storage/remote/codec.go b/storage/remote/codec.go index f8033111cb..0bd1b97622 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -497,3 +497,24 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M return prompb.MetricMetadata_MetricType(v) } + +// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling +// snappy decompression. 
+func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { + compressed, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index bd507b7c43..0f6a56bdad 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -14,6 +14,7 @@ package remote import ( + "bytes" "fmt" "testing" @@ -25,6 +26,31 @@ import ( "github.com/prometheus/prometheus/storage" ) +var writeRequestFixture = &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + }, +} + func TestValidateLabelsAndMetricName(t *testing.T) { tests := []struct { input labels.Labels @@ -262,3 +288,12 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { }) } } + +func TestDecodeWriteRequest(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + actual, err := DecodeWriteRequest(bytes.NewReader(buf)) + require.NoError(t, err) + require.Equal(t, writeRequestFixture, actual) +} diff --git a/storage/remote/write_hander.go b/storage/remote/write_hander.go new file mode 100644 index 0000000000..c0538e2b2f --- /dev/null +++ b/storage/remote/write_hander.go @@ -0,0 +1,86 @@ +// Copyright 2021 The Prometheus Authors +// 
Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "net/http" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +type handler struct { + logger log.Logger + appendable storage.Appendable +} + +// NewWriteHandler creates a http.Handler that accepts remote write requests and +// writes them to the provided appendable. +func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { + return &handler{ + logger: logger, + appendable: appendable, + } +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + req, err := DecodeWriteRequest(r.Body) + if err != nil { + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = h.write(r.Context(), req) + switch err { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp: + // Indicated an out of order sample is a bad request to prevent retries. 
+ level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + default: + level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { + app := h.appendable.Appender(ctx) + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() + + for _, ts := range req.Timeseries { + labels := labelProtosToLabels(ts.Labels) + for _, s := range ts.Samples { + _, err = app.Add(labels, s.Timestamp, s.Value) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go new file mode 100644 index 0000000000..d6d66d0d3d --- /dev/null +++ b/storage/remote/write_handler_test.go @@ -0,0 +1,138 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package remote + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestRemoteWriteHandler(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(nil, appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusNoContent, resp.StatusCode) + + i := 0 + for _, ts := range writeRequestFixture.Timeseries { + labels := labelProtosToLabels(ts.Labels) + for _, s := range ts.Samples { + require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + i++ + } + } +} + +func TestOutOfOrder(t *testing.T) { + buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }}, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latest: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestCommitErr(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + commitErr: fmt.Errorf("commit error"), + } + 
handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Equal(t, "commit error\n", string(body)) +} + +type mockAppendable struct { + latest int64 + samples []mockSample + commitErr error +} + +type mockSample struct { + l labels.Labels + t int64 + v float64 +} + +func (m *mockAppendable) Appender(_ context.Context) storage.Appender { + return m +} + +func (m *mockAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) { + if t < m.latest { + return 0, storage.ErrOutOfOrderSample + } + + m.latest = t + m.samples = append(m.samples, mockSample{l, t, v}) + return 0, nil +} + +func (m *mockAppendable) Commit() error { + return m.commitErr +} + +func (*mockAppendable) AddFast(uint64, int64, float64) error { + return fmt.Errorf("not implemented") +} + +func (*mockAppendable) Rollback() error { + return fmt.Errorf("not implemented") +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index dc46e0d2af..d18dc3be2e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -16,7 +16,6 @@ package v1 import ( "context" "fmt" - "io/ioutil" "math" "math/rand" "net" @@ -28,14 +27,11 @@ import ( "sort" "strconv" "strings" - "sync" "time" "unsafe" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -75,6 +71,7 @@ const ( type errorType string const ( + errorNone errorType = "" errorTimeout errorType = "timeout" errorCanceled errorType = "canceled" errorExec errorType = "execution" @@ -175,9 +172,6 @@ type TSDBAdminStats interface { // them using the provided storage and query engine. 
type API struct { Queryable storage.SampleAndChunkQueryable - Appendable storage.Appendable - refs map[string]uint64 - refsLock *sync.RWMutex QueryEngine *promql.Engine targetRetriever func(context.Context) TargetRetriever @@ -200,6 +194,7 @@ type API struct { buildInfo *PrometheusVersion runtimeInfo func() (RuntimeInfo, error) gatherer prometheus.Gatherer + remoteWriteHandler http.Handler } func init() { @@ -210,7 +205,7 @@ func init() { // NewAPI returns an initialized API type. func NewAPI( qe *promql.Engine, - q storage.SampleAndChunkQueryable, + s storage.Storage, tr func(context.Context) TargetRetriever, ar func(context.Context) AlertmanagerRetriever, configFunc func() config.Config, @@ -229,13 +224,12 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, gatherer prometheus.Gatherer, + remoteWriteReceiver bool, ) *API { - return &API{ - QueryEngine: qe, - refs: make(map[string]uint64), - refsLock: &sync.RWMutex{}, - Queryable: q, - Appendable: q, + a := &API{ + QueryEngine: qe, + Queryable: s, + targetRetriever: tr, alertmanagerRetriever: ar, @@ -257,6 +251,12 @@ func NewAPI( buildInfo: buildInfo, gatherer: gatherer, } + + if remoteWriteReceiver { + a.remoteWriteHandler = remote.NewWriteHandler(logger, s) + } + + return a } func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult { @@ -677,7 +677,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { return invalidParamError(err, "match[]") } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(end.Add(time.Minute*-5)), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } @@ -1506,84 +1506,6 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response } } -func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { - compressed, err := 
ioutil.ReadAll(r.Body) - if err != nil { - level.Error(api.logger).Log("msg", "Read error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - level.Error(api.logger).Log("msg", "Decode error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - level.Error(api.logger).Log("msg", "Unmarshal error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - err = api.write(&req) - if err != nil { - level.Error(api.logger).Log("msg", "Api write", "err", err.Error()) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (api *API) write(req *prompb.WriteRequest) error { - var err error = nil - app := api.Appendable.Appender() - defer func() { //TODO:clear api.refs cache - if err != nil { - app.Rollback() - return - } - if err = app.Commit(); err != nil { - return - } - }() - for _, ts := range req.Timeseries { - tsLabels := make(labels.Labels, 0, len(ts.Labels)) - for _, l := range ts.Labels { - tsLabels = append(tsLabels, labels.Label{Name: l.Name, Value: l.Value}) - } - sort.Sort(tsLabels) - tsLabelsKey := tsLabels.String() - for _, s := range ts.Samples { - api.refsLock.RLock() - ref, ok := api.refs[tsLabelsKey] - api.refsLock.RUnlock() - if ok { - err = app.AddFast(ref, s.Timestamp, s.Value) - if err != nil && strings.Contains(err.Error(), "unknown series") { - // - } else { - switch err { - case nil: - case storage.ErrOutOfOrderSample: - //level.Error(api.logger).Log("msg", "AddFast fail .Out of order sample", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value) - default: - level.Error(api.logger).Log("msg", "AddFast fail .unexpected error", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value) - return err - } - continue - } 
- } - ref, err = app.Add(tsLabels, s.Timestamp, s.Value) - if err != nil { - return err - } - api.refsLock.Lock() - api.refs[tsLabelsKey] = ref - api.refsLock.Unlock() - } - } - return nil -} - // filterExtLabelsFromMatchers change equality matchers which match external labels // to a matcher that looks for an empty label, // as that label should not be present in the storage. @@ -1610,6 +1532,14 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } +func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { + if api.remoteWriteHandler != nil { + api.remoteWriteHandler.ServeHTTP(w, r) + } else { + http.Error(w, "remote write receiver needs to be enabled with --enable-feature=remote-write-receiver", http.StatusNotFound) + } +} + func (api *API) deleteSeries(r *http.Request) apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 7e400dba18..1702fa5860 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -29,7 +29,6 @@ import ( "runtime" "sort" "strings" - "sync" "testing" "time" @@ -61,10 +60,6 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -const ( - errorNone errorType = "" -) - // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. 
type testMetaStore struct { @@ -2254,55 +2249,6 @@ func TestStreamReadEndpoint(t *testing.T) { }, results) } -func TestSampledWriteEndpoint(t *testing.T) { - - samples := []prompb.TimeSeries{ - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, - }, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }, - { - Labels: []prompb.Label{ - {Name: "__name__", Value: "test_metric1"}, - {Name: "b", Value: "c"}, - {Name: "baz", Value: "qux"}, - {Name: "d", Value: "e"}, - {Name: "foo", Value: "bar"}, - }, - Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, - }, - } - - req := &prompb.WriteRequest{ - Timeseries: samples, - } - - suite, err := promql.NewTest(t, ` - load 1m - test_metric1{foo="bar",baz="qux"} 1 - `) - testutil.Ok(t, err) - - defer suite.Close() - - err = suite.Run() - testutil.Ok(t, err) - - api := &API{ - Appendable: suite.Storage(), - refs: make(map[string]uint64), - refsLock: &sync.RWMutex{}, - } - err = api.write(req) - testutil.Ok(t, err) -} - type fakeDB struct { err error } diff --git a/web/web.go b/web/web.go index c780612f09..3be9df83e6 100644 --- a/web/web.go +++ b/web/web.go @@ -244,6 +244,7 @@ type Options struct { RemoteReadSampleLimit int RemoteReadConcurrencyLimit int RemoteReadBytesInFrame int + RemoteWriteReceiver bool Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -322,6 +323,7 @@ func New(logger log.Logger, o *Options) *Handler { h.runtimeInfo, h.versionInfo, o.Gatherer, + o.RemoteWriteReceiver, ) if o.RoutePrefix != "/" {