mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
Remote Write: Add max samples per metadata send (#8959)
* Added MaxSamplesPerSend Signed-off-by: Levi Harrison <git@leviharrison.dev> * Added tests Signed-off-by: Levi Harrison <git@leviharrison.dev> * Fixed order of require Signed-off-by: Levi Harrison <git@leviharrison.dev> * Added docs Signed-off-by: Levi Harrison <git@leviharrison.dev> * writes -> writesReceived Signed-off-by: Levi Harrison <git@leviharrison.dev> * Improved send loop Signed-off-by: Levi Harrison <git@leviharrison.dev>
This commit is contained in:
parent
7cb55d5732
commit
d5c3c567d3
|
@ -173,8 +173,9 @@ var (
|
|||
|
||||
// DefaultMetadataConfig is the default metadata configuration for a remote write endpoint.
|
||||
DefaultMetadataConfig = MetadataConfig{
|
||||
Send: true,
|
||||
SendInterval: model.Duration(1 * time.Minute),
|
||||
Send: true,
|
||||
SendInterval: model.Duration(1 * time.Minute),
|
||||
MaxSamplesPerSend: 500,
|
||||
}
|
||||
|
||||
// DefaultRemoteReadConfig is the default remote read configuration.
|
||||
|
@ -731,6 +732,8 @@ type MetadataConfig struct {
|
|||
Send bool `yaml:"send"`
|
||||
// SendInterval controls how frequently we send metric metadata.
|
||||
SendInterval model.Duration `yaml:"send_interval"`
|
||||
// Maximum number of samples per send.
|
||||
MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"`
|
||||
}
|
||||
|
||||
// SigV4Config is the configuration for signing remote write requests with
|
||||
|
|
|
@ -2443,6 +2443,8 @@ metadata_config:
|
|||
[ send: <boolean> | default = true ]
|
||||
# How frequently metric metadata is sent to remote storage.
|
||||
[ send_interval: <duration> | default = 1m ]
|
||||
# Maximum number of samples per send.
|
||||
[ max_samples_per_send: <int> | default = 500]
|
||||
```
|
||||
|
||||
There is a list of
|
||||
|
|
|
@ -443,11 +443,17 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met
|
|||
})
|
||||
}
|
||||
|
||||
err := t.sendMetadataWithBackoff(ctx, mm)
|
||||
|
||||
if err != nil {
|
||||
t.metrics.failedMetadataTotal.Add(float64(len(metadata)))
|
||||
level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", len(metadata), "err", err)
|
||||
numSends := int(math.Ceil(float64(len(metadata)) / float64(t.mcfg.MaxSamplesPerSend)))
|
||||
for i := 0; i < numSends; i++ {
|
||||
last := (i + 1) * t.mcfg.MaxSamplesPerSend
|
||||
if last > len(metadata) {
|
||||
last = len(metadata)
|
||||
}
|
||||
err := t.sendMetadataWithBackoff(ctx, mm[i*t.mcfg.MaxSamplesPerSend:last])
|
||||
if err != nil {
|
||||
t.metrics.failedMetadataTotal.Add(float64(last - (i * t.mcfg.MaxSamplesPerSend)))
|
||||
level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", last-(i*t.mcfg.MaxSamplesPerSend), "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -167,16 +167,25 @@ func TestMetadataDelivery(t *testing.T) {
|
|||
m.Start()
|
||||
defer m.Stop()
|
||||
|
||||
m.AppendMetadata(context.Background(), []scrape.MetricMetadata{
|
||||
{
|
||||
Metric: "prometheus_remote_storage_sent_metadata_bytes_total",
|
||||
metadata := []scrape.MetricMetadata{}
|
||||
numMetadata := 1532
|
||||
for i := 0; i < numMetadata; i++ {
|
||||
metadata = append(metadata, scrape.MetricMetadata{
|
||||
Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i),
|
||||
Type: textparse.MetricTypeCounter,
|
||||
Help: "a nice help text",
|
||||
Unit: "",
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
require.Equal(t, len(c.receivedMetadata), 1)
|
||||
m.AppendMetadata(context.Background(), metadata)
|
||||
|
||||
require.Equal(t, numMetadata, len(c.receivedMetadata))
|
||||
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
||||
// fit into MaxSamplesPerSend.
|
||||
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived)
|
||||
// Make sure the last samples were sent.
|
||||
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
|
||||
}
|
||||
|
||||
func TestSampleDeliveryTimeout(t *testing.T) {
|
||||
|
@ -522,6 +531,7 @@ type TestWriteClient struct {
|
|||
receivedExemplars map[string][]prompb.Exemplar
|
||||
expectedExemplars map[string][]prompb.Exemplar
|
||||
receivedMetadata map[string][]prompb.MetricMetadata
|
||||
writesReceived int
|
||||
withWaitGroup bool
|
||||
wg sync.WaitGroup
|
||||
mtx sync.Mutex
|
||||
|
@ -655,6 +665,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
|
|||
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
|
||||
}
|
||||
|
||||
c.writesReceived++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue