diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index e2974f3dd8..31bf0cff02 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -107,39 +107,6 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest { } }() -var writeRequestMinimized64Fixture = func() *prompb.MinimizedWriteRequestFixed64 { - st := newRwSymbolTable() - labels := []uint64{} - for _, s := range []string{ - "__name__", "test_metric1", - "b", "c", - "baz", "qux", - "d", "e", - "foo", "bar", - } { - ref := st.Ref64Packed(s) - labels = append(labels, ref) - } - - return &prompb.MinimizedWriteRequestFixed64{ - Timeseries: []prompb.MinimizedTimeSeriesFixed64{ - { - LabelSymbols: labels, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())}, - }, - { - LabelSymbols: labels, - Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())}, - }, - }, - Symbols: st.LabelsString(), - } -}() - func TestValidateLabelsAndMetricName(t *testing.T) { tests := []struct { input []prompb.Label diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 86c1e66490..3b48332ef3 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -80,12 +80,6 @@ func TestSampleDelivery(t *testing.T) { {rwFormat: Min32Optimized, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"}, {rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: true, name: "interned histograms only"}, {rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"}, - - {rwFormat: Min64Fixed, samples: true, exemplars: false, histograms: false, name: "interned samples only"}, - {rwFormat: Min64Fixed, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"}, - {rwFormat: Min64Fixed, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"}, - {rwFormat: Min64Fixed, samples: false, exemplars: false, histograms: true, name: "interned histograms only"}, - {rwFormat: Min64Fixed, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"}, } // Let's create an even number of send batches so we don't run into the @@ -210,7 +204,7 @@ func TestMetadataDelivery(t *testing.T) { } func TestSampleDeliveryTimeout(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { //remoteWrite11 := proto == "1.1" // Let's send one less sample than batch size, and wait the timeout duration @@ -244,7 +238,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { } func TestSampleDeliveryOrder(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts @@ -346,7 +340,7 @@ func TestSeriesReset(t *testing.T) { } func TestReshard(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { size := 10 // Make bigger to find more races. nSeries := 6 @@ -389,7 +383,7 @@ func TestReshard(t *testing.T) { } func TestReshardRaceWithStop(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { c := NewTestWriteClient(rwFormat) var m *QueueManager @@ -428,7 +422,7 @@ func TestReshardRaceWithStop(t *testing.T) { } func TestReshardPartialBatch(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { samples, series := createTimeseries(1, 10) @@ -474,7 +468,7 @@ func TestReshardPartialBatch(t *testing.T) { // where a large scrape (> capacity + max samples per send) is appended at the // same time as a batch times out according to the batch send deadline. func TestQueueFilledDeadlock(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { samples, series := createTimeseries(50, 1) @@ -516,7 +510,7 @@ func TestQueueFilledDeadlock(t *testing.T) { } func TestReleaseNoninternedString(t *testing.T) { - for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} { + for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { cfg := config.DefaultQueueConfig @@ -844,12 +838,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { if err == nil { reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin) } - case Min64Fixed: - var reqMin64 prompb.MinimizedWriteRequestFixed64 - err = proto.Unmarshal(reqBuf, &reqMin64) - if err == nil { - reqProto, err = min64WriteRequestToWriteRequest(&reqMin64) - } } if err != nil { @@ -1546,138 +1534,3 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { }) } } - -func BenchmarkBuildMinimizedWriteRequestFixed32(b *testing.B) { - type testcase struct { - batch []timeSeries - } - testCases := []testcase{ - testcase{createDummyTimeSeries(2)}, - testcase{createDummyTimeSeries(10)}, - testcase{createDummyTimeSeries(100)}, - } - for _, tc := range testCases { - symbolTable := newRwSymbolTable() - pBuf := proto.NewBuffer(nil) - buff := make([]byte, 0) - seriesBuff := make([]prompb.MinimizedTimeSeriesFixed32, len(tc.batch)) - //total := 0 - for i := range seriesBuff { - seriesBuff[i].Samples = []prompb.Sample{{}} - // todo: add other types - //seriesBuff[i].Exemplars = []prompb.Exemplar{{}} - } - //pBuf := []byte{} - - // Warmup buffers - for i := 0; i < 10; i++ { - populateMinimizedTimeSeriesFixed32(&symbolTable, tc.batch, seriesBuff, true, true) - buildMinimizedWriteRequestFixed32(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - } - - b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { - totalSize := 0 - for j := 0; j < b.N; j++ { - populateMinimizedTimeSeriesFixed32(&symbolTable, tc.batch, seriesBuff, true, true) - b.ResetTimer() - req, _, err := buildMinimizedWriteRequestFixed32(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - if err != nil { - b.Fatal(err) - } - symbolTable.clear() - totalSize += len(req) - b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") - } - }) - } -} - -func BenchmarkBuildMinimizedWriteRequestFixed64(b *testing.B) { - type testcase struct { - batch []timeSeries - } - testCases := []testcase{ - testcase{createDummyTimeSeries(2)}, - testcase{createDummyTimeSeries(10)}, - testcase{createDummyTimeSeries(100)}, - } - for _, tc := range testCases { - symbolTable := newRwSymbolTable() - pBuf := proto.NewBuffer(nil) - buff := make([]byte, 0) - seriesBuff := make([]prompb.MinimizedTimeSeriesFixed64, len(tc.batch)) - //total := 0 - for i := range seriesBuff { - seriesBuff[i].Samples = []prompb.Sample{{}} - // todo: add other types - //seriesBuff[i].Exemplars = []prompb.Exemplar{{}} - } - //pBuf := []byte{} - - // Warmup buffers - for i := 0; i < 10; i++ { - populateMinimizedTimeSeriesFixed64(&symbolTable, tc.batch, seriesBuff, true, true) - buildMinimizedWriteRequestFixed64(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - } - - b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { - totalSize := 0 - for j := 0; j < b.N; j++ { - populateMinimizedTimeSeriesFixed64(&symbolTable, tc.batch, seriesBuff, true, true) - b.ResetTimer() - req, _, err := buildMinimizedWriteRequestFixed64(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - if err != nil { - b.Fatal(err) - } - symbolTable.clear() - totalSize += len(req) - b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") - } - }) - } -} - -func BenchmarkBuildMinimizedWriteRequestPacking(b *testing.B) { - type testcase struct { - batch []timeSeries - } - testCases := []testcase{ - testcase{createDummyTimeSeries(2)}, - testcase{createDummyTimeSeries(10)}, - testcase{createDummyTimeSeries(100)}, - } - for _, tc := range testCases { - symbolTable := newRwSymbolTable() - pBuf := proto.NewBuffer(nil) - buff := make([]byte, 0) - seriesBuff := make([]prompb.MinimizedTimeSeriesPacking, len(tc.batch)) - //total := 0 - for i := range seriesBuff { - seriesBuff[i].Samples = []prompb.Sample{{}} - // todo: add other types - //seriesBuff[i].Exemplars = []prompb.Exemplar{{}} - } - //pBuf := []byte{} - - // Warmup buffers - for i := 0; i < 10; i++ { - populateMinimizedTimeSeriesPacking(&symbolTable, tc.batch, seriesBuff, true, true) - buildMinimizedWriteRequestPacking(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - } - - b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { - totalSize := 0 - for j := 0; j < b.N; j++ { - populateMinimizedTimeSeriesPacking(&symbolTable, tc.batch, seriesBuff, true, true) - b.ResetTimer() - req, _, err := buildMinimizedWriteRequestPacking(seriesBuff, symbolTable.LabelsString(), pBuf, &buff) - if err != nil { - b.Fatal(err) - } - symbolTable.clear() - totalSize += len(req) - b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") - } - }) - } -} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index ca36f1baa1..d52552e6cf 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -58,25 +58,25 @@ func TestRemoteWriteHandler(t *testing.T) { j := 0 k := 0 for _, ts := range writeRequestFixture.Timeseries { - labels := labelProtosToLabels(ts.Labels) + ls := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { - require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } for _, e := range ts.Exemplars { exemplarLabels := labelProtosToLabels(e.Labels) - require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := FloatHistogramProtoToFloatHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) + require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) + require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ @@ -108,124 +108,25 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { // the reduced write request is equivalent to the write request fixture. // we can use it for for _, ts := range writeRequestFixture.Timeseries { - labels := labelProtosToLabels(ts.Labels) + ls := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { - require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) + require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } for _, e := range ts.Exemplars { exemplarLabels := labelProtosToLabels(e.Labels) - require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) + require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fh := FloatHistogramProtoToFloatHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) + require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) - } - - k++ - } - } -} - -//func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { -// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) -// require.NoError(t, err) -// -// req, err := http.NewRequest("", "", bytes.NewReader(buf)) -// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) -// require.NoError(t, err) -// -// appendable := &mockAppendable{} -// handler := NewWriteHandler(nil, nil, appendable, false, true) -// -// recorder := httptest.NewRecorder() -// handler.ServeHTTP(recorder, req) -// -// resp := recorder.Result() -// require.Equal(t, http.StatusNoContent, resp.StatusCode) -// -// i := 0 -// j := 0 -// k := 0 -// // the reduced write request is equivalent to the write request fixture. -// // we can use it for -// for _, ts := range writeRequestFixture.Timeseries { -// ls := labelProtosToLabels(ts.Labels) -// for _, s := range ts.Samples { -// require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) -// i++ -// } -// -// for _, e := range ts.Exemplars { -// exemplarLabels := labelProtosToLabels(e.Labels) -// require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) -// j++ -// } -// -// for _, hp := range ts.Histograms { -// if hp.IsFloatHistogram() { -// fh := FloatHistogramProtoToFloatHistogram(hp) -// require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) -// } else { -// h := HistogramProtoToHistogram(hp) -// require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) -// } -// -// k++ -// } -// } -//} - -func TestRemoteWriteHandler64Packed(t *testing.T) { - buf, _, err := buildMinimizedWriteRequestFixed64(writeRequestMinimized64Fixture.Timeseries, writeRequestMinimized64Fixture.Symbols, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) - require.NoError(t, err) - - appendable := &mockAppendable{} - // TODO: test with other proto format(s) - handler := NewWriteHandler(nil, nil, appendable, Min64Fixed) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusNoContent, resp.StatusCode) - - i := 0 - j := 0 - k := 0 - // the reduced write request is equivalent to the write request fixture. - // we can use it for - for _, ts := range writeRequestMinimized64Fixture.Timeseries { - labels := Uint64RefToLabels(writeRequestMinimized64Fixture.Symbols, ts.LabelSymbols) - for _, s := range ts.Samples { - require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) - i++ - } - - for _, e := range ts.Exemplars { - exemplarLabels := labelProtosToLabels(e.Labels) - require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) - j++ - } - - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fh := FloatHistogramProtoToFloatHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) - } else { - h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k]) + require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ @@ -308,7 +209,7 @@ func TestOutOfOrderHistogram(t *testing.T) { func BenchmarkRemoteWritehandler(b *testing.B) { const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" - reqs := []*http.Request{} + var reqs []*http.Request for i := 0; i < b.N; i++ { num := strings.Repeat(strconv.Itoa(i), 16) buf, _, err := buildWriteRequest([]prompb.TimeSeries{{