Write a test that reproduces the deadlock

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
This commit is contained in:
Chris Marchbanks 2022-03-03 15:53:36 -07:00
parent 83032011a5
commit afdc1decac
No known key found for this signature in database
GPG key ID: B7FD940BC86A8E7A

View file

@ -20,6 +20,7 @@ import (
"math" "math"
"net/url" "net/url"
"os" "os"
"runtime/pprof"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -413,6 +414,7 @@ func TestReshardPartialBatch(t *testing.T) {
case <-done: case <-done:
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and stopping detected") t.Error("Deadlock between sending and stopping detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow() t.FailNow()
} }
} }
@ -420,6 +422,47 @@ func TestReshardPartialBatch(t *testing.T) {
m.Stop() m.Stop()
} }
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) {
samples, series := createTimeseries(50, 1)
c := NewNopWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.MaxSamplesPerSend = 10
cfg.Capacity = 20
flushDeadline := time.Second
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
for i := 0; i < 100; i++ {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(samples)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Error("Deadlock between sending and appending detected")
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.FailNow()
}
}
}
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig mcfg := config.DefaultMetadataConfig