diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 488485e385..dde78d35e5 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -1783,9 +1783,11 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
 		if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
 			lowest = ts.Histograms[0].Timestamp
 		}
-
-		// Move the current element to the write position and increment the write pointer
-		timeSeries[keepIdx] = timeSeries[i]
+		if i != keepIdx {
+			// We have to swap the kept timeseries with the one which should be dropped.
+			// Copying any element within timeSeries could cause data corruption when reusing the slice in the next batch (shards.populateTimeSeries).
+			timeSeries[keepIdx], timeSeries[i] = timeSeries[i], timeSeries[keepIdx]
+		}
 		keepIdx++
 	}
 
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 06783167fb..4d299994bd 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"math/rand"
 	"os"
 	"runtime/pprof"
 	"sort"
@@ -29,6 +30,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/google/go-cmp/cmp"
 	"github.com/prometheus/client_golang/prometheus"
 	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
@@ -611,6 +613,30 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([
 	return samples, series
 }
 
+func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...labels.Label) []prompb.TimeSeries {
+	samples := make([]prompb.TimeSeries, numSamples)
+	// use a fixed rand source so tests are consistent
+	r := rand.New(rand.NewSource(99))
+	for j := int64(0); j < numSamples; j++ {
+		name := fmt.Sprintf("test_metric_%d", j)
+
+		samples[j] = prompb.TimeSeries{
+			Labels: []prompb.Label{{Name: "__name__", Value: name}},
+			Samples: []prompb.Sample{
+				{
+					Timestamp: baseTs + j,
+					Value:     float64(j),
+				},
+			},
+		}
+		// 10% of the time use a ts that is too old
+		if r.Intn(10) == 0 {
+			samples[j].Samples[0].Timestamp = baseTs - 5
+		}
+	}
+	return samples
+}
+
 func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
 	exemplars := make([]record.RefExemplar, 0, numExemplars)
 	series := make([]record.RefSeries, 0, numSeries)
@@ -679,8 +705,8 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.
 	return histograms, nil, series
 }
 
-func getSeriesNameFromRef(r record.RefSeries) string {
-	return r.Labels.Get("__name__")
+func getSeriesIDFromRef(r record.RefSeries) string {
+	return r.Labels.String()
 }
 
 type TestWriteClient struct {
@@ -698,6 +724,9 @@ type TestWriteClient struct {
 	wg               sync.WaitGroup
 	mtx              sync.Mutex
 	buf              []byte
+
+	storeWait   time.Duration
+	returnError error
 }
 
 func NewTestWriteClient() *TestWriteClient {
@@ -706,6 +735,8 @@ func NewTestWriteClient() *TestWriteClient {
 		receivedSamples:  map[string][]prompb.Sample{},
 		expectedSamples:  map[string][]prompb.Sample{},
 		receivedMetadata: map[string][]prompb.MetricMetadata{},
+		storeWait:        0,
+		returnError:      nil,
 	}
 }
 
@@ -720,12 +751,15 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R
 	c.receivedSamples = map[string][]prompb.Sample{}
 
 	for _, s := range ss {
-		seriesName := getSeriesNameFromRef(series[s.Ref])
-		c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
+		tsID := getSeriesIDFromRef(series[s.Ref])
+		c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
 			Timestamp: s.T,
 			Value:     s.V,
 		})
 	}
+	if !c.withWaitGroup {
+		return
+	}
 	c.wg.Add(len(ss))
 }
 
@@ -740,13 +774,13 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
 	c.receivedExemplars = map[string][]prompb.Exemplar{}
 
 	for _, s := range ss {
-		seriesName := getSeriesNameFromRef(series[s.Ref])
+		tsID := getSeriesIDFromRef(series[s.Ref])
 		e := prompb.Exemplar{
 			Labels:    LabelsToLabelsProto(s.Labels, nil),
 			Timestamp: s.T,
 			Value:     s.V,
 		}
-		c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e)
+		c.expectedExemplars[tsID] = append(c.expectedExemplars[tsID], e)
 	}
 	c.wg.Add(len(ss))
 }
@@ -762,8 +796,8 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie
 	c.receivedHistograms = map[string][]prompb.Histogram{}
 
 	for _, h := range hh {
-		seriesName := getSeriesNameFromRef(series[h.Ref])
-		c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H))
+		tsID := getSeriesIDFromRef(series[h.Ref])
+		c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], HistogramToHistogramProto(h.T, h.H))
 	}
 	c.wg.Add(len(hh))
 }
@@ -779,8 +813,8 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa
 	c.receivedFloatHistograms = map[string][]prompb.Histogram{}
 
 	for _, fh := range fhs {
-		seriesName := getSeriesNameFromRef(series[fh.Ref])
-		c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH))
+		tsID := getSeriesIDFromRef(series[fh.Ref])
+		c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], FloatHistogramToHistogramProto(fh.T, fh.FH))
 	}
 	c.wg.Add(len(fhs))
 }
@@ -806,9 +840,27 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
 	}
 }
 
+func (c *TestWriteClient) SetStoreWait(w time.Duration) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	c.storeWait = w
+}
+
+func (c *TestWriteClient) SetReturnError(err error) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	c.returnError = err
+}
+
 func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
+	if c.storeWait > 0 {
+		time.Sleep(c.storeWait)
+	}
+	if c.returnError != nil {
+		return c.returnError
+	}
 	// nil buffers are ok for snappy, ignore cast error.
 	if c.buf != nil {
 		c.buf = c.buf[:cap(c.buf)]
@@ -827,23 +879,23 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
 	count := 0
 	for _, ts := range reqProto.Timeseries {
 		labels := LabelProtosToLabels(&builder, ts.Labels)
-		seriesName := labels.Get("__name__")
+		tsID := labels.String()
 		for _, sample := range ts.Samples {
 			count++
-			c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
+			c.receivedSamples[tsID] = append(c.receivedSamples[tsID], sample)
 		}
 
 		for _, ex := range ts.Exemplars {
 			count++
-			c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex)
+			c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ex)
 		}
 
 		for _, histogram := range ts.Histograms {
 			count++
 			if histogram.IsFloatHistogram() {
-				c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram)
+				c.receivedFloatHistograms[tsID] = append(c.receivedFloatHistograms[tsID], histogram)
 			} else {
-				c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram)
+				c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], histogram)
 			}
 		}
 	}
@@ -1441,6 +1493,99 @@ func TestIsSampleOld(t *testing.T) {
 	require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
 }
 
+// Simulates a scenario in which the remote write endpoint is down and a subset of samples is dropped due to the age limit while backing off.
+func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
+	maxSamplesPerSend := 10
+	sampleAgeLimit := time.Second
+
+	cfg := config.DefaultQueueConfig
+	cfg.MaxShards = 1
+	cfg.SampleAgeLimit = model.Duration(sampleAgeLimit)
+	// Set the batch send deadline to 5 minutes to effectively disable it.
+	cfg.BatchSendDeadline = model.Duration(time.Minute * 5)
+	cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
+	cfg.MaxBackoff = model.Duration(time.Millisecond * 100)
+	cfg.MinBackoff = model.Duration(time.Millisecond * 100)
+	cfg.MaxSamplesPerSend = maxSamplesPerSend
+	metadataCfg := config.DefaultMetadataConfig
+	metadataCfg.Send = true
+	metadataCfg.SendInterval = model.Duration(time.Second * 60)
+	metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
+	c := NewTestWriteClient()
+	c.withWaitGroup = false
+	m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c)
+
+	m.Start()
+
+	batchID := 0
+	expectedSamples := map[string][]prompb.Sample{}
+
+	appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
+		t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
+		samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
+		m.StoreSeries(series, batchID)
+		sent := m.Append(samples)
+		require.True(t, sent, "samples not sent")
+		if !shouldBeDropped {
+			for _, s := range samples {
+				tsID := getSeriesIDFromRef(series[s.Ref])
+				expectedSamples[tsID] = append(expectedSamples[tsID], prompb.Sample{
+					Timestamp: s.T,
+					Value:     s.V,
+				})
+			}
+		}
+		batchID++
+	}
+	timeShift := -time.Millisecond * 5
+
+	c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
+
+	appendData(maxSamplesPerSend/2, timeShift, true)
+	time.Sleep(sampleAgeLimit)
+	appendData(maxSamplesPerSend/2, timeShift, true)
+	time.Sleep(sampleAgeLimit / 10)
+	appendData(maxSamplesPerSend/2, timeShift, true)
+	time.Sleep(2 * sampleAgeLimit)
+	appendData(2*maxSamplesPerSend, timeShift, false)
+	time.Sleep(sampleAgeLimit / 2)
+	c.SetReturnError(nil)
+	appendData(5, timeShift, false)
+	m.Stop()
+
+	if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
+		t.Errorf("mismatch (-want +got):\n%s", diff)
+	}
+}
+
+func createTimeseriesWithRandomLabelCount(id string, seriesCount int, timeAdd time.Duration, maxLabels int) ([]record.RefSample, []record.RefSeries) {
+	samples := []record.RefSample{}
+	series := []record.RefSeries{}
+	// use a fixed rand source so tests are consistent
+	r := rand.New(rand.NewSource(99))
+	for i := 0; i < seriesCount; i++ {
+		s := record.RefSample{
+			Ref: chunks.HeadSeriesRef(i),
+			T:   time.Now().Add(timeAdd).UnixMilli(),
+			V:   r.Float64(),
+		}
+		samples = append(samples, s)
+		labelsCount := r.Intn(maxLabels)
+		lb := labels.NewScratchBuilder(1 + labelsCount)
+		lb.Add("__name__", "batch_"+id+"_id_"+strconv.Itoa(i))
+		for j := 1; j < labelsCount+1; j++ {
+			// same for both name and value
+			label := "batch_" + id + "_label_" + strconv.Itoa(j)
+			lb.Add(label, label)
+		}
+		series = append(series, record.RefSeries{
+			Ref:    chunks.HeadSeriesRef(i),
+			Labels: lb.Labels(),
+		})
+	}
+	return samples, series
+}
+
 func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
 	newSamples := make([]record.RefSample, 0, numSamples)
 	samples := make([]record.RefSample, 0, numSamples)
@@ -1668,3 +1813,14 @@ func TestBuildTimeSeries(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkBuildTimeSeries(b *testing.B) {
+	// Send one sample per series, which is the typical remote_write case
+	const numSamples = 10000
+	filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
+	for i := 0; i < b.N; i++ {
+		samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
+		_, _, result, _, _, _ := buildTimeSeries(samples, filter)
+		require.NotNil(b, result)
+	}
+}
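
Why the buildTimeSeries hunk swaps instead of overwriting: the danger is slice-header aliasing. The standalone Go program below is an illustration only, not code from this patch; the series type is a simplified stand-in for prompb.TimeSeries, and the refill step is an assumed imitation of the in-place slice reuse that the new comment attributes to shards.populateTimeSeries.

package main

import "fmt"

// series is a simplified stand-in for prompb.TimeSeries: like the real type,
// it embeds a slice, so copying the struct copies the slice header (pointer,
// length, capacity) rather than the underlying samples.
type series struct {
	samples []int64
}

func main() {
	// Batch 1: a pending buffer with three entries; entry 1 is "too old"
	// and gets filtered out.
	pending := []series{
		{samples: []int64{1}},
		{samples: []int64{2}}, // dropped by the filter
		{samples: []int64{3}},
	}

	// Overwrite-style compaction (the behaviour removed by this patch):
	// slot 1 now holds the same slice header as slot 2, so both slots
	// point at the same backing array.
	pending[1] = pending[2]

	// Batch 2: the buffer is reused and each slot is refilled in place by
	// truncating to length zero and appending, which writes through the
	// existing backing array.
	pending[1].samples = append(pending[1].samples[:0], 20)
	pending[2].samples = append(pending[2].samples[:0], 30)

	// Because slots 1 and 2 alias one array, the second write clobbers the
	// first: both now report 30.
	fmt.Println(pending[1].samples[0], pending[2].samples[0]) // 30 30

	// Swapping instead of overwriting, as buildTimeSeries now does, keeps a
	// distinct backing array behind every slot, so in-place refills cannot
	// collide.
}

The same aliasing argument applies to the Labels, Exemplars and Histograms slices inside prompb.TimeSeries, which is why the patch swaps whole elements instead of copying any field across slots.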
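
Read in isolation, the swap-based compaction amounts to the generic pattern sketched below. Again this is only an illustration, written under the assumption that callers may later reuse the slice at its full capacity; the real buildTimeSeries additionally tracks the highest and lowest timestamps and counts dropped samples, exemplars and histograms, which is why the benchmark above discards six return values.

package main

import "fmt"

// filterInPlace keeps the elements for which keep returns true, compacting
// them to the front of the slice by swapping. Swapping (rather than copying
// over the dropped slot) means every slot keeps ownership of its original
// element, so a caller that reuses the full-capacity slice afterwards cannot
// end up with two slots sharing internal state.
func filterInPlace[T any](s []T, keep func(T) bool) []T {
	keepIdx := 0
	for i := range s {
		if !keep(s[i]) {
			continue
		}
		if i != keepIdx {
			s[keepIdx], s[i] = s[i], s[keepIdx]
		}
		keepIdx++
	}
	return s[:keepIdx]
}

func main() {
	nums := []int{5, 120, 7, 300, 9}
	kept := filterInPlace(nums, func(v int) bool { return v < 100 })
	fmt.Println(kept)             // [5 7 9]
	fmt.Println(nums[:cap(nums)]) // all five values are still present, just reordered
}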