mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 13:44:05 -08:00
Add initial support for exemplar to the remote write receiver endpoint (#9319)
* Add initial support for exemplar to the remote write receiver endpoint Signed-off-by: Serge Catudal <serge.catudal@gmail.com> * Update storage remote write handler tests with exemplars Signed-off-by: Serge Catudal <serge.catudal@gmail.com> * Update remote write handler in order to have a distinct checkAppendExemplarError function from scrape Signed-off-by: Serge Catudal <serge.catudal@gmail.com>
This commit is contained in:
parent
2bcd9f2f69
commit
d77c985f8c
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/textparse"
|
"github.com/prometheus/prometheus/pkg/textparse"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
@ -450,6 +451,17 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
|
||||||
|
timestamp := ep.Timestamp
|
||||||
|
|
||||||
|
return exemplar.Exemplar{
|
||||||
|
Labels: labelProtosToLabels(ep.Labels),
|
||||||
|
Value: ep.Value,
|
||||||
|
Ts: timestamp,
|
||||||
|
HasTs: timestamp != 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
|
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
|
||||||
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
|
||||||
metric := make(model.Metric, len(labelPairs))
|
metric := make(model.Metric, len(labelPairs))
|
||||||
|
|
|
@ -36,7 +36,8 @@ var writeRequestFixture = &prompb.WriteRequest{
|
||||||
{Name: "d", Value: "e"},
|
{Name: "d", Value: "e"},
|
||||||
{Name: "foo", Value: "bar"},
|
{Name: "foo", Value: "bar"},
|
||||||
},
|
},
|
||||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Labels: []prompb.Label{
|
Labels: []prompb.Label{
|
||||||
|
@ -46,7 +47,8 @@ var writeRequestFixture = &prompb.WriteRequest{
|
||||||
{Name: "d", Value: "e"},
|
{Name: "d", Value: "e"},
|
||||||
{Name: "foo", Value: "bar"},
|
{Name: "foo", Value: "bar"},
|
||||||
},
|
},
|
||||||
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
|
||||||
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,10 +15,14 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
@ -62,16 +66,35 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkAppendExemplarError modifies the AppendExamplar's returned error based on the error cause.
|
||||||
|
func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar, outOfOrderErrs *int) error {
|
||||||
|
switch errors.Cause(err) {
|
||||||
|
case storage.ErrNotFound:
|
||||||
|
return storage.ErrNotFound
|
||||||
|
case storage.ErrOutOfOrderExemplar:
|
||||||
|
*outOfOrderErrs++
|
||||||
|
level.Debug(h.logger).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e))
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
||||||
|
var (
|
||||||
|
outOfOrderExemplarErrs = 0
|
||||||
|
)
|
||||||
|
|
||||||
app := h.appendable.Appender(ctx)
|
app := h.appendable.Appender(ctx)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
app.Rollback()
|
_ = app.Rollback()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = app.Commit()
|
err = app.Commit()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var exemplarErr error
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(ts.Labels)
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
|
@ -79,7 +102,23 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, ep := range ts.Exemplars {
|
||||||
|
e := exemplarProtoToExemplar(ep)
|
||||||
|
|
||||||
|
_, exemplarErr = app.AppendExemplar(0, labels, e)
|
||||||
|
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
|
||||||
|
if exemplarErr != nil {
|
||||||
|
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors.
|
||||||
|
level.Debug(h.logger).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if outOfOrderExemplarErrs > 0 {
|
||||||
|
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -23,11 +23,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRemoteWriteHandler(t *testing.T) {
|
func TestRemoteWriteHandler(t *testing.T) {
|
||||||
|
@ -47,16 +48,23 @@ func TestRemoteWriteHandler(t *testing.T) {
|
||||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
|
j := 0
|
||||||
for _, ts := range writeRequestFixture.Timeseries {
|
for _, ts := range writeRequestFixture.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(ts.Labels)
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, e := range ts.Exemplars {
|
||||||
|
exemplarLabels := labelProtosToLabels(e.Labels)
|
||||||
|
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
||||||
|
j++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOutOfOrder(t *testing.T) {
|
func TestOutOfOrderSample(t *testing.T) {
|
||||||
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||||
|
@ -67,7 +75,7 @@ func TestOutOfOrder(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
appendable := &mockAppendable{
|
appendable := &mockAppendable{
|
||||||
latest: 100,
|
latestSample: 100,
|
||||||
}
|
}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
||||||
|
|
||||||
|
@ -78,6 +86,32 @@ func TestOutOfOrder(t *testing.T) {
|
||||||
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test case currently aims to verify that the WriteHandler endpoint
|
||||||
|
// don't fail on ingestion errors since the exemplar storage is
|
||||||
|
// still experimental.
|
||||||
|
func TestOutOfOrderExemplar(t *testing.T) {
|
||||||
|
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||||
|
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
|
||||||
|
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
|
||||||
|
}}, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
appendable := &mockAppendable{
|
||||||
|
latestExemplar: 100,
|
||||||
|
}
|
||||||
|
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
||||||
|
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
|
resp := recorder.Result()
|
||||||
|
// TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental.
|
||||||
|
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
func TestCommitErr(t *testing.T) {
|
func TestCommitErr(t *testing.T) {
|
||||||
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
|
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -101,9 +135,11 @@ func TestCommitErr(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockAppendable struct {
|
type mockAppendable struct {
|
||||||
latest int64
|
latestSample int64
|
||||||
samples []mockSample
|
samples []mockSample
|
||||||
commitErr error
|
latestExemplar int64
|
||||||
|
exemplars []mockExemplar
|
||||||
|
commitErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockSample struct {
|
type mockSample struct {
|
||||||
|
@ -112,16 +148,23 @@ type mockSample struct {
|
||||||
v float64
|
v float64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mockExemplar struct {
|
||||||
|
l labels.Labels
|
||||||
|
el labels.Labels
|
||||||
|
t int64
|
||||||
|
v float64
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
|
func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
|
func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
|
||||||
if t < m.latest {
|
if t < m.latestSample {
|
||||||
return 0, storage.ErrOutOfOrderSample
|
return 0, storage.ErrOutOfOrderSample
|
||||||
}
|
}
|
||||||
|
|
||||||
m.latest = t
|
m.latestSample = t
|
||||||
m.samples = append(m.samples, mockSample{l, t, v})
|
m.samples = append(m.samples, mockSample{l, t, v})
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -134,7 +177,12 @@ func (*mockAppendable) Rollback() error {
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
|
func (m *mockAppendable) AppendExemplar(_ uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
|
||||||
// noop until we implement exemplars over remote write
|
if e.Ts < m.latestExemplar {
|
||||||
|
return 0, storage.ErrOutOfOrderExemplar
|
||||||
|
}
|
||||||
|
|
||||||
|
m.latestExemplar = e.Ts
|
||||||
|
m.exemplars = append(m.exemplars, mockExemplar{l, e.Labels, e.Ts, e.Value})
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue