From 46fb802791d5cb9c2e051608e2666b72ff1edcc9 Mon Sep 17 00:00:00 2001 From: Sniper91 Date: Mon, 19 Dec 2022 23:54:49 +0800 Subject: [PATCH] reset frameBytesLeft after writing (#11689) Signed-off-by: sniper91 Signed-off-by: sniper91 --- storage/remote/codec.go | 6 +- storage/remote/codec_test.go | 119 +++++++++++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index a74ad2b7b0..e5b8751007 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -209,10 +209,11 @@ func StreamChunkedReadResponses( iter = series.Iterator(iter) lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) - frameBytesLeft := maxBytesInFrame + maxDataLength := maxBytesInFrame for _, lbl := range lbls { - frameBytesLeft -= lbl.Size() + maxDataLength -= lbl.Size() } + frameBytesLeft := maxDataLength isNext := iter.Next() @@ -258,6 +259,7 @@ func StreamChunkedReadResponses( // We immediately flush the Write() so it is safe to return to the pool. marshalPool.Put(&b) chks = chks[:0] + frameBytesLeft = maxDataLength } if err := iter.Err(); err != nil { return ss.Warnings(), err diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 596eb0861c..3dcc0a1f47 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -16,8 +16,10 @@ package remote import ( "bytes" "fmt" + "sync" "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/histogram" @@ -26,6 +28,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" ) var testHistogram = histogram.Histogram{ @@ -367,3 +370,119 @@ func TestNilHistogramProto(t *testing.T) { // values, causing the test to fail. HistogramProtoToHistogram(prompb.Histogram{}) } + +func TestStreamResponse(t *testing.T) { + lbs1 := labelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil) + lbs2 := labelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil) + chunk := prompb.Chunk{ + Type: prompb.Chunk_XOR, + Data: make([]byte, 100), + } + lbSize, chunkSize := 0, chunk.Size() + for _, lb := range lbs1 { + lbSize += lb.Size() + } + maxBytesInFrame := lbSize + chunkSize*2 + testData := []*prompb.ChunkedSeries{{ + Labels: lbs1, + Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk}, + }, { + Labels: lbs2, + Chunks: []prompb.Chunk{chunk, chunk, chunk, chunk}, + }} + css := newMockChunkSeriesSet(testData) + writer := mockWriter{} + warning, err := StreamChunkedReadResponses(&writer, 0, + css, + nil, + maxBytesInFrame, + &sync.Pool{}) + require.Nil(t, warning) + require.Nil(t, err) + expectData := []*prompb.ChunkedSeries{{ + Labels: lbs1, + Chunks: []prompb.Chunk{chunk, chunk}, + }, { + Labels: lbs1, + Chunks: []prompb.Chunk{chunk, chunk}, + }, { + Labels: lbs2, + Chunks: []prompb.Chunk{chunk, chunk}, + }, { + Labels: lbs2, + Chunks: []prompb.Chunk{chunk, chunk}, + }} + require.Equal(t, expectData, writer.actual) +} + +type mockWriter struct { + actual []*prompb.ChunkedSeries +} + +func (m *mockWriter) Write(p []byte) (n int, err error) { + cr := &prompb.ChunkedReadResponse{} + if err := proto.Unmarshal(p, cr); err != nil { + return 0, fmt.Errorf("unmarshaling: %w", err) + } + m.actual = append(m.actual, cr.ChunkedSeries...) + return len(p), nil +} + +type mockChunkSeriesSet struct { + chunkedSeries []*prompb.ChunkedSeries + index int +} + +func newMockChunkSeriesSet(ss []*prompb.ChunkedSeries) storage.ChunkSeriesSet { + return &mockChunkSeriesSet{chunkedSeries: ss, index: -1} +} + +func (c *mockChunkSeriesSet) Next() bool { + c.index++ + return c.index < len(c.chunkedSeries) +} + +func (c *mockChunkSeriesSet) At() storage.ChunkSeries { + return &storage.ChunkSeriesEntry{ + Lset: labelProtosToLabels(c.chunkedSeries[c.index].Labels), + ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { + return &mockChunkIterator{ + chunks: c.chunkedSeries[c.index].Chunks, + index: -1, + } + }, + } +} + +func (c *mockChunkSeriesSet) Warnings() storage.Warnings { return nil } + +func (c *mockChunkSeriesSet) Err() error { + return nil +} + +type mockChunkIterator struct { + chunks []prompb.Chunk + index int +} + +func (c *mockChunkIterator) At() chunks.Meta { + one := c.chunks[c.index] + chunk, err := chunkenc.FromData(chunkenc.Encoding(one.Type), one.Data) + if err != nil { + panic(err) + } + return chunks.Meta{ + Chunk: chunk, + MinTime: one.MinTimeMs, + MaxTime: one.MaxTimeMs, + } +} + +func (c *mockChunkIterator) Next() bool { + c.index++ + return c.index < len(c.chunks) +} + +func (c *mockChunkIterator) Err() error { + return nil +}