Fix data corruption in remote write if max_sample_age is applied (#14078)

* fix: try to reproduce the bug from https://github.com/prometheus/prometheus/issues/13979 in a test case

Signed-off-by: David Vavra <sevenood@gmail.com>

* fix: data corruption in remote write if max_sample_age is applied

Signed-off-by: David Vavra <sevenood@gmail.com>

* add benchmark for buildTimeSeries which does the filtering

Signed-off-by: Callum Styan <callumstyan@gmail.com>

---------

Signed-off-by: David Vavra <sevenood@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: David Vavra <sevenood@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
Martin Chodur 2024-06-21 23:19:58 +02:00 committed by GitHub
parent d78253319d
commit 00b110c65c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 176 additions and 18 deletions

View file

@ -1783,9 +1783,11 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}
// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
if i != keepIdx {
// We have to swap the kept timeseries with the one which should be dropped.
// Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries).
timeSeries[keepIdx], timeSeries[i] = timeSeries[i], timeSeries[keepIdx]
}
keepIdx++
}

View file

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"os"
"runtime/pprof"
"sort"
@ -29,6 +30,7 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
@ -611,6 +613,30 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
return samples, series
}
func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...labels.Label) []prompb.TimeSeries {
samples := make([]prompb.TimeSeries, numSamples)
// use a fixed rand source so tests are consistent
r := rand.New(rand.NewSource(99))
for j := int64(0); j < numSamples; j++ {
name := fmt.Sprintf("test_metric_%d", j)
samples[j] = prompb.TimeSeries{
Labels: []prompb.Label{{Name: "__name__", Value: name}},
Samples: []prompb.Sample{
{
Timestamp: baseTs + j,
Value: float64(j),
},
},
}
// 10% of the time use a ts that is too old
if r.Intn(10) == 0 {
samples[j].Samples[0].Timestamp = baseTs - 5
}
}
return samples
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries)
@ -679,8 +705,8 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
return histograms, nil, series
}
func getSeriesNameFromRef(r record.RefSeries) string {
return r.Labels.Get("__name__")
func getSeriesIDFromRef(r record.RefSeries) string {
return r.Labels.String()
}
type TestWriteClient struct {
@ -698,6 +724,9 @@ type TestWriteClient struct {
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
storeWait time.Duration
returnError error
}
func NewTestWriteClient() *TestWriteClient {
@ -706,6 +735,8 @@ func NewTestWriteClient() *TestWriteClient {
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
storeWait: 0,
returnError: nil,
}
}
@ -720,12 +751,15 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
c.receivedSamples = map[string][]prompb.Sample{}
for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref])
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
tsID := getSeriesIDFromRef(series[s.Ref])
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
}
if !c.withWaitGroup {
return
}
c.wg.Add(len(ss))
}
@ -740,13 +774,13 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
c.receivedExemplars = map[string][]prompb.Exemplar{}
for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref])
tsID := getSeriesIDFromRef(series[s.Ref])
e := prompb.Exemplar{
Labels: LabelsToLabelsProto(s.Labels, nil),
Timestamp: s.T,
Value: s.V,
}
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
c.expectedExemplars[tsID] = append(c.expectedExemplars[tsID], e)
}
c.wg.Add(len(ss))
}
@ -762,8 +796,8 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
c.receivedHistograms = map[string][]prompb.Histogram{}
for _, h := range hh {
seriesName := getSeriesNameFromRef(series[h.Ref])
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
tsID := getSeriesIDFromRef(series[h.Ref])
c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], HistogramToHistogramProto(h.T, h.H))
}
c.wg.Add(len(hh))
}
@ -779,8 +813,8 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
for _, fh := range fhs {
seriesName := getSeriesNameFromRef(series[fh.Ref])
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
tsID := getSeriesIDFromRef(series[fh.Ref])
c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], FloatHistogramToHistogramProto(fh.T, fh.FH))
}
c.wg.Add(len(fhs))
}
@ -806,9 +840,27 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
}
}
func (c *TestWriteClient) SetStoreWait(w time.Duration) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.storeWait = w
}
func (c *TestWriteClient) SetReturnError(err error) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.returnError = err
}
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.storeWait > 0 {
time.Sleep(c.storeWait)
}
if c.returnError != nil {
return c.returnError
}
// nil buffers are ok for snappy, ignore cast error.
if c.buf != nil {
c.buf = c.buf[:cap(c.buf)]
@ -827,23 +879,23 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
count := 0
for _, ts := range reqProto.Timeseries {
labels := LabelProtosToLabels(&builder, ts.Labels)
seriesName := labels.Get("__name__")
tsID := labels.String()
for _, sample := range ts.Samples {
count++
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], sample)
}
for _, ex := range ts.Exemplars {
count++
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ex)
}
for _, histogram := range ts.Histograms {
count++
if histogram.IsFloatHistogram() {
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram)
c.receivedFloatHistograms[tsID] = append(c.receivedFloatHistograms[tsID], histogram)
} else {
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], histogram)
}
}
}
@ -1441,6 +1493,99 @@ func TestIsSampleOld(t *testing.T) {
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
}
// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing.
func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
maxSamplesPerSend := 10
sampleAgeLimit := time.Second
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(sampleAgeLimit)
// Set the batch send deadline to 5 minutes to effectively disable it.
cfg.BatchSendDeadline = model.Duration(time.Minute * 5)
cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
cfg.MaxBackoff = model.Duration(time.Millisecond * 100)
cfg.MinBackoff = model.Duration(time.Millisecond * 100)
cfg.MaxSamplesPerSend = maxSamplesPerSend
metadataCfg := config.DefaultMetadataConfig
metadataCfg.Send = true
metadataCfg.SendInterval = model.Duration(time.Second * 60)
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
c := NewTestWriteClient()
c.withWaitGroup = false
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c)
m.Start()
batchID := 0
expectedSamples := map[string][]prompb.Sample{}
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
m.StoreSeries(series, batchID)
sent := m.Append(samples)
require.True(t, sent, "samples not sent")
if !shouldBeDropped {
for _, s := range samples {
tsID := getSeriesIDFromRef(series[s.Ref])
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
}
}
batchID++
}
timeShift := -time.Millisecond * 5
c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(sampleAgeLimit)
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(sampleAgeLimit / 10)
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(2 * sampleAgeLimit)
appendData(2*maxSamplesPerSend, timeShift, false)
time.Sleep(sampleAgeLimit / 2)
c.SetReturnError(nil)
appendData(5, timeShift, false)
m.Stop()
if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
t.Errorf("mismatch (-want +got):\n%s", diff)
}
}
func createTimeseriesWithRandomLabelCount(id string, seriesCount int, timeAdd time.Duration, maxLabels int) ([]record.RefSample, []record.RefSeries) {
samples := []record.RefSample{}
series := []record.RefSeries{}
// use a fixed rand source so tests are consistent
r := rand.New(rand.NewSource(99))
for i := 0; i < seriesCount; i++ {
s := record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: time.Now().Add(timeAdd).UnixMilli(),
V: r.Float64(),
}
samples = append(samples, s)
labelsCount := r.Intn(maxLabels)
lb := labels.NewScratchBuilder(1 + labelsCount)
lb.Add("__name__", "batch_"+id+"_id_"+strconv.Itoa(i))
for j := 1; j < labelsCount+1; j++ {
// same for both name and value
label := "batch_" + id + "_label_" + strconv.Itoa(j)
lb.Add(label, label)
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
})
}
return samples, series
}
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
newSamples := make([]record.RefSample, 0, numSamples)
samples := make([]record.RefSample, 0, numSamples)
@ -1668,3 +1813,14 @@ func TestBuildTimeSeries(t *testing.T) {
})
}
}
func BenchmarkBuildTimeSeries(b *testing.B) {
// Send one sample per series, which is the typical remote_write case
const numSamples = 10000
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
for i := 0; i < b.N; i++ {
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
_, _, result, _, _, _ := buildTimeSeries(samples, filter)
require.NotNil(b, result)
}
}