Merge pull request #3747 from prometheus/sched-update-throttle

Update throttle & tsdb update
This commit is contained in:
Frederic Branczyk 2018-01-29 16:05:05 +01:00 committed by GitHub
commit 47538cf6ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 16 deletions

View file

@ -17,6 +17,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
@ -81,6 +82,10 @@ type Manager struct {
targets map[poolKey]map[string]*targetgroup.Group targets map[poolKey]map[string]*targetgroup.Group
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group syncCh chan map[string][]*targetgroup.Group
// True if updates were received in the last 5 seconds.
recentlyUpdated bool
// Protects recentlyUpdated.
recentlyUpdatedMtx sync.Mutex
} }
// Run starts the background processing // Run starts the background processing
@ -123,6 +128,7 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis
go worker.Run(ctx, updates) go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates) go m.runProvider(ctx, poolKey, updates)
go m.runUpdater(ctx)
} }
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) {
@ -137,7 +143,28 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
return return
} }
m.updateGroup(poolKey, tgs) m.updateGroup(poolKey, tgs)
m.syncCh <- m.allGroups() m.recentlyUpdatedMtx.Lock()
m.recentlyUpdated = true
m.recentlyUpdatedMtx.Unlock()
}
}
}
func (m *Manager) runUpdater(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.recentlyUpdatedMtx.Lock()
if m.recentlyUpdated {
m.syncCh <- m.allGroups()
m.recentlyUpdated = false
}
m.recentlyUpdatedMtx.Unlock()
} }
} }
} }

View file

@ -29,8 +29,8 @@ import (
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order. // TestTargetUpdatesOrder checks that the target updates are received in the expected order.
func TestDiscoveryManagerSyncCalls(t *testing.T) { func TestTargetUpdatesOrder(t *testing.T) {
// The order by which the updates are send is detirmened by the interval passed to the mock discovery adapter // The order by which the updates are send is detirmened by the interval passed to the mock discovery adapter
// Final targets array is ordered alphabetically by the name of the discoverer. // Final targets array is ordered alphabetically by the name of the discoverer.
@ -656,15 +656,14 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
discoveryManager := NewManager(nil) discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
var totalUpdatesCount int var totalUpdatesCount int
for tpName, update := range testCase.updates {
provider := newMockDiscoveryProvider(update)
discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider)
if len(update) > 0 { provUpdates := make(chan []*targetgroup.Group)
totalUpdatesCount = totalUpdatesCount + len(update) for _, up := range testCase.updates {
go newMockDiscoveryProvider(up).Run(ctx, provUpdates)
if len(up) > 0 {
totalUpdatesCount = totalUpdatesCount + len(up)
} }
} }
@ -674,9 +673,10 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title) t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title)
break Loop break Loop
case tsetMap := <-discoveryManager.SyncCh(): case tgs := <-provUpdates:
for _, received := range tsetMap { discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(testIndex), provider: testCase.title}, tgs)
// Need to sort by the Groups source as the Discovery manager doesn't guarantee the order. for _, received := range discoveryManager.allGroups() {
// Need to sort by the Groups source as the received order is not guaranteed.
sort.Sort(byGroupSource(received)) sort.Sort(byGroupSource(received))
if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { if !reflect.DeepEqual(received, testCase.expectedTargets[x]) {
var receivedFormated string var receivedFormated string

View file

@ -299,7 +299,11 @@ func (db *DB) retentionCutoff() (bool, error) {
} }
// This will close the dirs and then delete the dirs. // This will close the dirs and then delete the dirs.
return len(dirs) > 0, db.reload(dirs...) if len(dirs) > 0 {
return true, db.reload(dirs...)
}
return false, nil
} }
// Appender opens a new appender against the database. // Appender opens a new appender against the database.

6
vendor/vendor.json vendored
View file

@ -800,10 +800,10 @@
"revisionTime": "2016-04-11T19:08:41Z" "revisionTime": "2016-04-11T19:08:41Z"
}, },
{ {
"checksumSHA1": "errrSfk16j274D3QY61bWODd56c=", "checksumSHA1": "5mCM640B2xa7y+kRRUeVCglEk7o=",
"path": "github.com/prometheus/tsdb", "path": "github.com/prometheus/tsdb",
"revision": "ad0fdaf436fc99828bce60f81984dcfa3a282a44", "revision": "44dd5e1202b7598d50c69ce3617ca6ae6503cf52",
"revisionTime": "2018-01-24T14:58:35Z" "revisionTime": "2018-01-26T14:54:38Z"
}, },
{ {
"checksumSHA1": "XTirmk6Pq5TBGIZEaN5VL4k3i1s=", "checksumSHA1": "XTirmk6Pq5TBGIZEaN5VL4k3i1s=",