mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Various enhancements and refactorings for remote write receiver:
- Remove unrelated changes - Refactor code out of the API module - that is already getting pretty crowded. - Don't track reference for AddFast in remote write. This has the potential to consume unlimited server-side memory if a malicious client pushes a different label set for every series. For now, its easier and safer to always use the 'slow' path. - Return 400 on out of order samples. - Use remote.DecodeWriteRequest in the remote write adapters. - Put this behing the 'remote-write-server' feature flag - Add some (very) basic docs. - Used named return & add test for commit error propagation Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
parent
72475b8a0c
commit
d479151f1f
|
@ -133,6 +133,10 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
|
|||
switch o {
|
||||
case "promql-at-modifier":
|
||||
c.enablePromQLAtModifier = true
|
||||
level.Info(logger).Log("msg", "Experimental promql-at-modifier enabled")
|
||||
case "remote-write-receiver":
|
||||
c.web.RemoteWriteReceiver = true
|
||||
level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled")
|
||||
case "":
|
||||
continue
|
||||
default:
|
||||
|
@ -284,12 +288,12 @@ func main() {
|
|||
Default("2m").SetValue(&cfg.queryTimeout)
|
||||
|
||||
a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
|
||||
Default("2000").IntVar(&cfg.queryConcurrency)
|
||||
Default("20").IntVar(&cfg.queryConcurrency)
|
||||
|
||||
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
|
||||
Default("50000000").IntVar(&cfg.queryMaxSamples)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: 'promql-at-modifier' to enable the @ modifier, 'remote-write-receiver' to enable remote write receiver. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details.").
|
||||
Default("").StringsVar(&cfg.featureList)
|
||||
|
||||
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
||||
|
|
|
@ -17,3 +17,9 @@ They may be enabled by default in future versions.
|
|||
|
||||
This feature lets you specify the evaluation time for instant vector selectors,
|
||||
range vector selectors, and subqueries. More details can be found [here](querying/basics.md#@-modifier).
|
||||
|
||||
## Remote Write Receiver
|
||||
|
||||
`--enable-feature=remote-write-receiver`
|
||||
|
||||
The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview).
|
||||
|
|
|
@ -88,7 +88,7 @@ needed_disk_space = retention_time_seconds * ingested_samples_per_second * bytes
|
|||
|
||||
To lower the rate of ingested samples, you can either reduce the number of time series you scrape (fewer targets or fewer series per target), or you can increase the scrape interval. However, reducing the number of series is likely more effective, due to compression of samples within a series.
|
||||
|
||||
If your local storage becomes corrupted for whatever reason, the best
|
||||
If your local storage becomes corrupted for whatever reason, the best
|
||||
strategy to address the problem is to shut down Prometheus then remove the
|
||||
entire storage directory. You can also try removing individual block directories,
|
||||
or the WAL directory to resolve the problem. Note that this means losing
|
||||
|
@ -111,9 +111,10 @@ a set of interfaces that allow integrating with remote storage systems.
|
|||
|
||||
### Overview
|
||||
|
||||
Prometheus integrates with remote storage systems in two ways:
|
||||
Prometheus integrates with remote storage systems in three ways:
|
||||
|
||||
* Prometheus can write samples that it ingests to a remote URL in a standardized format.
|
||||
* Prometheus can receive samples from other Prometheus servers in a standardized format.
|
||||
* Prometheus can read (back) sample data from a remote URL in a standardized format.
|
||||
|
||||
![Remote read and write architecture](images/remote_integrations.png)
|
||||
|
@ -122,6 +123,8 @@ The read and write protocols both use a snappy-compressed protocol buffer encodi
|
|||
|
||||
For details on configuring remote storage integrations in Prometheus, see the [remote write](configuration/configuration.md#remote_write) and [remote read](configuration/configuration.md#remote_read) sections of the Prometheus configuration documentation.
|
||||
|
||||
The built-in remote write receiver can be enabled by setting the `--enable-feature=remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`.
|
||||
|
||||
For details on the request and response messages, see the [remote storage protocol buffer definitions](https://github.com/prometheus/prometheus/blob/master/prompb/remote.proto).
|
||||
|
||||
Note that on the read path, Prometheus only fetches raw series data for a set of label selectors and time ranges from the remote end. All PromQL evaluation on the raw data still happens in Prometheus itself. This means that remote read queries have some scalability limit, since all necessary data needs to be loaded into the querying Prometheus server first and then processed there. However, supporting fully distributed evaluation of PromQL was deemed infeasible for the time being.
|
||||
|
@ -138,7 +141,7 @@ If a user wants to create blocks into the TSDB from data that is in [OpenMetrics
|
|||
|
||||
A typical use case is to migrate metrics data from a different monitoring system or time-series database to Prometheus. To do so, the user must first convert the source data into [OpenMetrics](https://openmetrics.io/) format, which is the input format for the backfilling as described below.
|
||||
|
||||
### Usage
|
||||
### Usage
|
||||
|
||||
Backfilling can be used via the Promtool command line. Promtool will write the blocks to a directory. By default this output directory is ./data/, you can change it by using the name of the desired output directory as an optional argument in the sub-command.
|
||||
|
||||
|
@ -146,4 +149,4 @@ Backfilling can be used via the Promtool command line. Promtool will write the b
|
|||
promtool tsdb create-blocks-from openmetrics <input file> [<output directory>]
|
||||
```
|
||||
|
||||
After the creation of the blocks, move it to the data directory of Prometheus. If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size).
|
||||
After the creation of the blocks, move it to the data directory of Prometheus. If there is an overlap with the existing blocks in Prometheus, the flag `--storage.tsdb.allow-overlapping-blocks` needs to be set. Note that any backfilled data is subject to the retention configured for your Prometheus server (by time or size).
|
||||
|
|
|
@ -15,33 +15,18 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
func main() {
|
||||
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
req, err := remote.DecodeWriteRequest(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var req prompb.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import (
|
|||
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
|
||||
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
|
@ -211,28 +212,14 @@ func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) {
|
|||
|
||||
func serve(logger log.Logger, addr string, writers []writer, readers []reader) error {
|
||||
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
req, err := remote.DecodeWriteRequest(r.Body)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("msg", "Read error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
level.Error(logger).Log("msg", "Decode error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var req prompb.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
level.Error(logger).Log("msg", "Unmarshal error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
samples := protoToSamples(&req)
|
||||
samples := protoToSamples(req)
|
||||
receivedSamples.Add(float64(len(samples)))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
|
|
@ -497,3 +497,24 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M
|
|||
|
||||
return prompb.MetricMetadata_MetricType(v)
|
||||
}
|
||||
|
||||
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
|
||||
// snappy decompression.
|
||||
func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
|
||||
compressed, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var req prompb.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &req, nil
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package remote
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
|
@ -25,6 +26,31 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
var writeRequestFixture = &prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestValidateLabelsAndMetricName(t *testing.T) {
|
||||
tests := []struct {
|
||||
input labels.Labels
|
||||
|
@ -262,3 +288,12 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeWriteRequest(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual, err := DecodeWriteRequest(bytes.NewReader(buf))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, writeRequestFixture, actual)
|
||||
}
|
||||
|
|
86
storage/remote/write_hander.go
Normal file
86
storage/remote/write_hander.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
logger log.Logger
|
||||
appendable storage.Appendable
|
||||
}
|
||||
|
||||
// NewWriteHandler creates a http.Handler that accepts remote write requests and
|
||||
// writes them to the provided appendable.
|
||||
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
|
||||
return &handler{
|
||||
logger: logger,
|
||||
appendable: appendable,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
req, err := DecodeWriteRequest(r.Body)
|
||||
if err != nil {
|
||||
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
err = h.write(r.Context(), req)
|
||||
switch err {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp:
|
||||
// Indicated an out of order sample is a bad request to prevent retries.
|
||||
level.Error(h.logger).Log("msg", "Out of order sample from remote write", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
default:
|
||||
level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
||||
app := h.appendable.Appender(ctx)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
return
|
||||
}
|
||||
err = app.Commit()
|
||||
}()
|
||||
|
||||
for _, ts := range req.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
for _, s := range ts.Samples {
|
||||
_, err = app.Add(labels, s.Timestamp, s.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
138
storage/remote/write_handler_test.go
Normal file
138
storage/remote/write_handler_test.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRemoteWriteHandler(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||
require.NoError(t, err)
|
||||
|
||||
appendable := &mockAppendable{}
|
||||
handler := NewWriteHandler(nil, appendable)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
handler.ServeHTTP(recorder, req)
|
||||
|
||||
resp := recorder.Result()
|
||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||
|
||||
i := 0
|
||||
for _, ts := range writeRequestFixture.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
for _, s := range ts.Samples {
|
||||
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOutOfOrder(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
}}, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||
require.NoError(t, err)
|
||||
|
||||
appendable := &mockAppendable{
|
||||
latest: 100,
|
||||
}
|
||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
handler.ServeHTTP(recorder, req)
|
||||
|
||||
resp := recorder.Result()
|
||||
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||
}
|
||||
|
||||
func TestCommitErr(t *testing.T) {
|
||||
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||
require.NoError(t, err)
|
||||
|
||||
appendable := &mockAppendable{
|
||||
commitErr: fmt.Errorf("commit error"),
|
||||
}
|
||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
handler.ServeHTTP(recorder, req)
|
||||
|
||||
resp := recorder.Result()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
|
||||
require.Equal(t, "commit error\n", string(body))
|
||||
}
|
||||
|
||||
type mockAppendable struct {
|
||||
latest int64
|
||||
samples []mockSample
|
||||
commitErr error
|
||||
}
|
||||
|
||||
type mockSample struct {
|
||||
l labels.Labels
|
||||
t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *mockAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t < m.latest {
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
m.latest = t
|
||||
m.samples = append(m.samples, mockSample{l, t, v})
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *mockAppendable) Commit() error {
|
||||
return m.commitErr
|
||||
}
|
||||
|
||||
func (*mockAppendable) AddFast(uint64, int64, float64) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (*mockAppendable) Rollback() error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
|
@ -16,7 +16,6 @@ package v1
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
|
@ -28,14 +27,11 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -75,6 +71,7 @@ const (
|
|||
type errorType string
|
||||
|
||||
const (
|
||||
errorNone errorType = ""
|
||||
errorTimeout errorType = "timeout"
|
||||
errorCanceled errorType = "canceled"
|
||||
errorExec errorType = "execution"
|
||||
|
@ -175,9 +172,6 @@ type TSDBAdminStats interface {
|
|||
// them using the provided storage and query engine.
|
||||
type API struct {
|
||||
Queryable storage.SampleAndChunkQueryable
|
||||
Appendable storage.Appendable
|
||||
refs map[string]uint64
|
||||
refsLock *sync.RWMutex
|
||||
QueryEngine *promql.Engine
|
||||
|
||||
targetRetriever func(context.Context) TargetRetriever
|
||||
|
@ -200,6 +194,7 @@ type API struct {
|
|||
buildInfo *PrometheusVersion
|
||||
runtimeInfo func() (RuntimeInfo, error)
|
||||
gatherer prometheus.Gatherer
|
||||
remoteWriteHandler http.Handler
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -210,7 +205,7 @@ func init() {
|
|||
// NewAPI returns an initialized API type.
|
||||
func NewAPI(
|
||||
qe *promql.Engine,
|
||||
q storage.SampleAndChunkQueryable,
|
||||
s storage.Storage,
|
||||
tr func(context.Context) TargetRetriever,
|
||||
ar func(context.Context) AlertmanagerRetriever,
|
||||
configFunc func() config.Config,
|
||||
|
@ -229,13 +224,12 @@ func NewAPI(
|
|||
runtimeInfo func() (RuntimeInfo, error),
|
||||
buildInfo *PrometheusVersion,
|
||||
gatherer prometheus.Gatherer,
|
||||
remoteWriteReceiver bool,
|
||||
) *API {
|
||||
return &API{
|
||||
QueryEngine: qe,
|
||||
refs: make(map[string]uint64),
|
||||
refsLock: &sync.RWMutex{},
|
||||
Queryable: q,
|
||||
Appendable: q,
|
||||
a := &API{
|
||||
QueryEngine: qe,
|
||||
Queryable: s,
|
||||
|
||||
targetRetriever: tr,
|
||||
alertmanagerRetriever: ar,
|
||||
|
||||
|
@ -257,6 +251,12 @@ func NewAPI(
|
|||
buildInfo: buildInfo,
|
||||
gatherer: gatherer,
|
||||
}
|
||||
|
||||
if remoteWriteReceiver {
|
||||
a.remoteWriteHandler = remote.NewWriteHandler(logger, s)
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func setUnavailStatusOnTSDBNotReady(r apiFuncResult) apiFuncResult {
|
||||
|
@ -677,7 +677,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
|
|||
return invalidParamError(err, "match[]")
|
||||
}
|
||||
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(end.Add(time.Minute*-5)), timestamp.FromTime(end))
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
|
||||
}
|
||||
|
@ -1506,84 +1506,6 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response
|
|||
}
|
||||
}
|
||||
|
||||
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
level.Error(api.logger).Log("msg", "Read error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
level.Error(api.logger).Log("msg", "Decode error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
var req prompb.WriteRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
level.Error(api.logger).Log("msg", "Unmarshal error", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
err = api.write(&req)
|
||||
if err != nil {
|
||||
level.Error(api.logger).Log("msg", "Api write", "err", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) write(req *prompb.WriteRequest) error {
|
||||
var err error = nil
|
||||
app := api.Appendable.Appender()
|
||||
defer func() { //TODO:clear api.refs cache
|
||||
if err != nil {
|
||||
app.Rollback()
|
||||
return
|
||||
}
|
||||
if err = app.Commit(); err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
for _, ts := range req.Timeseries {
|
||||
tsLabels := make(labels.Labels, 0, len(ts.Labels))
|
||||
for _, l := range ts.Labels {
|
||||
tsLabels = append(tsLabels, labels.Label{Name: l.Name, Value: l.Value})
|
||||
}
|
||||
sort.Sort(tsLabels)
|
||||
tsLabelsKey := tsLabels.String()
|
||||
for _, s := range ts.Samples {
|
||||
api.refsLock.RLock()
|
||||
ref, ok := api.refs[tsLabelsKey]
|
||||
api.refsLock.RUnlock()
|
||||
if ok {
|
||||
err = app.AddFast(ref, s.Timestamp, s.Value)
|
||||
if err != nil && strings.Contains(err.Error(), "unknown series") {
|
||||
//
|
||||
} else {
|
||||
switch err {
|
||||
case nil:
|
||||
case storage.ErrOutOfOrderSample:
|
||||
//level.Error(api.logger).Log("msg", "AddFast fail .Out of order sample", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value)
|
||||
default:
|
||||
level.Error(api.logger).Log("msg", "AddFast fail .unexpected error", "err", err, "series", tsLabelsKey, "Timestamp", s.Timestamp, "Value", s.Value)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
ref, err = app.Add(tsLabels, s.Timestamp, s.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
api.refsLock.Lock()
|
||||
api.refs[tsLabelsKey] = ref
|
||||
api.refsLock.Unlock()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
||||
// to a matcher that looks for an empty label,
|
||||
// as that label should not be present in the storage.
|
||||
|
@ -1610,6 +1532,14 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
|
|||
return filteredMatchers, nil
|
||||
}
|
||||
|
||||
func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
|
||||
if api.remoteWriteHandler != nil {
|
||||
api.remoteWriteHandler.ServeHTTP(w, r)
|
||||
} else {
|
||||
http.Error(w, "remote write receiver needs to be enabled with --enable-feature=remote-write-receiver", http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) deleteSeries(r *http.Request) apiFuncResult {
|
||||
if !api.enableAdmin {
|
||||
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}
|
||||
|
|
|
@ -29,7 +29,6 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -61,10 +60,6 @@ import (
|
|||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
)
|
||||
|
||||
const (
|
||||
errorNone errorType = ""
|
||||
)
|
||||
|
||||
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
|
||||
// It is used to inject specific metadata as part of a test case.
|
||||
type testMetaStore struct {
|
||||
|
@ -2254,55 +2249,6 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
}, results)
|
||||
}
|
||||
|
||||
func TestSampledWriteEndpoint(t *testing.T) {
|
||||
|
||||
samples := []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "d", Value: "e"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
},
|
||||
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||
},
|
||||
}
|
||||
|
||||
req := &prompb.WriteRequest{
|
||||
Timeseries: samples,
|
||||
}
|
||||
|
||||
suite, err := promql.NewTest(t, `
|
||||
load 1m
|
||||
test_metric1{foo="bar",baz="qux"} 1
|
||||
`)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
defer suite.Close()
|
||||
|
||||
err = suite.Run()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
api := &API{
|
||||
Appendable: suite.Storage(),
|
||||
refs: make(map[string]uint64),
|
||||
refsLock: &sync.RWMutex{},
|
||||
}
|
||||
err = api.write(req)
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
|
||||
type fakeDB struct {
|
||||
err error
|
||||
}
|
||||
|
|
|
@ -244,6 +244,7 @@ type Options struct {
|
|||
RemoteReadSampleLimit int
|
||||
RemoteReadConcurrencyLimit int
|
||||
RemoteReadBytesInFrame int
|
||||
RemoteWriteReceiver bool
|
||||
|
||||
Gatherer prometheus.Gatherer
|
||||
Registerer prometheus.Registerer
|
||||
|
@ -322,6 +323,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
h.runtimeInfo,
|
||||
h.versionInfo,
|
||||
o.Gatherer,
|
||||
o.RemoteWriteReceiver,
|
||||
)
|
||||
|
||||
if o.RoutePrefix != "/" {
|
||||
|
|
Loading…
Reference in a new issue