This commit is contained in:
Junang Li 2024-09-12 15:03:31 -04:00
parent bbe00ac2f0
commit 94bd29c271
4 changed files with 24 additions and 2 deletions

6
package-lock.json generated Normal file
View file

@ -0,0 +1,6 @@
{
"name": "prometheus",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

BIN
remote.test Executable file

Binary file not shown.

View file

@ -1553,7 +1553,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
pendingDataV2 := make([]*writev2.TimeSeries, maxCount)
for i := range pendingDataV2 {
pendingDataV2[i] = &writev2.TimeSeries{}
pendingDataV2[i] = writev2.TimeSeriesFromVTPool()
pendingDataV2[i].Samples = []*writev2.Sample{{}}
}
@ -1568,6 +1568,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
}
defer stop()
returnTimeSeriesToVTPool := func() {
for _, ts := range pendingDataV2 {
for _, sample := range ts.Samples {
sample.ReturnToVTPool()
}
for _, examplar := range ts.Exemplars {
examplar.ReturnToVTPool()
}
for _, hist := range ts.Histograms {
hist.ReturnToVTPool()
}
ts.ReturnToVTPool()
}
}
defer returnTimeSeriesToVTPool()
sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) {
switch protoMsg {
case config.RemoteWriteProtoMsgV1:

View file

@ -192,7 +192,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Remote Write 2.x proto message handling.
req := writev2.RequestFromVTPool()
// Timeseries as well
if err := proto.Unmarshal(decompressed, req); err != nil {
if err := req.UnmarshalVT(decompressed); err != nil {
// TODO(bwplotka): Add more context to responded error?
level.Error(h.logger).Log("msg", "Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)