Adding a shouldReshard function to modularize logic for the QueueManager deciding if it should shard or not (#7143)

Signed-off-by: Marek Slabicki <thaniri@gmail.com>
This commit is contained in:
Marek Slabicki 2020-04-20 15:20:39 -07:00 committed by GitHub
parent fc3fb3265a
commit 4b5e7d4984
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 34 deletions

View file

@ -544,7 +544,7 @@ func (t *QueueManager) updateShardsLoop() {
select { select {
case <-ticker.C: case <-ticker.C:
desiredShards := t.calculateDesiredShards() desiredShards := t.calculateDesiredShards()
if desiredShards == t.numShards { if !t.shouldReshard(desiredShards) {
continue continue
} }
// Resharding can take some time, and we want this loop // Resharding can take some time, and we want this loop
@ -562,6 +562,22 @@ func (t *QueueManager) updateShardsLoop() {
} }
} }
// shouldReshard reports whether the queue should be resharded to
// desiredShards. It returns false when desiredShards already equals the
// current shard count, or when the last successful send is older than twice
// the configured BatchSendDeadline; in the latter case a warning is logged.
func (t *QueueManager) shouldReshard(desiredShards int) bool {
	// Already running with the desired number of shards: nothing to change.
	if t.numShards == desiredShards {
		return false
	}

	// Oldest acceptable time (Unix seconds) for the last successful send:
	// two batch send deadlines before now.
	threshold := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
	lastSend := atomic.LoadInt64(&t.lastSendTimestamp)
	if lastSend >= threshold {
		return true
	}

	// No successful send within the allowed window, so skip resharding for
	// now and record why.
	level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lastSend, "minSendTimestamp", threshold)
	return false
}
// calculateDesiredShards returns the number of desired shards, which will be // calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons // the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this function's implementation. It is up to the caller to reshard, or not, // outlined in this function's implementation. It is up to the caller to reshard, or not,
@ -591,15 +607,6 @@ func (t *QueueManager) calculateDesiredShards() int {
return t.numShards return t.numShards
} }
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
lsts := atomic.LoadInt64(&t.lastSendTimestamp)
if lsts < minSendTimestamp {
level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
return t.numShards
}
// When behind we will try to catch up on a proportion of samples per tick. // When behind we will try to catch up on a proportion of samples per tick.
// This works similarly to an integral accumulator in that pending samples // This works similarly to an integral accumulator in that pending samples
// is the result of the error integral. // is the result of the error integral.

View file

@ -292,26 +292,27 @@ func TestReleaseNoninternedString(t *testing.T) {
testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric)) testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
} }
func TestCalculateDesiredsShards(t *testing.T) { func TestShouldReshard(t *testing.T) {
type testcase struct { type testcase struct {
startingShards int startingShards int
samplesIn, samplesOut int64 samplesIn, samplesOut, lastSendTimestamp int64
reshard bool expectedToReshard bool
} }
cases := []testcase{ cases := []testcase{
{ {
// Test that ensures that if we haven't successfully sent a // Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
// sample recently the queue will not reshard. startingShards: 10,
startingShards: 10, samplesIn: 1000,
reshard: false, samplesOut: 10,
samplesIn: 1000, lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second),
samplesOut: 10, expectedToReshard: false,
}, },
{ {
startingShards: 5, startingShards: 5,
reshard: true, samplesIn: 1000,
samplesIn: 1000, samplesOut: 10,
samplesOut: 10, lastSendTimestamp: time.Now().Unix(),
expectedToReshard: true,
}, },
} }
for _, c := range cases { for _, c := range cases {
@ -321,20 +322,16 @@ func TestCalculateDesiredsShards(t *testing.T) {
m.numShards = c.startingShards m.numShards = c.startingShards
m.samplesIn.incr(c.samplesIn) m.samplesIn.incr(c.samplesIn)
m.samplesOut.incr(c.samplesOut) m.samplesOut.incr(c.samplesOut)
m.lastSendTimestamp = time.Now().Unix() m.lastSendTimestamp = c.lastSendTimestamp
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
if !c.reshard {
m.lastSendTimestamp = m.lastSendTimestamp - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second)
}
m.Start() m.Start()
desiredShards := m.calculateDesiredShards() desiredShards := m.calculateDesiredShards()
shouldReshard := m.shouldReshard(desiredShards)
m.Stop() m.Stop()
if !c.reshard {
testutil.Assert(t, desiredShards == m.numShards, "expected calculateDesiredShards to not want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards) testutil.Equals(t, c.expectedToReshard, shouldReshard)
} else {
testutil.Assert(t, desiredShards != m.numShards, "expected calculateDesiredShards to want to reshard, wants to change from %d to %d shards", m.numShards, desiredShards)
}
} }
} }