diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7e153bc329..4c59aa7b57 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -133,6 +133,10 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { switch o { case "promql-at-modifier": c.enablePromQLAtModifier = true + level.Info(logger).Log("msg", "Experimental promql-at-modifier enabled") + case "remote-write-receiver": + c.web.RemoteWriteReceiver = true + level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled") case "": continue default: @@ -289,7 +293,7 @@ func main() { a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier, 'remote-write-receiver' to enable remote write receiver. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details."). Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) diff --git a/docs/disabled_features.md b/docs/disabled_features.md index 2af789311d..b579b38342 100644 --- a/docs/disabled_features.md +++ b/docs/disabled_features.md @@ -17,3 +17,9 @@ They may be enabled by default in future versions. This feature lets you specify the evaluation time for instant vector selectors, range vector selectors, and subqueries. More details can be found [here](querying/basics.md#@-modifier). + +## Remote Write Receiver + +`--enable-feature=remote-write-receiver` + +The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview). diff --git a/docs/storage.md b/docs/storage.md index f10b51a7e9..6e395724e4 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -88,7 +88,7 @@ needed_disk_space = retention_time_seconds * ingested_samples_per_second * bytes To lower the rate of ingested samples, you can either reduce the number of time series you scrape (fewer targets or fewer series per target), or you can increase the scrape interval. However, reducing the number of series is likely more effective, due to compression of samples within a series. -If your local storage becomes corrupted for whatever reason, the best +If your local storage becomes corrupted for whatever reason, the best strategy to address the problem is to shut down Prometheus then remove the entire storage directory. You can also try removing individual block directories, or the WAL directory to resolve the problem. Note that this means losing @@ -111,9 +111,10 @@ a set of interfaces that allow integrating with remote storage systems. ### Overview -Prometheus integrates with remote storage systems in two ways: +Prometheus integrates with remote storage systems in three ways: * Prometheus can write samples that it ingests to a remote URL in a standardized format. +* Prometheus can receive samples from other Prometheus servers in a standardized format. * Prometheus can read (back) sample data from a remote URL in a standardized format. ![Remote read and write architecture](images/remote_integrations.png) @@ -122,6 +123,8 @@ The read and write protocols both use a snappy-compressed protocol buffer encodi For details on configuring remote storage integrations in Prometheus, see the [remote write](configuration/configuration.md#remote_write) and [remote read](configuration/configuration.md#remote_read) sections of the Prometheus configuration documentation. +The built-in remote write receiver can be enabled by setting the `--enable-feature=remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`. + For details on the request and response messages, see the [remote storage protocol buffer definitions](https://github.com/prometheus/prometheus/blob/master/prompb/remote.proto). Note that on the read path, Prometheus only fetches raw series data for a set of label selectors and time ranges from the remote end. All PromQL evaluation on the raw data still happens in Prometheus itself. This means that remote read queries have some scalability limit, since all necessary data needs to be loaded into the querying Prometheus server first and then processed there. However, supporting fully distributed evaluation of PromQL was deemed infeasible for the time being. @@ -138,7 +141,7 @@ If a user wants to create blocks into the TSDB from data that is in [OpenMetrics A typical use case is to migrate metrics data from a different monitoring system or time-series database to Prometheus. To do so, the user must first convert the source data into [OpenMetrics](https://openmetrics.io/) format, which is the input format for the backfilling as described below. -### Usage +### Usage Backfilling can be used via the Promtool command line. Promtool will write the blocks to a directory. By default this output directory is ./data/, you can change it by using the name of the desired output directory as an optional argument in the sub-command. @@ -146,4 +149,4 @@ Backfilling can be used via the Promtool command line. Promtool will write the b promtool tsdb create-blocks-from openmetrics [] ``` -After the creation of the blocks, move it to the data directory of Prometheus. If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size). \ No newline at end of file +After the creation of the blocks, move it to the data directory of Prometheus. If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size). diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index b94ef8fc25..c61ed7b410 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -15,33 +15,18 @@ package main import ( "fmt" - "io/ioutil" "log" "net/http" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" ) func main() { http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + req, err := remote.DecodeWriteRequest(r.Body) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/documentation/examples/remote_storage/remote_storage_adapter/main.go b/documentation/examples/remote_storage/remote_storage_adapter/main.go index f445fb0240..de35ecf693 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/main.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/main.go @@ -42,6 +42,7 @@ import ( "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" ) type config struct { @@ -211,28 +212,14 @@ func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) { func serve(logger log.Logger, addr string, writers []writer, readers []reader) error { http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { - compressed, err := ioutil.ReadAll(r.Body) + req, err := remote.DecodeWriteRequest(r.Body) if err != nil { level.Error(logger).Log("msg", "Read error", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - level.Error(logger).Log("msg", "Decode error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - samples := protoToSamples(&req) + samples := protoToSamples(req) receivedSamples.Add(float64(len(samples))) var wg sync.WaitGroup diff --git a/storage/remote/codec.go b/storage/remote/codec.go index f8033111cb..0bd1b97622 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -497,3 +497,24 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M return prompb.MetricMetadata_MetricType(v) } + +// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling +// snappy decompression. +func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { + compressed, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index bd507b7c43..0f6a56bdad 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -14,6 +14,7 @@ package remote import ( + "bytes" "fmt" "testing" @@ -25,6 +26,31 @@ import ( "github.com/prometheus/prometheus/storage" ) +var writeRequestFixture = &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + }, +} + func TestValidateLabelsAndMetricName(t *testing.T) { tests := []struct { input labels.Labels @@ -262,3 +288,12 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { }) } } + +func TestDecodeWriteRequest(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + actual, err := DecodeWriteRequest(bytes.NewReader(buf)) + require.NoError(t, err) + require.Equal(t, writeRequestFixture, actual) +} diff --git a/storage/remote/write_hander.go b/storage/remote/write_hander.go new file mode 100644 index 0000000000..c0538e2b2f --- /dev/null +++ b/storage/remote/write_hander.go @@ -0,0 +1,86 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "net/http" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +type handler struct { + logger log.Logger + appendable storage.Appendable +} + +// NewWriteHandler creates a http.Handler that accepts remote write requests and +// writes them to the provided appendable. +func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler { + return &handler{ + logger: logger, + appendable: appendable, + } +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + req, err := DecodeWriteRequest(r.Body) + if err != nil { + level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = h.write(r.Context(), req) + switch err { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp: + // Indicated an out of order sample is a bad request to prevent retries. + level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + default: + level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error()) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { + app := h.appendable.Appender(ctx) + defer func() { + if err != nil { + app.Rollback() + return + } + err = app.Commit() + }() + + for _, ts := range req.Timeseries { + labels := labelProtosToLabels(ts.Labels) + for _, s := range ts.Samples { + _, err = app.Add(labels, s.Timestamp, s.Value) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go new file mode 100644 index 0000000000..d6d66d0d3d --- /dev/null +++ b/storage/remote/write_handler_test.go @@ -0,0 +1,138 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestRemoteWriteHandler(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(nil, appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusNoContent, resp.StatusCode) + + i := 0 + for _, ts := range writeRequestFixture.Timeseries { + labels := labelProtosToLabels(ts.Labels) + for _, s := range ts.Samples { + require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + i++ + } + } +} + +func TestOutOfOrder(t *testing.T) { + buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, + }}, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latest: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestCommitErr(t *testing.T) { + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + commitErr: fmt.Errorf("commit error"), + } + handler := NewWriteHandler(log.NewNopLogger(), appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Equal(t, "commit error\n", string(body)) +} + +type mockAppendable struct { + latest int64 + samples []mockSample + commitErr error +} + +type mockSample struct { + l labels.Labels + t int64 + v float64 +} + +func (m *mockAppendable) Appender(_ context.Context) storage.Appender { + return m +} + +func (m *mockAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) { + if t < m.latest { + return 0, storage.ErrOutOfOrderSample + } + + m.latest = t + m.samples = append(m.samples, mockSample{l, t, v}) + return 0, nil +} + +func (m *mockAppendable) Commit() error { + return m.commitErr +} + +func (*mockAppendable) AddFast(uint64, int64, float64) error { + return fmt.Errorf("not implemented") +} + +func (*mockAppendable) Rollback() error { + return fmt.Errorf("not implemented") +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 3225845d3a..d18dc3be2e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -194,6 +194,7 @@ type API struct { buildInfo *PrometheusVersion runtimeInfo func() (RuntimeInfo, error) gatherer prometheus.Gatherer + remoteWriteHandler http.Handler } func init() { @@ -204,7 +205,7 @@ func init() { // NewAPI returns an initialized API type. func NewAPI( qe *promql.Engine, - q storage.SampleAndChunkQueryable, + s storage.Storage, tr func(context.Context) TargetRetriever, ar func(context.Context) AlertmanagerRetriever, configFunc func() config.Config, @@ -223,10 +224,12 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, gatherer prometheus.Gatherer, + remoteWriteReceiver bool, ) *API { - return &API{ - QueryEngine: qe, - Queryable: q, + a := &API{ + QueryEngine: qe, + Queryable: s, + targetRetriever: tr, alertmanagerRetriever: ar, @@ -248,6 +251,12 @@ func NewAPI( buildInfo: buildInfo, gatherer: gatherer, } + + if remoteWriteReceiver { + a.remoteWriteHandler = remote.NewWriteHandler(logger, s) + } + + return a } func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult { @@ -309,6 +318,7 @@ func (api *API) Register(r *route.Router) { r.Get("/status/flags", wrap(api.serveFlags)) r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead))) + r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite))) r.Get("/alerts", wrap(api.alerts)) r.Get("/rules", wrap(api.rules)) @@ -1522,6 +1532,14 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } +func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) { + if api.remoteWriteHandler != nil { + api.remoteWriteHandler.ServeHTTP(w, r) + } else { + http.Error(w, "remote write receiver needs to be enabled with --enable-feature=remote-write-receiver", http.StatusNotFound) + } +} + func (api *API) deleteSeries(r *http.Request) apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} diff --git a/web/web.go b/web/web.go index c780612f09..3be9df83e6 100644 --- a/web/web.go +++ b/web/web.go @@ -244,6 +244,7 @@ type Options struct { RemoteReadSampleLimit int RemoteReadConcurrencyLimit int RemoteReadBytesInFrame int + RemoteWriteReceiver bool Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -322,6 +323,7 @@ func New(logger log.Logger, o *Options) *Handler { h.runtimeInfo, h.versionInfo, o.Gatherer, + o.RemoteWriteReceiver, ) if o.RoutePrefix != "/" {