More cleanup

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Nicolás Pazos <npazosmendez@gmail.com>
This commit is contained in:
Callum Styan 2023-11-20 16:37:03 -08:00 committed by Nicolás Pazos
parent 4bdb73780c
commit 4164eabac9
3 changed files with 18 additions and 297 deletions

View file

@ -107,39 +107,6 @@ var writeRequestMinimizedFixture = func() *prompb.MinimizedWriteRequest {
}
}()
var writeRequestMinimized64Fixture = func() *prompb.MinimizedWriteRequestFixed64 {
st := newRwSymbolTable()
labels := []uint64{}
for _, s := range []string{
"__name__", "test_metric1",
"b", "c",
"baz", "qux",
"d", "e",
"foo", "bar",
} {
ref := st.Ref64Packed(s)
labels = append(labels, ref)
}
return &prompb.MinimizedWriteRequestFixed64{
Timeseries: []prompb.MinimizedTimeSeriesFixed64{
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "f", Value: "g"}}, Value: 1, Timestamp: 0}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())},
},
{
LabelSymbols: labels,
Samples: []prompb.Sample{{Value: 2, Timestamp: 1}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "h", Value: "i"}}, Value: 2, Timestamp: 1}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(2, &testHistogram), FloatHistogramToHistogramProto(3, testHistogram.ToFloat())},
},
},
Symbols: st.LabelsString(),
}
}()
func TestValidateLabelsAndMetricName(t *testing.T) {
tests := []struct {
input []prompb.Label

View file

@ -80,12 +80,6 @@ func TestSampleDelivery(t *testing.T) {
{rwFormat: Min32Optimized, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{rwFormat: Min32Optimized, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
{rwFormat: Min64Fixed, samples: true, exemplars: false, histograms: false, name: "interned samples only"},
{rwFormat: Min64Fixed, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"},
{rwFormat: Min64Fixed, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"},
{rwFormat: Min64Fixed, samples: false, exemplars: false, histograms: true, name: "interned histograms only"},
{rwFormat: Min64Fixed, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "interned float histograms only"},
}
// Let's create an even number of send batches so we don't run into the
@ -210,7 +204,7 @@ func TestMetadataDelivery(t *testing.T) {
}
func TestSampleDeliveryTimeout(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
//remoteWrite11 := proto == "1.1"
// Let's send one less sample than batch size, and wait the timeout duration
@ -244,7 +238,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}
func TestSampleDeliveryOrder(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
@ -346,7 +340,7 @@ func TestSeriesReset(t *testing.T) {
}
func TestReshard(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
size := 10 // Make bigger to find more races.
nSeries := 6
@ -389,7 +383,7 @@ func TestReshard(t *testing.T) {
}
func TestReshardRaceWithStop(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
c := NewTestWriteClient(rwFormat)
var m *QueueManager
@ -428,7 +422,7 @@ func TestReshardRaceWithStop(t *testing.T) {
}
func TestReshardPartialBatch(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(1, 10)
@ -474,7 +468,7 @@ func TestReshardPartialBatch(t *testing.T) {
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
samples, series := createTimeseries(50, 1)
@ -516,7 +510,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
}
func TestReleaseNoninternedString(t *testing.T) {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized, Min64Fixed} {
for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} {
t.Run(fmt.Sprint(rwFormat), func(t *testing.T) {
cfg := config.DefaultQueueConfig
@ -844,12 +838,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
if err == nil {
reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin)
}
case Min64Fixed:
var reqMin64 prompb.MinimizedWriteRequestFixed64
err = proto.Unmarshal(reqBuf, &reqMin64)
if err == nil {
reqProto, err = min64WriteRequestToWriteRequest(&reqMin64)
}
}
if err != nil {
@ -1546,138 +1534,3 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) {
})
}
}
func BenchmarkBuildMinimizedWriteRequestFixed32(b *testing.B) {
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
testcase{createDummyTimeSeries(2)},
testcase{createDummyTimeSeries(10)},
testcase{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := newRwSymbolTable()
pBuf := proto.NewBuffer(nil)
buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeriesFixed32, len(tc.batch))
//total := 0
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
// todo: add other types
//seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
//pBuf := []byte{}
// Warmup buffers
for i := 0; i < 10; i++ {
populateMinimizedTimeSeriesFixed32(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequestFixed32(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateMinimizedTimeSeriesFixed32(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, err := buildMinimizedWriteRequestFixed32(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
if err != nil {
b.Fatal(err)
}
symbolTable.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}
func BenchmarkBuildMinimizedWriteRequestFixed64(b *testing.B) {
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
testcase{createDummyTimeSeries(2)},
testcase{createDummyTimeSeries(10)},
testcase{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := newRwSymbolTable()
pBuf := proto.NewBuffer(nil)
buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeriesFixed64, len(tc.batch))
//total := 0
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
// todo: add other types
//seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
//pBuf := []byte{}
// Warmup buffers
for i := 0; i < 10; i++ {
populateMinimizedTimeSeriesFixed64(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequestFixed64(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateMinimizedTimeSeriesFixed64(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, err := buildMinimizedWriteRequestFixed64(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
if err != nil {
b.Fatal(err)
}
symbolTable.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}
func BenchmarkBuildMinimizedWriteRequestPacking(b *testing.B) {
type testcase struct {
batch []timeSeries
}
testCases := []testcase{
testcase{createDummyTimeSeries(2)},
testcase{createDummyTimeSeries(10)},
testcase{createDummyTimeSeries(100)},
}
for _, tc := range testCases {
symbolTable := newRwSymbolTable()
pBuf := proto.NewBuffer(nil)
buff := make([]byte, 0)
seriesBuff := make([]prompb.MinimizedTimeSeriesPacking, len(tc.batch))
//total := 0
for i := range seriesBuff {
seriesBuff[i].Samples = []prompb.Sample{{}}
// todo: add other types
//seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
}
//pBuf := []byte{}
// Warmup buffers
for i := 0; i < 10; i++ {
populateMinimizedTimeSeriesPacking(&symbolTable, tc.batch, seriesBuff, true, true)
buildMinimizedWriteRequestPacking(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
}
b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) {
totalSize := 0
for j := 0; j < b.N; j++ {
populateMinimizedTimeSeriesPacking(&symbolTable, tc.batch, seriesBuff, true, true)
b.ResetTimer()
req, _, err := buildMinimizedWriteRequestPacking(seriesBuff, symbolTable.LabelsString(), pBuf, &buff)
if err != nil {
b.Fatal(err)
}
symbolTable.clear()
totalSize += len(req)
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
}
})
}
}

View file

@ -58,25 +58,25 @@ func TestRemoteWriteHandler(t *testing.T) {
j := 0
k := 0
for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels)
ls := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples {
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
@ -108,124 +108,25 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(ts.Labels)
ls := labelProtosToLabels(ts.Labels)
for _, s := range ts.Samples {
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
}
}
}
//func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) {
// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil)
// require.NoError(t, err)
//
// req, err := http.NewRequest("", "", bytes.NewReader(buf))
// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
// require.NoError(t, err)
//
// appendable := &mockAppendable{}
// handler := NewWriteHandler(nil, nil, appendable, false, true)
//
// recorder := httptest.NewRecorder()
// handler.ServeHTTP(recorder, req)
//
// resp := recorder.Result()
// require.Equal(t, http.StatusNoContent, resp.StatusCode)
//
// i := 0
// j := 0
// k := 0
// // the reduced write request is equivalent to the write request fixture.
// // we can use it for
// for _, ts := range writeRequestFixture.Timeseries {
// ls := labelProtosToLabels(ts.Labels)
// for _, s := range ts.Samples {
// require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i])
// i++
// }
//
// for _, e := range ts.Exemplars {
// exemplarLabels := labelProtosToLabels(e.Labels)
// require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
// j++
// }
//
// for _, hp := range ts.Histograms {
// if hp.IsFloatHistogram() {
// fh := FloatHistogramProtoToFloatHistogram(hp)
// require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k])
// } else {
// h := HistogramProtoToHistogram(hp)
// require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
// }
//
// k++
// }
// }
//}
func TestRemoteWriteHandler64Packed(t *testing.T) {
buf, _, err := buildMinimizedWriteRequestFixed64(writeRequestMinimized64Fixture.Timeseries, writeRequestMinimized64Fixture.Symbols, nil, nil)
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue)
require.NoError(t, err)
appendable := &mockAppendable{}
// TODO: test with other proto format(s)
handler := NewWriteHandler(nil, nil, appendable, Min64Fixed)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusNoContent, resp.StatusCode)
i := 0
j := 0
k := 0
// the reduced write request is equivalent to the write request fixture.
// we can use it for
for _, ts := range writeRequestMinimized64Fixture.Timeseries {
labels := Uint64RefToLabels(writeRequestMinimized64Fixture.Symbols, ts.LabelSymbols)
for _, s := range ts.Samples {
require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++
}
for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(e.Labels)
require.Equal(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++
}
for _, hp := range ts.Histograms {
if hp.IsFloatHistogram() {
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, h, nil}, appendable.histograms[k])
require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k])
}
k++
@ -308,7 +209,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
func BenchmarkRemoteWritehandler(b *testing.B) {
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
reqs := []*http.Request{}
var reqs []*http.Request
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{