mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Test sample timeout delivery.
This commit is contained in:
parent
22d820ef8e
commit
390b018c90
|
@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
c.expectedSamples = map[string][]*prompb.Sample{}
|
||||||
|
c.receivedSamples = map[string][]*prompb.Sample{}
|
||||||
|
|
||||||
for _, s := range ss {
|
for _, s := range ss {
|
||||||
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
||||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
||||||
|
@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
c.waitForExpectedSamples(t)
|
c.waitForExpectedSamples(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSampleDeliveryTimeout(t *testing.T) {
|
||||||
|
// Let's send on less sample than batch size, and wait the timeout duration
|
||||||
|
n := config.DefaultQueueConfig.Capacity - 1
|
||||||
|
|
||||||
|
samples := make(model.Samples, 0, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
|
||||||
|
samples = append(samples, &model.Sample{
|
||||||
|
Metric: model.Metric{
|
||||||
|
model.MetricNameLabel: name,
|
||||||
|
},
|
||||||
|
Value: model.SampleValue(i),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewTestStorageClient()
|
||||||
|
|
||||||
|
cfg := config.DefaultQueueConfig
|
||||||
|
cfg.MaxShards = 1
|
||||||
|
cfg.BatchSendDeadline = 100 * time.Millisecond
|
||||||
|
m := NewQueueManager(nil, cfg, nil, nil, c)
|
||||||
|
m.Start()
|
||||||
|
defer m.Stop()
|
||||||
|
|
||||||
|
// Send the samples twice, waiting for the samples in the meantime.
|
||||||
|
c.expectSamples(samples)
|
||||||
|
for _, s := range samples {
|
||||||
|
m.Append(s)
|
||||||
|
}
|
||||||
|
c.waitForExpectedSamples(t)
|
||||||
|
|
||||||
|
c.expectSamples(samples)
|
||||||
|
for _, s := range samples {
|
||||||
|
m.Append(s)
|
||||||
|
}
|
||||||
|
c.waitForExpectedSamples(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSampleDeliveryOrder(t *testing.T) {
|
func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
ts := 10
|
ts := 10
|
||||||
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
||||||
|
|
Loading…
Reference in a new issue