From c5c6660e6bffcdccba9ed268d5e3c5bf6a4dc99d Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 18 Apr 2024 09:06:45 +0100 Subject: [PATCH] Fixes after main sync. Signed-off-by: bwplotka --- .yamllint | 1 + go.mod | 2 +- prompb/write/v2/custom.go | 2 +- .../local_grafana/dashboards/dashboards.yaml | 2 +- storage/remote/client.go | 10 +- storage/remote/codec.go | 3 +- storage/remote/queue_manager_test.go | 203 ------------------ storage/remote/write_handler.go | 1 - web/api/v1/api_test.go | 2 +- 9 files changed, 11 insertions(+), 215 deletions(-) diff --git a/.yamllint b/.yamllint index 1859cb624b..14faaedee7 100644 --- a/.yamllint +++ b/.yamllint @@ -2,6 +2,7 @@ extends: default ignore: | ui/react-app/node_modules + scripts/remotewrite11-bench rules: braces: diff --git a/go.mod b/go.mod index 5fca750a9e..8e9824b49d 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,6 @@ require ( go.uber.org/automaxprocs v1.5.3 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 - golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/net v0.22.0 golang.org/x/oauth2 v0.18.0 golang.org/x/sync v0.6.0 @@ -186,6 +185,7 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect golang.org/x/crypto v0.21.0 // indirect + golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.16.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/prompb/write/v2/custom.go b/prompb/write/v2/custom.go index a961a188ca..987bfd8d04 100644 --- a/prompb/write/v2/custom.go +++ b/prompb/write/v2/custom.go @@ -14,7 +14,7 @@ package writev2 import ( - "golang.org/x/exp/slices" + "slices" ) func (m Sample) T() int64 { return m.Timestamp } diff --git a/scripts/remotewrite11-bench/local_grafana/dashboards/dashboards.yaml b/scripts/remotewrite11-bench/local_grafana/dashboards/dashboards.yaml index c19eef2f22..71347d0fe9 100644 --- a/scripts/remotewrite11-bench/local_grafana/dashboards/dashboards.yaml +++ b/scripts/remotewrite11-bench/local_grafana/dashboards/dashboards.yaml @@ -21,4 +21,4 @@ providers: # path to dashboard files on disk. Required when using the 'file' type path: /etc/grafana/provisioning/dashboards # use folder names from filesystem to create folders in Grafana - foldersFromFilesStructure: true \ No newline at end of file + foldersFromFilesStructure: true diff --git a/storage/remote/client.go b/storage/remote/client.go index 9eaa61e767..3acdd60089 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -230,7 +230,7 @@ type RecoverableError struct { func (c *Client) probeRemoteVersions(ctx context.Context) error { // We assume we are in Version2 mode otherwise we shouldn't be calling this. - httpReq, err := http.NewRequest("HEAD", c.urlString, nil) + httpReq, err := http.NewRequest(http.MethodHead, c.urlString, nil) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not // recoverable. @@ -259,8 +259,8 @@ func (c *Client) probeRemoteVersions(ctx context.Context) error { } // Check for an error. - if httpResp.StatusCode != 200 { - if httpResp.StatusCode == 405 { + if httpResp.StatusCode != http.StatusOK { + if httpResp.StatusCode == http.StatusMethodNotAllowed { // If we get a 405 (MethodNotAllowed) error then it means the endpoint doesn't // understand Remote Write 2.0, so we allow the lastRWHeader to be overwritten // even if it is blank. @@ -331,11 +331,11 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int, rwFormat co line = scanner.Text() } switch httpResp.StatusCode { - case 400: + case http.StatusBadRequest: // Return an unrecoverable error to indicate the 400. // This then gets passed up the chain so we can react to it properly. return &ErrRenegotiate{line, httpResp.StatusCode} - case 406: + case http.StatusNotAcceptable: // Return an unrecoverable error to indicate the 406. // This then gets passed up the chain so we can react to it properly. return &ErrRenegotiate{line, httpResp.StatusCode} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 8e936c2087..dff820252c 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -926,7 +926,7 @@ func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) la // labelProtosV2ToLabels transforms v2 proto labels references, which are uint32 values, into labels via // indexing into the symbols slice. func labelProtosV2ToLabels(labelRefs []uint32, symbols []string) labels.Labels { - b := labels.ScratchBuilder{} + b := labels.NewScratchBuilder(len(labelRefs)) for i := 0; i < len(labelRefs); i += 2 { b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]]) } @@ -1145,7 +1145,6 @@ func MinimizedWriteRequestToWriteRequest(redReq *writev2.WriteRequest) (*prompb. req.Timeseries[i].Histograms[j].ResetHint = prompb.Histogram_ResetHint(h.ResetHint) req.Timeseries[i].Histograms[j].Timestamp = h.Timestamp } - } return req, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index f7d5686abf..34f8662e15 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -316,102 +316,6 @@ func TestSampleDelivery(t *testing.T) { } } -type perRequestWriteClient struct { - *TestWriteClient - - expectUnorderedRequests bool - - mtx sync.Mutex - - i int - requests []*TestWriteClient - expectedSeries []record.RefSeries - expectedRequestSamples [][]record.RefSample -} - -func newPerRequestWriteClient(expectUnorderedRequests bool) *perRequestWriteClient { - return &perRequestWriteClient{ - expectUnorderedRequests: expectUnorderedRequests, - TestWriteClient: NewTestWriteClient(Version2), - } -} - -func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) { - tc := NewTestWriteClient(Version2) - c.requests = append(c.requests, tc) - - c.expectedSeries = series - c.expectedRequestSamples = append(c.expectedRequestSamples, ss) -} - -func (c *perRequestWriteClient) expectedData(t testing.TB) { - t.Helper() - - c.mtx.Lock() - defer c.mtx.Unlock() - - c.TestWriteClient.mtx.Lock() - exp := 0 - for _, ss := range c.expectedRequestSamples { - exp += len(ss) - } - got := deepLen(c.TestWriteClient.receivedSamples) - c.TestWriteClient.mtx.Unlock() - - if got < exp { - t.Errorf("totally expected %v samples, got %v", exp, got) - } - - for i, cl := range c.requests { - cl.waitForExpectedData(t, 0*time.Second) // We already waited. - t.Log("client", i, "checked") - } - if c.i != len(c.requests) { - t.Errorf("expected %v calls, got %v", len(c.requests), c.i) - } -} - -func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rwFormat config.RemoteWriteFormat, compression string) error { - c.mtx.Lock() - defer c.mtx.Unlock() - defer func() { c.i++ }() - if c.i >= len(c.requests) { - return nil - } - - if err := c.TestWriteClient.Store(ctx, req, r, rwFormat, compression); err != nil { - return err - } - - expReqSampleToUse := 0 - if c.expectUnorderedRequests { - // expectUnorderedRequests tells us that multiple shards were used by queue manager, - // so we can't trust that incoming requests will match order of c.expectedRequestSamples - // slice. However, for successful test case we can assume that first sample value will - // match, so find such expected request if any. - // NOTE: This assumes sample values have unique values in our tests. - for i, es := range c.expectedRequestSamples { - if len(es) == 0 { - continue - } - for _, rs := range c.TestWriteClient.receivedSamples { - if len(rs) == 0 { - continue - } - if es[0].V != rs[0].GetValue() { - break - } - expReqSampleToUse = i - break - } - } - // We tried our best, use normal flow otherwise. - } - c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries) - c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...) - return c.requests[c.i].Store(ctx, req, r, rwFormat, compression) -} - func testDefaultQueueConfig() config.QueueConfig { cfg := config.DefaultQueueConfig // For faster unit tests we don't wait default 5 seconds. @@ -419,78 +323,6 @@ func testDefaultQueueConfig() config.QueueConfig { return cfg } -// TestHistogramSampleBatching tests current way of how classic histogram series -// are grouped in queue manager. -// This is a first step of exploring PRW 2.0 self-contained classic histograms. -func TestHistogramSampleBatching(t *testing.T) { - t.Parallel() - - series, samples := createTestClassicHistogram(10) - - for _, tc := range []struct { - name string - queueConfig config.QueueConfig - expRequestSamples [][]record.RefSample - }{ - { - name: "OneShardDefaultBatch", - queueConfig: func() config.QueueConfig { - cfg := testDefaultQueueConfig() - cfg.MaxShards = 1 - cfg.MinShards = 1 - return cfg - }(), - expRequestSamples: [][]record.RefSample{samples}, - }, - { - name: "OneShardLimitedBatch", - queueConfig: func() config.QueueConfig { - cfg := testDefaultQueueConfig() - cfg.MaxShards = 1 - cfg.MinShards = 1 - cfg.MaxSamplesPerSend = 5 - return cfg - }(), - expRequestSamples: [][]record.RefSample{ - samples[:5], samples[5:10], samples[10:], - }, - }, - { - name: "TwoShards", - queueConfig: func() config.QueueConfig { - cfg := testDefaultQueueConfig() - cfg.MaxShards = 2 - cfg.MinShards = 2 - return cfg - }(), - expRequestSamples: [][]record.RefSample{ - {samples[0], samples[2], samples[4], samples[6], samples[8], samples[10]}, - {samples[1], samples[3], samples[5], samples[7], samples[9], samples[11]}, - }, - }, - } { - t.Run(tc.name, func(t *testing.T) { - c := newPerRequestWriteClient(tc.queueConfig.MaxShards > 1) - - for _, s := range tc.expRequestSamples { - c.expectRequestSamples(s, series) - } - - dir := t.TempDir() - mcfg := config.DefaultMetadataConfig - - metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version2) - m.StoreSeries(series, 0) - - m.Start() - m.Append(samples) - m.Stop() - c.expectedData(t) - }) - } -} - func TestMetadataDelivery(t *testing.T) { c := NewTestWriteClient(Version1) @@ -1118,41 +950,6 @@ func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata { return metas } -func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) { - samples := make([]record.RefSample, buckets+2) - series := make([]record.RefSeries, buckets+2) - - for i := range samples { - samples[i] = record.RefSample{ - Ref: chunks.HeadSeriesRef(i), T: int64(i), V: float64(i), - } - } - - for i := 0; i < buckets; i++ { - le := fmt.Sprintf("%v", i) - if i == 0 { - le = "+Inf" - } - series[i] = record.RefSeries{ - Ref: chunks.HeadSeriesRef(i), - Labels: labels.FromStrings( - "__name__", "http_request_duration_seconds_bucket", - "le", le, - ), - } - } - - series[buckets] = record.RefSeries{ - Ref: chunks.HeadSeriesRef(buckets), - Labels: labels.FromStrings("__name__", "http_request_duration_seconds_sum"), - } - series[buckets+1] = record.RefSeries{ - Ref: chunks.HeadSeriesRef(buckets + 1), - Labels: labels.FromStrings("__name__", "http_request_duration_seconds_count"), - } - return series, samples -} - func getSeriesNameFromRef(r record.RefSeries) string { return r.Labels.Get("__name__") } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 29d611f3b9..1236ccc715 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -486,7 +486,6 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteReques if _, err = app.UpdateMetadata(0, ls, m); err != nil { level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err) } - } if outOfOrderExemplarErrs > 0 { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 95bf76d02d..865e475ed1 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -367,7 +367,7 @@ func TestHeadEndpoint(t *testing.T) { s := httptest.NewServer(r) defer s.Close() - req, err := http.NewRequest("HEAD", s.URL+"/write", nil) + req, err := http.NewRequest(http.MethodHead, s.URL+"/write", nil) require.NoError(t, err, "Error creating HEAD request") client := &http.Client{} resp, err := client.Do(req)