This commit is contained in:
Junang Li 2024-09-13 23:30:11 -04:00
parent 94bd29c271
commit ff4fceeda6
4 changed files with 55 additions and 56 deletions

View file

@ -168,38 +168,38 @@ func deltasToCounts(deltas []int64) []float64 {
// FromIntHistogram returns remote Histogram from the integer Histogram. // FromIntHistogram returns remote Histogram from the integer Histogram.
func FromIntHistogram(timestamp int64, h *histogram.Histogram) *Histogram { func FromIntHistogram(timestamp int64, h *histogram.Histogram) *Histogram {
return &Histogram{ hist := HistogramFromVTPool()
Count: &Histogram_CountInt{CountInt: h.Count}, hist.Count = &Histogram_CountInt{CountInt: h.Count}
Sum: h.Sum, hist.Sum = h.Sum
Schema: h.Schema, hist.Schema = h.Schema
ZeroThreshold: h.ZeroThreshold, hist.ZeroThreshold = h.ZeroThreshold
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}, hist.ZeroCount = &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount}
NegativeSpans: spansToSpansProto(h.NegativeSpans), hist.NegativeSpans = spansToSpansProto(h.NegativeSpans)
NegativeDeltas: h.NegativeBuckets, hist.NegativeDeltas = h.NegativeBuckets
PositiveSpans: spansToSpansProto(h.PositiveSpans), hist.PositiveSpans = spansToSpansProto(h.PositiveSpans)
PositiveDeltas: h.PositiveBuckets, hist.PositiveDeltas = h.PositiveBuckets
ResetHint: Histogram_ResetHint(h.CounterResetHint), hist.ResetHint = Histogram_ResetHint(h.CounterResetHint)
CustomValues: h.CustomValues, hist.CustomValues = h.CustomValues
Timestamp: timestamp, hist.Timestamp = timestamp
} return hist
} }
// FromFloatHistogram returns remote Histogram from the float Histogram. // FromFloatHistogram returns remote Histogram from the float Histogram.
func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) *Histogram { func FromFloatHistogram(timestamp int64, fh *histogram.FloatHistogram) *Histogram {
return &Histogram{ hist := HistogramFromVTPool()
Count: &Histogram_CountFloat{CountFloat: fh.Count}, hist.Count = &Histogram_CountFloat{CountFloat: fh.Count}
Sum: fh.Sum, hist.Sum = fh.Sum
Schema: fh.Schema, hist.Schema = fh.Schema
ZeroThreshold: fh.ZeroThreshold, hist.ZeroThreshold = fh.ZeroThreshold
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, hist.ZeroCount = &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}
NegativeSpans: spansToSpansProto(fh.NegativeSpans), hist.NegativeSpans = spansToSpansProto(fh.NegativeSpans)
NegativeCounts: fh.NegativeBuckets, hist.NegativeCounts = fh.NegativeBuckets
PositiveSpans: spansToSpansProto(fh.PositiveSpans), hist.PositiveSpans = spansToSpansProto(fh.PositiveSpans)
PositiveCounts: fh.PositiveBuckets, hist.PositiveCounts = fh.PositiveBuckets
ResetHint: Histogram_ResetHint(fh.CounterResetHint), hist.ResetHint = Histogram_ResetHint(fh.CounterResetHint)
CustomValues: fh.CustomValues, hist.CustomValues = fh.CustomValues
Timestamp: timestamp, hist.Timestamp = timestamp
} return hist
} }
func spansToSpansProto(s []histogram.Span) []*BucketSpan { func spansToSpansProto(s []histogram.Span) []*BucketSpan {

View file

@ -44,7 +44,10 @@ func (t *SymbolsTable) Symbolize(str string) uint32 {
// SymbolizeLabels symbolize Prometheus labels. // SymbolizeLabels symbolize Prometheus labels.
func (t *SymbolsTable) SymbolizeLabels(lbls labels.Labels, buf []uint32) []uint32 { func (t *SymbolsTable) SymbolizeLabels(lbls labels.Labels, buf []uint32) []uint32 {
result := buf[:0] var result []uint32
if buf != nil {
result = buf[:0]
}
lbls.Range(func(l labels.Label) { lbls.Range(func(l labels.Label) {
off := t.Symbolize(l.Name) off := t.Symbolize(l.Name)
result = append(result, off) result = append(result, off)

Binary file not shown.

View file

@ -1554,7 +1554,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
pendingDataV2 := make([]*writev2.TimeSeries, maxCount) pendingDataV2 := make([]*writev2.TimeSeries, maxCount)
for i := range pendingDataV2 { for i := range pendingDataV2 {
pendingDataV2[i] = writev2.TimeSeriesFromVTPool() pendingDataV2[i] = writev2.TimeSeriesFromVTPool()
pendingDataV2[i].Samples = []*writev2.Sample{{}} pendingDataV2[i].Samples = []*writev2.Sample{writev2.SampleFromVTPool()}
} }
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
@ -1568,22 +1568,6 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
} }
defer stop() defer stop()
returnTimeSeriesToVTPool := func() {
for _, ts := range pendingDataV2 {
for _, sample := range ts.Samples {
sample.ReturnToVTPool()
}
for _, examplar := range ts.Exemplars {
examplar.ReturnToVTPool()
}
for _, hist := range ts.Histograms {
hist.ReturnToVTPool()
}
ts.ReturnToVTPool()
}
}
defer returnTimeSeriesToVTPool()
sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) { sendBatch := func(batch []timeSeries, protoMsg config.RemoteWriteProtoMsg, enc Compression, timer bool) {
switch protoMsg { switch protoMsg {
case config.RemoteWriteProtoMsgV1: case config.RemoteWriteProtoMsgV1:
@ -1968,6 +1952,9 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []*writev
func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []*writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) { func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries, pendingData []*writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int, int) {
var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int var nPendingSamples, nPendingExemplars, nPendingHistograms, nPendingMetadata int
for nPending, d := range batch { for nPending, d := range batch {
for _, sample := range pendingData[nPending].Samples {
sample.ReturnToVTPool()
}
pendingData[nPending].Samples = pendingData[nPending].Samples[:0] pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
// todo: should we also safeguard against empty metadata here? // todo: should we also safeguard against empty metadata here?
if d.metadata != nil { if d.metadata != nil {
@ -1979,9 +1966,15 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
} }
if sendExemplars { if sendExemplars {
for _, exemplar := range pendingData[nPending].Exemplars {
exemplar.ReturnToVTPool()
}
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0] pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
} }
if sendNativeHistograms { if sendNativeHistograms {
for _, histogram := range pendingData[nPending].Histograms {
histogram.ReturnToVTPool()
}
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0] pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
} }
@ -1991,17 +1984,17 @@ func populateV2TimeSeries(symbolTable *writev2.SymbolsTable, batch []timeSeries,
pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs) pendingData[nPending].LabelsRefs = symbolTable.SymbolizeLabels(d.seriesLabels, pendingData[nPending].LabelsRefs)
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, &writev2.Sample{ sample := writev2.SampleFromVTPool()
Value: d.value, sample.Value = d.value
Timestamp: d.timestamp, sample.Timestamp = d.timestamp
}) pendingData[nPending].Samples = append(pendingData[nPending].Samples, sample)
nPendingSamples++ nPendingSamples++
case tExemplar: case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, &writev2.Exemplar{ exemplar := writev2.ExemplarFromVTPool()
LabelsRefs: symbolTable.SymbolizeLabels(d.exemplarLabels, nil), // TODO: optimize, reuse slice exemplar.LabelsRefs = symbolTable.SymbolizeLabels(d.exemplarLabels, nil)
Value: d.value, exemplar.Value = d.value
Timestamp: d.timestamp, exemplar.Timestamp = d.timestamp
}) pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, exemplar)
nPendingExemplars++ nPendingExemplars++
case tHistogram: case tHistogram:
pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram)) pendingData[nPending].Histograms = append(pendingData[nPending].Histograms, writev2.FromIntHistogram(d.timestamp, d.histogram))
@ -2230,7 +2223,10 @@ func buildV2WriteRequest(logger log.Logger, samples []*writev2.TimeSeries, label
pBuf = &[]byte{} // For convenience in tests. Not efficient. pBuf = &[]byte{} // For convenience in tests. Not efficient.
} }
data, err := req.MarshalVT() data, err := proto.Marshal(req)
if err != nil {
return nil, highest, lowest, err
}
req.Symbols = []string{} req.Symbols = []string{}
req.Timeseries = []*writev2.TimeSeries{} req.Timeseries = []*writev2.TimeSeries{}
req.ReturnToVTPool() req.ReturnToVTPool()