From 242158e7fcf94e07e5c5a71c3907c755dba0967d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 24 Jan 2024 13:47:59 +0100 Subject: [PATCH] remote: Added test for classic histogram grouping when sending rw; Fixed queue manager test delay. (#13421) Signed-off-by: bwplotka --- storage/remote/queue_manager.go | 2 +- storage/remote/queue_manager_test.go | 333 +++++++++++++++++++++------ 2 files changed, 268 insertions(+), 67 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 4bc9abb1f9..129d524efe 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -381,7 +381,7 @@ func (m *queueManagerMetrics) unregister() { // external timeseries database. type WriteClient interface { // Store stores the given samples in the remote storage. - Store(context.Context, []byte, int) error + Store(ctx context.Context, req []byte, retryAttempt int) error // Name uniquely identifies the remote storage. Name() string // Endpoint is the remote read or write endpoint for the storage client. diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 06eb665f1a..6ecdc3862b 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -45,6 +45,7 @@ import ( "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/util/runutil" ) const defaultFlushDeadline = 1 * time.Minute @@ -153,7 +154,7 @@ func TestSampleDelivery(t *testing.T) { qm.AppendExemplars(exemplars[:len(exemplars)/2]) qm.AppendHistograms(histograms[:len(histograms)/2]) qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) // Send second half of data. c.expectSamples(samples[len(samples)/2:], series) @@ -164,7 +165,182 @@ func TestSampleDelivery(t *testing.T) { qm.AppendExemplars(exemplars[len(exemplars)/2:]) qm.AppendHistograms(histograms[len(histograms)/2:]) qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:]) - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) + }) + } +} + +type perRequestWriteClient struct { + *TestWriteClient + + expectUnorderedRequests bool + + mtx sync.Mutex + + i int + requests []*TestWriteClient + expectedSeries []record.RefSeries + expectedRequestSamples [][]record.RefSample +} + +func newPerRequestWriteClient(expectUnorderedRequests bool) *perRequestWriteClient { + return &perRequestWriteClient{ + expectUnorderedRequests: expectUnorderedRequests, + TestWriteClient: NewTestWriteClient(MinStrings), + } +} + +func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) { + tc := NewTestWriteClient(MinStrings) + c.requests = append(c.requests, tc) + + c.expectedSeries = series + c.expectedRequestSamples = append(c.expectedRequestSamples, ss) +} + +func (c *perRequestWriteClient) expectedData(t testing.TB) { + t.Helper() + + c.mtx.Lock() + defer c.mtx.Unlock() + + c.TestWriteClient.mtx.Lock() + exp := 0 + for _, ss := range c.expectedRequestSamples { + exp += len(ss) + } + got := deepLen(c.TestWriteClient.receivedSamples) + c.TestWriteClient.mtx.Unlock() + + if got < exp { + t.Errorf("totally expected %v samples, got %v", exp, got) + } + + for i, cl := range c.requests { + cl.waitForExpectedData(t, 0*time.Second) // We already waited. + t.Log("client", i, "checked") + } + if c.i != len(c.requests) { + t.Errorf("expected %v calls, got %v", len(c.requests), c.i) + } +} + +func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int) error { + c.mtx.Lock() + defer c.mtx.Unlock() + defer func() { c.i++ }() + if c.i >= len(c.requests) { + return nil + } + + if err := c.TestWriteClient.Store(ctx, req, r); err != nil { + return err + } + + expReqSampleToUse := 0 + if c.expectUnorderedRequests { + // expectUnorderedRequests tells us that multiple shards were used by queue manager, + // so we can't trust that incoming requests will match order of c.expectedRequestSamples + // slice. However, for successful test case we can assume that first sample value will + // match, so find such expected request if any. + // NOTE: This assumes sample values have unique values in our tests. + for i, es := range c.expectedRequestSamples { + if len(es) == 0 { + continue + } + for _, rs := range c.TestWriteClient.receivedSamples { + if len(rs) == 0 { + continue + } + if es[0].V != rs[0].GetValue() { + break + } + expReqSampleToUse = i + break + } + } + // We tried our best, use normal flow otherwise. + } + c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries) + c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...) + return c.requests[c.i].Store(ctx, req, r) +} + +func testDefaultQueueConfig() config.QueueConfig { + cfg := config.DefaultQueueConfig + // For faster unit tests we don't wait default 5 seconds. + cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) + return cfg +} + +// TestHistogramSampleBatching tests current way of how classic histogram series +// are grouped in queue manager. +// This is a first step of exploring PRW 2.0 self-contained classic histograms. +func TestHistogramSampleBatching(t *testing.T) { + t.Parallel() + + series, samples := createTestClassicHistogram(10) + + for _, tc := range []struct { + name string + queueConfig config.QueueConfig + expRequestSamples [][]record.RefSample + }{ + { + name: "OneShardDefaultBatch", + queueConfig: func() config.QueueConfig { + cfg := testDefaultQueueConfig() + cfg.MaxShards = 1 + cfg.MinShards = 1 + return cfg + }(), + expRequestSamples: [][]record.RefSample{samples}, + }, + { + name: "OneShardLimitedBatch", + queueConfig: func() config.QueueConfig { + cfg := testDefaultQueueConfig() + cfg.MaxShards = 1 + cfg.MinShards = 1 + cfg.MaxSamplesPerSend = 5 + return cfg + }(), + expRequestSamples: [][]record.RefSample{ + samples[:5], samples[5:10], samples[10:], + }, + }, + { + name: "TwoShards", + queueConfig: func() config.QueueConfig { + cfg := testDefaultQueueConfig() + cfg.MaxShards = 2 + cfg.MinShards = 2 + return cfg + }(), + expRequestSamples: [][]record.RefSample{ + {samples[0], samples[2], samples[4], samples[6], samples[8], samples[10]}, + {samples[1], samples[3], samples[5], samples[7], samples[9], samples[11]}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + c := newPerRequestWriteClient(tc.queueConfig.MaxShards > 1) + + for _, s := range tc.expRequestSamples { + c.expectRequestSamples(s, series) + } + + dir := t.TempDir() + mcfg := config.DefaultMetadataConfig + + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, MinStrings) + m.StoreSeries(series, 0) + + m.Start() + m.Append(samples) + m.Stop() + c.expectedData(t) }) } } @@ -174,7 +350,7 @@ func TestMetadataDelivery(t *testing.T) { dir := t.TempDir() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") @@ -211,10 +387,9 @@ func TestSampleDeliveryTimeout(t *testing.T) { samples, series := createTimeseries(n, n) c := NewTestWriteClient(rwFormat) - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 - cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) dir := t.TempDir() @@ -227,11 +402,11 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Send the samples twice, waiting for the samples in the meantime. c.expectSamples(samples, series) m.Append(samples) - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) c.expectSamples(samples, series) m.Append(samples) - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) }) } } @@ -261,7 +436,7 @@ func TestSampleDeliveryOrder(t *testing.T) { dir := t.TempDir() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") @@ -272,7 +447,7 @@ func TestSampleDeliveryOrder(t *testing.T) { defer m.Stop() // These should be received by the client. m.Append(samples) - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) }) } } @@ -283,7 +458,7 @@ func TestShutdown(t *testing.T) { dir := t.TempDir() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") @@ -322,7 +497,7 @@ func TestSeriesReset(t *testing.T) { dir := t.TempDir() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) @@ -349,7 +524,7 @@ func TestReshard(t *testing.T) { c := NewTestWriteClient(rwFormat) c.expectSamples(samples, series) - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 @@ -376,7 +551,7 @@ func TestReshard(t *testing.T) { time.Sleep(100 * time.Millisecond) } - c.waitForExpectedData(t) + c.waitForExpectedData(t, 30*time.Second) }) } } @@ -390,7 +565,7 @@ func TestReshardRaceWithStop(t *testing.T) { h.Lock() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig exitCh := make(chan struct{}) go func() { @@ -427,7 +602,7 @@ func TestReshardPartialBatch(t *testing.T) { c := NewTestBlockedWriteClient() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 batchSendDeadline := time.Millisecond @@ -473,7 +648,7 @@ func TestQueueFilledDeadlock(t *testing.T) { c := NewNopWriteClient() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.MaxSamplesPerSend = 10 @@ -511,7 +686,7 @@ func TestQueueFilledDeadlock(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient(rwFormat) @@ -559,7 +734,7 @@ func TestShouldReshard(t *testing.T) { }, } - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") @@ -681,6 +856,41 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record. return histograms, nil, series } +func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) { + samples := make([]record.RefSample, buckets+2) + series := make([]record.RefSeries, buckets+2) + + for i := range samples { + samples[i] = record.RefSample{ + Ref: chunks.HeadSeriesRef(i), T: int64(i), V: float64(i), + } + } + + for i := 0; i < buckets; i++ { + le := fmt.Sprintf("%v", i) + if i == 0 { + le = "+Inf" + } + series[i] = record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: labels.FromStrings( + "__name__", "http_request_duration_seconds_bucket", + "le", le, + ), + } + } + + series[buckets] = record.RefSeries{ + Ref: chunks.HeadSeriesRef(buckets), + Labels: labels.FromStrings("__name__", "http_request_duration_seconds_sum"), + } + series[buckets+1] = record.RefSeries{ + Ref: chunks.HeadSeriesRef(buckets + 1), + Labels: labels.FromStrings("__name__", "http_request_duration_seconds_count"), + } + return series, samples +} + func getSeriesNameFromRef(r record.RefSeries) string { return r.Labels.Get("__name__") } @@ -696,8 +906,6 @@ type TestWriteClient struct { expectedFloatHistograms map[string][]prompb.Histogram receivedMetadata map[string][]prompb.MetricMetadata writesReceived int - withWaitGroup bool - wg sync.WaitGroup mtx sync.Mutex buf []byte rwFormat RemoteWriteFormat @@ -705,7 +913,6 @@ type TestWriteClient struct { func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient { return &TestWriteClient{ - withWaitGroup: true, receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, receivedMetadata: map[string][]prompb.MetricMetadata{}, @@ -714,9 +921,6 @@ func NewTestWriteClient(rwFormat RemoteWriteFormat) *TestWriteClient { } func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { - if !c.withWaitGroup { - return - } c.mtx.Lock() defer c.mtx.Unlock() @@ -730,13 +934,9 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R Value: s.V, }) } - c.wg.Add(len(ss)) } func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) { - if !c.withWaitGroup { - return - } c.mtx.Lock() defer c.mtx.Unlock() @@ -752,13 +952,9 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco } c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e) } - c.wg.Add(len(ss)) } func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) { - if !c.withWaitGroup { - return - } c.mtx.Lock() defer c.mtx.Unlock() @@ -769,13 +965,9 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie seriesName := getSeriesNameFromRef(series[h.Ref]) c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H)) } - c.wg.Add(len(hh)) } func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) { - if !c.withWaitGroup { - return - } c.mtx.Lock() defer c.mtx.Unlock() @@ -786,14 +978,36 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa seriesName := getSeriesNameFromRef(series[fh.Ref]) c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH)) } - c.wg.Add(len(fhs)) } -func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { - if !c.withWaitGroup { - return +func deepLen[M any](ms ...map[string][]M) int { + l := 0 + for _, m := range ms { + for _, v := range m { + l += len(v) + } + } + return l +} + +func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Duration) { + tb.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error { + c.mtx.Lock() + exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms) + got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms) + c.mtx.Unlock() + + if got < exp { + return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got) + } + return nil + }); err != nil { + tb.Error(err) } - c.wg.Wait() c.mtx.Lock() defer c.mtx.Unlock() @@ -839,44 +1053,31 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { } if err != nil { - fmt.Println("error: ", err) return err } - count := 0 for _, ts := range reqProto.Timeseries { ls := labelProtosToLabels(ts.Labels) seriesName := ls.Get("__name__") - for _, sample := range ts.Samples { - count++ - c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) + if len(ts.Samples) > 0 { + c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], ts.Samples...) } - - for _, ex := range ts.Exemplars { - count++ - c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) + if len(ts.Exemplars) > 0 { + c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ts.Exemplars...) } - - for _, hist := range ts.Histograms { - count++ - if hist.IsFloatHistogram() { - c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], hist) + for _, h := range ts.Histograms { + if h.IsFloatHistogram() { + c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], h) } else { - c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], hist) + c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], h) } - } } - if c.withWaitGroup { - c.wg.Add(-count) - } - for _, m := range reqProto.Metadata { c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) } c.writesReceived++ - return nil } @@ -953,7 +1154,7 @@ func BenchmarkSampleSend(b *testing.B) { c := NewNopWriteClient() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.MinShards = 20 @@ -1002,7 +1203,7 @@ func BenchmarkStartup(b *testing.B) { logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) logger = log.With(logger, "caller", log.DefaultCaller) - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil, "", "") @@ -1086,7 +1287,7 @@ func TestProcessExternalLabels(t *testing.T) { func TestCalculateDesiredShards(t *testing.T) { c := NewNopWriteClient() - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig dir := t.TempDir() @@ -1164,7 +1365,7 @@ func TestCalculateDesiredShards(t *testing.T) { func TestCalculateDesiredShardsDetail(t *testing.T) { c := NewTestWriteClient(Base1) - cfg := config.DefaultQueueConfig + cfg := testDefaultQueueConfig() mcfg := config.DefaultMetadataConfig dir := t.TempDir()