remote write: increase time threshold for resharding (#14450)

Don't reshard if we haven't successfully sent a sample in the last
shardUpdateDuration seconds.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: kushagra Shukla <kushalshukla110@gmail.com>
This commit is contained in:
Callum Styan 2024-07-30 14:08:28 -07:00 committed by GitHub
parent 84b819a69f
commit 1561815732
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 11 additions and 8 deletions

View file

@ -1109,9 +1109,9 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
if desiredShards == t.numShards { if desiredShards == t.numShards {
return false return false
} }
// We shouldn't reshard if Prometheus hasn't been able to send to the // We shouldn't reshard if Prometheus hasn't been able to send
// remote endpoint successfully within some period of time. // since the last time it checked if it should reshard.
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix() minSendTimestamp := time.Now().Add(-1 * shardUpdateDuration).Unix()
lsts := t.lastSendTimestamp.Load() lsts := t.lastSendTimestamp.Load()
if lsts < minSendTimestamp { if lsts < minSendTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp) level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)

View file

@ -703,32 +703,35 @@ func TestShouldReshard(t *testing.T) {
startingShards int startingShards int
samplesIn, samplesOut, lastSendTimestamp int64 samplesIn, samplesOut, lastSendTimestamp int64
expectedToReshard bool expectedToReshard bool
sendDeadline model.Duration
} }
cases := []testcase{ cases := []testcase{
{ {
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago. // resharding shouldn't take place if we haven't successfully sent
// since the last shardUpdateDuration, even if the send deadline is very low
startingShards: 10, startingShards: 10,
samplesIn: 1000, samplesIn: 1000,
samplesOut: 10, samplesOut: 10,
lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second), lastSendTimestamp: time.Now().Unix() - int64(shardUpdateDuration),
expectedToReshard: false, expectedToReshard: false,
sendDeadline: model.Duration(100 * time.Millisecond),
}, },
{ {
startingShards: 5, startingShards: 10,
samplesIn: 1000, samplesIn: 1000,
samplesOut: 10, samplesOut: 10,
lastSendTimestamp: time.Now().Unix(), lastSendTimestamp: time.Now().Unix(),
expectedToReshard: true, expectedToReshard: true,
sendDeadline: config.DefaultQueueConfig.BatchSendDeadline,
}, },
} }
for _, c := range cases { for _, c := range cases {
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1) _, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1)
m.numShards = c.startingShards m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn) m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut) m.dataOut.incr(c.samplesOut)
m.lastSendTimestamp.Store(c.lastSendTimestamp) m.lastSendTimestamp.Store(c.lastSendTimestamp)
m.Start() m.Start()
desiredShards := m.calculateDesiredShards() desiredShards := m.calculateDesiredShards()