mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-19 23:37:31 -07:00
storage/remote: remove unused code now that tsdb implements ChunkQuerier (#7715)
* storage/remote: remove unused code now that tsdb implements ChunkQuerier Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu> * encodeChunks is unused too Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
31929b83d5
commit
9848e77678
|
@ -180,133 +180,6 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R
|
||||||
return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported)
|
return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(bwlpotka): Remove when tsdb will support ChunkQuerier.
|
|
||||||
func DeprecatedStreamChunkedReadResponses(
|
|
||||||
stream io.Writer,
|
|
||||||
queryIndex int64,
|
|
||||||
ss storage.SeriesSet,
|
|
||||||
sortedExternalLabels []prompb.Label,
|
|
||||||
maxBytesInFrame int,
|
|
||||||
) (storage.Warnings, error) {
|
|
||||||
var (
|
|
||||||
chks []prompb.Chunk
|
|
||||||
lbls []prompb.Label
|
|
||||||
err error
|
|
||||||
lblsSize int
|
|
||||||
)
|
|
||||||
|
|
||||||
for ss.Next() {
|
|
||||||
series := ss.At()
|
|
||||||
iter := series.Iterator()
|
|
||||||
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
|
|
||||||
|
|
||||||
lblsSize = 0
|
|
||||||
for _, lbl := range lbls {
|
|
||||||
lblsSize += lbl.Size()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
|
|
||||||
for {
|
|
||||||
// TODO(bwplotka): Use ChunkIterator once available in TSDB instead of re-encoding: https://github.com/prometheus/prometheus/pull/5882
|
|
||||||
chks, err = encodeChunks(iter, chks, maxBytesInFrame-lblsSize)
|
|
||||||
if err != nil {
|
|
||||||
return ss.Warnings(), err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(chks) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
|
|
||||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
|
||||||
{
|
|
||||||
Labels: lbls,
|
|
||||||
Chunks: chks,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
QueryIndex: queryIndex,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse")
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := stream.Write(b); err != nil {
|
|
||||||
return ss.Warnings(), errors.Wrap(err, "write to stream")
|
|
||||||
}
|
|
||||||
|
|
||||||
chks = chks[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := iter.Err(); err != nil {
|
|
||||||
return ss.Warnings(), err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := ss.Err(); err != nil {
|
|
||||||
return ss.Warnings(), err
|
|
||||||
}
|
|
||||||
|
|
||||||
return ss.Warnings(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// encodeChunks expects iterator to be ready to use (aka iter.Next() called before invoking).
|
|
||||||
func encodeChunks(iter chunkenc.Iterator, chks []prompb.Chunk, frameBytesLeft int) ([]prompb.Chunk, error) {
|
|
||||||
const maxSamplesInChunk = 120
|
|
||||||
|
|
||||||
var (
|
|
||||||
chkMint int64
|
|
||||||
chkMaxt int64
|
|
||||||
chk *chunkenc.XORChunk
|
|
||||||
app chunkenc.Appender
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
for iter.Next() {
|
|
||||||
if chk == nil {
|
|
||||||
chk = chunkenc.NewXORChunk()
|
|
||||||
app, err = chk.Appender()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
chkMint, _ = iter.At()
|
|
||||||
}
|
|
||||||
|
|
||||||
app.Append(iter.At())
|
|
||||||
chkMaxt, _ = iter.At()
|
|
||||||
|
|
||||||
if chk.NumSamples() < maxSamplesInChunk {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cut the chunk.
|
|
||||||
chks = append(chks, prompb.Chunk{
|
|
||||||
MinTimeMs: chkMint,
|
|
||||||
MaxTimeMs: chkMaxt,
|
|
||||||
Type: prompb.Chunk_Encoding(chk.Encoding()),
|
|
||||||
Data: chk.Bytes(),
|
|
||||||
})
|
|
||||||
chk = nil
|
|
||||||
frameBytesLeft -= chks[len(chks)-1].Size()
|
|
||||||
|
|
||||||
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
|
|
||||||
if frameBytesLeft <= 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if iter.Err() != nil {
|
|
||||||
return nil, errors.Wrap(iter.Err(), "iter TSDB series")
|
|
||||||
}
|
|
||||||
|
|
||||||
if chk != nil {
|
|
||||||
// Cut the chunk if exists.
|
|
||||||
chks = append(chks, prompb.Chunk{
|
|
||||||
MinTimeMs: chkMint,
|
|
||||||
MaxTimeMs: chkMaxt,
|
|
||||||
Type: prompb.Chunk_Encoding(chk.Encoding()),
|
|
||||||
Data: chk.Bytes(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return chks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
|
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
|
||||||
// It expects Series set with populated chunks.
|
// It expects Series set with populated chunks.
|
||||||
func StreamChunkedReadResponses(
|
func StreamChunkedReadResponses(
|
||||||
|
|
Loading…
Reference in a new issue