mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Remove Individual Data Type Caps in Per-shard Buffering for Remote Write (#8921)
* Moved everything to nPending buffer Signed-off-by: Levi Harrison <git@leviharrison.dev> * Simplify exemplar capacity addition Signed-off-by: Levi Harrison <git@leviharrison.dev> * Added pre-allocation Signed-off-by: Levi Harrison <git@leviharrison.dev> * Don't allocate if not sending exemplars Signed-off-by: Levi Harrison <git@leviharrison.dev>
This commit is contained in:
parent
e8663a4eac
commit
fac1b57334
|
@ -1037,24 +1037,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
|
|||
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
|
||||
// If we have fewer samples than that, flush them out after a deadline anyways.
|
||||
var (
|
||||
max = s.qm.cfg.MaxSamplesPerSend
|
||||
// Rough estimate, 1% of active series will contain an exemplar on each scrape.
|
||||
// TODO(cstyan): Casting this many times smells, also we could get index out of bounds issues here.
|
||||
maxExemplars = int(math.Max(1, float64(max/10)))
|
||||
max = s.qm.cfg.MaxSamplesPerSend
|
||||
nPending, nPendingSamples, nPendingExemplars = 0, 0, 0
|
||||
sampleBuffer = allocateSampleBuffer(max)
|
||||
|
||||
buf []byte
|
||||
pendingData []prompb.TimeSeries
|
||||
exemplarBuffer [][]prompb.Exemplar
|
||||
buf []byte
|
||||
)
|
||||
totalPending := max
|
||||
if s.qm.sendExemplars {
|
||||
exemplarBuffer = allocateExemplarBuffer(maxExemplars)
|
||||
totalPending += maxExemplars
|
||||
max += int(float64(max) * 0.1)
|
||||
}
|
||||
|
||||
pendingData = make([]prompb.TimeSeries, totalPending)
|
||||
var pendingData = make([]prompb.TimeSeries, max)
|
||||
for i := range pendingData {
|
||||
pendingData[i].Samples = []prompb.Sample{{}}
|
||||
if s.qm.sendExemplars {
|
||||
pendingData[i].Exemplars = []prompb.Exemplar{{}}
|
||||
}
|
||||
}
|
||||
|
||||
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
|
||||
stop := func() {
|
||||
|
@ -1094,28 +1092,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
|
|||
return
|
||||
}
|
||||
|
||||
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
|
||||
if s.qm.sendExemplars {
|
||||
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
|
||||
}
|
||||
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
|
||||
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
|
||||
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
|
||||
switch d := sample.(type) {
|
||||
case writeSample:
|
||||
sampleBuffer[nPendingSamples][0] = d.sample
|
||||
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
|
||||
pendingData[nPending].Samples = sampleBuffer[nPendingSamples]
|
||||
pendingData[nPending].Exemplars = nil
|
||||
pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample)
|
||||
nPendingSamples++
|
||||
nPending++
|
||||
|
||||
case writeExemplar:
|
||||
exemplarBuffer[nPendingExemplars][0] = d.exemplar
|
||||
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
|
||||
pendingData[nPending].Samples = nil
|
||||
pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars]
|
||||
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar)
|
||||
nPendingExemplars++
|
||||
nPending++
|
||||
}
|
||||
|
||||
if nPendingSamples >= max || nPendingExemplars >= maxExemplars {
|
||||
if nPending >= max {
|
||||
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
|
||||
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
|
||||
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
|
||||
|
@ -1298,19 +1296,3 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
|
|||
compressed := snappy.Encode(buf, data)
|
||||
return compressed, highest, nil
|
||||
}
|
||||
|
||||
func allocateSampleBuffer(capacity int) [][]prompb.Sample {
|
||||
buf := make([][]prompb.Sample, capacity)
|
||||
for i := range buf {
|
||||
buf[i] = []prompb.Sample{{}}
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func allocateExemplarBuffer(capacity int) [][]prompb.Exemplar {
|
||||
buf := make([][]prompb.Exemplar, capacity)
|
||||
for i := range buf {
|
||||
buf[i] = []prompb.Exemplar{{}}
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue