diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 3de4e5bd7..5b4cc5135 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -1553,8 +1553,8 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 	}
 	pendingDataV2 := make([]*writev2.TimeSeries, maxCount)
 	for i := range pendingDataV2 {
-		pendingDataV2[i] = writev2.TimeSeriesFromVTPool()
-		pendingDataV2[i].Samples = []*writev2.Sample{writev2.SampleFromVTPool()}
+		pendingDataV2[i] = &writev2.TimeSeries{}
+		pendingDataV2[i].Samples = []*writev2.Sample{&writev2.Sample{Value: 1, Timestamp: 0}}
 	}
 
 	timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
@@ -1952,10 +1952,6 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []*writev
 func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []*writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {
 	var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
 	for nPending, d := range batch {
-		for _, sample := range pendingData[nPending].Samples {
-			sample.ReturnToVTPool()
-		}
-		pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
 		// todo: should we also safeguard against empty metadata here?
 		if d.metadata != nil {
 			pendingData[nPending].Metadata = &writev2.Metadata{}
@@ -1984,10 +1980,10 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
 		pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs)
 		switch d.sType {
 		case tSample:
-			sample := writev2.SampleFromVTPool()
-			sample.Value = d.value
-			sample.Timestamp = d.timestamp
-			pendingData[nPending].Samples = append(pendingData[nPending].Samples, sample)
+			// we don't do any optimization to send multiple samples for the same timeseries
+			// in a single proto time series, so this is safe at the moment
+			pendingData[nPending].Samples[0].Value = d.value
+			pendingData[nPending].Samples[0].Timestamp = d.timestamp
 			nPendingSamples++
 		case tExemplar:
 			exemplar := writev2.ExemplarFromVTPool()
@@ -2223,7 +2219,11 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
 		pBuf = &[]byte{} // For convenience in tests. Not efficient.
 	}
 
-	data, err := proto.Marshal(req)
+	//data, err := proto.Marshal(req)
+	if len(*pBuf) < req.SizeVT() {
+		*pBuf = make([]byte, req.SizeVT())
+	}
+	size, err := req.MarshalToVT(*pBuf)
 	if err != nil {
 		return nil, highest, lowest, err
 	}
@@ -2233,7 +2233,7 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
 	if err != nil {
 		return nil, highest, lowest, err
 	}
-	*pBuf = data
+	//*pBuf = data
 
 	// snappy uses len() to see if it needs to allocate a new slice. Make the
 	// buffer as long as possible.
@@ -2243,7 +2243,7 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
 		buf = &[]byte{}
 	}
 
-	compressed, err = compressPayload(buf, data, enc)
+	compressed, err = compressPayload(buf, (*pBuf)[:size], enc)
 	if err != nil {
 		return nil, highest, lowest, err
 	}
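Below is a minimal, self-contained sketch of the buffer-reuse pattern the buildV2WriteRequest hunks apply: size the destination once with the vtprotobuf-generated SizeVT, marshal in place with MarshalToVT, and hand only the filled prefix to the compressor. It assumes only the vtprotobuf-generated method set and github.com/golang/snappy; the vtMessage interface, the marshalInto/encodeAndCompress helpers, and the package name are illustrative and not part of this change.

// Package sketch illustrates the marshal-into-reused-buffer pattern; the
// names here are hypothetical stand-ins, not Prometheus code.
package sketch

import "github.com/golang/snappy"

// vtMessage stands in for a vtprotobuf-generated type such as
// writev2.Request, which provides SizeVT and MarshalToVT.
type vtMessage interface {
	SizeVT() int
	MarshalToVT(dst []byte) (int, error)
}

// marshalInto reuses *pBuf across calls, growing it only when the message no
// longer fits, and returns the filled prefix instead of allocating a fresh
// slice per request the way proto.Marshal does.
func marshalInto(req vtMessage, pBuf *[]byte) ([]byte, error) {
	if need := req.SizeVT(); len(*pBuf) < need {
		*pBuf = make([]byte, need)
	}
	size, err := req.MarshalToVT(*pBuf)
	if err != nil {
		return nil, err
	}
	return (*pBuf)[:size], nil
}

// encodeAndCompress marshals and then snappy-compresses the request.
// snappy.Encode reuses buf only when len(buf) is large enough, so the
// compression buffer is stretched to its full capacity first, matching the
// "snappy uses len()" comment kept in the diff.
func encodeAndCompress(req vtMessage, pBuf, buf *[]byte) ([]byte, error) {
	data, err := marshalInto(req, pBuf)
	if err != nil {
		return nil, err
	}
	*buf = (*buf)[0:cap(*buf)]
	return snappy.Encode(*buf, data), nil
}

In the diff itself the same steps appear inline: pBuf is grown to req.SizeVT() when needed, req.MarshalToVT(*pBuf) fills it, and compressPayload receives (*pBuf)[:size] rather than a freshly allocated data slice.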