mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
We need to check ctx.Done in it's own select call since select ordering
is non-deterministic. Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent
84b00564f4
commit
1e6f8c6d88
|
@ -791,7 +791,10 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case sample, ok := <-queue:
|
||||
if !ok {
|
||||
if nPending > 0 {
|
||||
|
|
|
@ -177,6 +177,45 @@ func TestShutdown(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestShutdownNoPendingSamples(t *testing.T) {
|
||||
deadline := 1 * time.Second
|
||||
c := NewTestBlockedStorageClient()
|
||||
|
||||
dir, err := ioutil.TempDir("", "TestShutdown")
|
||||
testutil.Ok(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
metrics := newQueueManagerMetrics(nil)
|
||||
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
|
||||
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
||||
samples, series := createTimeseries(n, n)
|
||||
m.StoreSeries(series, 0)
|
||||
m.Start()
|
||||
m.reshardChan <- 10
|
||||
|
||||
// Append blocks to guarantee delivery, so we do it in the background.
|
||||
go func() {
|
||||
m.Append(samples)
|
||||
}()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
m.reshardChan <- 1
|
||||
// Test to ensure that Stop doesn't block.
|
||||
start := time.Now()
|
||||
m.Stop()
|
||||
// The samples will never be delivered, so duration should
|
||||
// be at least equal to deadline, otherwise the flush deadline
|
||||
// was not respected.
|
||||
duration := time.Since(start)
|
||||
if duration > time.Duration(deadline+(deadline/10)) {
|
||||
t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
|
||||
}
|
||||
if duration < time.Duration(deadline) {
|
||||
t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
|
||||
}
|
||||
testutil.Assert(t, client_testutil.ToFloat64(m.pendingSamplesMetric) == 0, "pending samples was not 0 after all shards were stopped: %f", client_testutil.ToFloat64(m.pendingSamplesMetric))
|
||||
}
|
||||
|
||||
func TestSeriesReset(t *testing.T) {
|
||||
c := NewTestBlockedStorageClient()
|
||||
deadline := 5 * time.Second
|
||||
|
|
Loading…
Reference in a new issue