diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 5b59288e6..17ff1850f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1109,9 +1109,9 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool { if desiredShards == t.numShards { return false } - // We shouldn't reshard if Prometheus hasn't been able to send to the - // remote endpoint successfully within some period of time. - minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() + // We shouldn't reshard if Prometheus hasn't been able to send + // since the last time it checked if it should reshard. + minSendTimestamp := time.Now().Add(-1 * shardUpdateDuration).Unix() lsts := t.lastSendTimestamp.Load() if lsts < minSendTimestamp { level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7343184fc..1c06173a5 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -703,32 +703,35 @@ func TestShouldReshard(t *testing.T) { startingShards int samplesIn, samplesOut, lastSendTimestamp int64 expectedToReshard bool + sendDeadline model.Duration } cases := []testcase{ { - // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago. + // resharding shouldn't take place if we haven't successfully sent + // since the last shardUpdateDuration, even if the send deadline is very low startingShards: 10, samplesIn: 1000, samplesOut: 10, - lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second), + lastSendTimestamp: time.Now().Unix() - int64(shardUpdateDuration), expectedToReshard: false, + sendDeadline: model.Duration(100 * time.Millisecond), }, { - startingShards: 5, + startingShards: 10, samplesIn: 1000, samplesOut: 10, lastSendTimestamp: time.Now().Unix(), expectedToReshard: true, + sendDeadline: config.DefaultQueueConfig.BatchSendDeadline, }, } for _, c := range cases { - _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) + _, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) m.lastSendTimestamp.Store(c.lastSendTimestamp) - m.Start() desiredShards := m.calculateDesiredShards()