diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 3181db120..47360dd57 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -180,133 +180,6 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported) } -// TODO(bwlpotka): Remove when tsdb will support ChunkQuerier. -func DeprecatedStreamChunkedReadResponses( - stream io.Writer, - queryIndex int64, - ss storage.SeriesSet, - sortedExternalLabels []prompb.Label, - maxBytesInFrame int, -) (storage.Warnings, error) { - var ( - chks []prompb.Chunk - lbls []prompb.Label - err error - lblsSize int - ) - - for ss.Next() { - series := ss.At() - iter := series.Iterator() - lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) - - lblsSize = 0 - for _, lbl := range lbls { - lblsSize += lbl.Size() - } - - // Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. - for { - // TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/prometheus/pull/5882 - chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize) - if err != nil { - return ss.Warnings(), err - } - - if len(chks) == 0 { - break - } - b, err := proto.Marshal(&prompb.ChunkedReadResponse{ - ChunkedSeries: []*prompb.ChunkedSeries{ - { - Labels: lbls, - Chunks: chks, - }, - }, - QueryIndex: queryIndex, - }) - if err != nil { - return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse") - } - - if _, err := stream.Write(b); err != nil { - return ss.Warnings(), errors.Wrap(err, "write to stream") - } - - chks = chks[:0] - } - - if err := iter.Err(); err != nil { - return ss.Warnings(), err - } - } - if err := ss.Err(); err != nil { - return ss.Warnings(), err - } - - return ss.Warnings(), nil -} - -// encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking). -func encodeChunks(iter chunkenc.Iterator, chks []prompb.Chunk, frameBytesLeft int) ([]prompb.Chunk, error) { - const maxSamplesInChunk = 120 - - var ( - chkMint int64 - chkMaxt int64 - chk *chunkenc.XORChunk - app chunkenc.Appender - err error - ) - - for iter.Next() { - if chk == nil { - chk = chunkenc.NewXORChunk() - app, err = chk.Appender() - if err != nil { - return nil, err - } - chkMint, _ = iter.At() - } - - app.Append(iter.At()) - chkMaxt, _ = iter.At() - - if chk.NumSamples() < maxSamplesInChunk { - continue - } - - // Cut the chunk. - chks = append(chks, prompb.Chunk{ - MinTimeMs: chkMint, - MaxTimeMs: chkMaxt, - Type: prompb.Chunk_Encoding(chk.Encoding()), - Data: chk.Bytes(), - }) - chk = nil - frameBytesLeft -= chks[len(chks)-1].Size() - - // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. - if frameBytesLeft <= 0 { - break - } - } - if iter.Err() != nil { - return nil, errors.Wrap(iter.Err(), "iter TSDB series") - } - - if chk != nil { - // Cut the chunk if exists. - chks = append(chks, prompb.Chunk{ - MinTimeMs: chkMint, - MaxTimeMs: chkMaxt, - Type: prompb.Chunk_Encoding(chk.Encoding()), - Data: chk.Bytes(), - }) - } - return chks, nil -} - // StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. // It expects Series set with populated chunks. func StreamChunkedReadResponses(