remote write handler: reject samples with future timestamps (#14304)

* fix(remote_write): reject samples with future timestamps

* increase check to +10 minutes to allow for clock drift

---------

Signed-off-by: Jan-Otto Kröpke <mail@jkroepke.de>
Signed-off-by: Jan-Otto Kröpke <joe@cloudeteer.de>
Signed-off-by: Jan-Otto Kröpke <github@jkroepke.de>
Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Jan-Otto Kröpke 2024-06-25 13:25:39 +02:00 committed by GitHub
parent 348f7f8d0c
commit 99355443c7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 164 additions and 56 deletions

View file

@ -18,6 +18,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"time"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
@ -25,7 +26,9 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
@ -38,6 +41,8 @@ type writeHandler struct {
samplesWithInvalidLabelsTotal prometheus.Counter samplesWithInvalidLabelsTotal prometheus.Counter
} }
const maxAheadTime = 10 * time.Minute
// NewWriteHandler creates a http.Handler that accepts remote write requests and // NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable. // writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler { func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler {
@ -104,17 +109,22 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
outOfOrderExemplarErrs := 0 outOfOrderExemplarErrs := 0
samplesWithInvalidLabels := 0 samplesWithInvalidLabels := 0
app := h.appendable.Appender(ctx) timeLimitApp := &timeLimitAppender{
Appender: h.appendable.Appender(ctx),
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
defer func() { defer func() {
if err != nil { if err != nil {
_ = app.Rollback() _ = timeLimitApp.Rollback()
return return
} }
err = app.Commit() err = timeLimitApp.Commit()
}() }()
b := labels.NewScratchBuilder(0) b := labels.NewScratchBuilder(0)
var exemplarErr error var exemplarErr error
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
labels := LabelProtosToLabels(&b, ts.Labels) labels := LabelProtosToLabels(&b, ts.Labels)
if !labels.IsValid() { if !labels.IsValid() {
@ -124,7 +134,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
} }
var ref storage.SeriesRef var ref storage.SeriesRef
for _, s := range ts.Samples { for _, s := range ts.Samples {
ref, err = app.Append(ref, labels, s.Timestamp, s.Value) ref, err = timeLimitApp.Append(ref, labels, s.Timestamp, s.Value)
if err != nil { if err != nil {
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil { if unwrappedErr == nil {
@ -140,7 +150,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
for _, ep := range ts.Exemplars { for _, ep := range ts.Exemplars {
e := exemplarProtoToExemplar(&b, ep) e := exemplarProtoToExemplar(&b, ep)
_, exemplarErr = app.AppendExemplar(0, labels, e) _, exemplarErr = timeLimitApp.AppendExemplar(0, labels, e)
exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs)
if exemplarErr != nil { if exemplarErr != nil {
// Since exemplar storage is still experimental, we don't fail the request on ingestion errors. // Since exemplar storage is still experimental, we don't fail the request on ingestion errors.
@ -151,11 +161,12 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
for _, hp := range ts.Histograms { for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() { if hp.IsFloatHistogram() {
fhs := FloatHistogramProtoToFloatHistogram(hp) fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else { } else {
hs := HistogramProtoToHistogram(hp) hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, hs, nil)
} }
if err != nil { if err != nil {
unwrappedErr := errors.Unwrap(err) unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil { if unwrappedErr == nil {
@ -233,3 +244,45 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
type timeLimitAppender struct {
storage.Appender
maxTime int64
}
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}
func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if t > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
if err != nil {
return 0, err
}
return ref, nil
}
func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
if e.Ts > app.maxTime {
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
}
ref, err := app.Appender.AppendExemplar(ref, l, e)
if err != nil {
return 0, err
}
return ref, nil
}

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strconv" "strconv"
@ -87,9 +88,25 @@ func TestRemoteWriteHandler(t *testing.T) {
} }
func TestOutOfOrderSample(t *testing.T) { func TestOutOfOrderSample(t *testing.T) {
tests := []struct {
Name string
Timestamp int64
}{
{
Name: "historic",
Timestamp: 0,
},
{
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, Samples: []prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}},
}}, nil, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -106,15 +123,33 @@ func TestOutOfOrderSample(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Equal(t, http.StatusBadRequest, resp.StatusCode)
})
}
} }
// This test case currently aims to verify that the WriteHandler endpoint // This test case currently aims to verify that the WriteHandler endpoint
// don't fail on ingestion errors since the exemplar storage is // don't fail on ingestion errors since the exemplar storage is
// still experimental. // still experimental.
func TestOutOfOrderExemplar(t *testing.T) { func TestOutOfOrderExemplar(t *testing.T) {
tests := []struct {
Name string
Timestamp int64
}{
{
Name: "historic",
Timestamp: 0,
},
{
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: tc.Timestamp}},
}}, nil, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -132,12 +167,30 @@ func TestOutOfOrderExemplar(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
// TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental.
require.Equal(t, http.StatusNoContent, resp.StatusCode) require.Equal(t, http.StatusNoContent, resp.StatusCode)
})
}
} }
func TestOutOfOrderHistogram(t *testing.T) { func TestOutOfOrderHistogram(t *testing.T) {
tests := []struct {
Name string
Timestamp int64
}{
{
Name: "historic",
Timestamp: 0,
},
{
Name: "future",
Timestamp: math.MaxInt64,
},
}
for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, Histograms: []prompb.Histogram{HistogramToHistogramProto(tc.Timestamp, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil, nil) }}, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -154,6 +207,8 @@ func TestOutOfOrderHistogram(t *testing.T) {
resp := recorder.Result() resp := recorder.Result()
require.Equal(t, http.StatusBadRequest, resp.StatusCode) require.Equal(t, http.StatusBadRequest, resp.StatusCode)
})
}
} }
func BenchmarkRemoteWritehandler(b *testing.B) { func BenchmarkRemoteWritehandler(b *testing.B) {