prometheus/discovery/manager_test.go
Simon Pasquier 674c76adb8 discovery: coalesce identical SD configurations (#3912)
* discovery: coalesce identical SD configurations

Instead of creating as many SD providers as declared in the
configuration, the discovery manager merges identical configurations
into the same provider and keeps track of the subscribers. When
the manager receives target updates from a SD provider, it will
broadcast the updates to all interested subscribers.

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
2018-09-01 08:51:31 +01:00

912 lines
22 KiB
Go

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"sort"
"strconv"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"gopkg.in/yaml.v2"
)
// TestTargetUpdatesOrder checks that the target updates are received in the expected order.
func TestTargetUpdatesOrder(t *testing.T) {
// The order by which the updates are send is determined by the interval passed to the mock discovery adapter
// Final targets array is ordered alphabetically by the name of the discoverer.
// For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
testCases := []struct {
title string
updates map[string][]update
expectedTargets [][]*targetgroup.Group
}{
{
title: "Single TP no updates",
updates: map[string][]update{
"tp1": {},
},
expectedTargets: nil,
},
{
title: "Multips TPs no updates",
updates: map[string][]update{
"tp1": {},
"tp2": {},
"tp3": {},
},
expectedTargets: nil,
},
{
title: "Single TP empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{},
interval: 5,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{},
},
},
{
title: "Multiple TPs empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{},
interval: 5,
},
},
"tp2": {
{
targetGroups: []targetgroup.Group{},
interval: 200,
},
},
"tp3": {
{
targetGroups: []targetgroup.Group{},
interval: 100,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{},
{},
{},
},
},
{
title: "Single TP initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
}},
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
{
title: "Multiple TPs initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
"tp2": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
}, {
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
},
},
{
title: "Single TP initials followed by empty updates",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
},
},
{
title: "Single TP initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
},
},
},
{
title: "Multiple TPs initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group4",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 500,
},
},
"tp2": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 100,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
{
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group4",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
{
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "One TP initials arrive after other TP updates.",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 150,
},
},
"tp2": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 200,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 100,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "Single TP empty update in between",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 30,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
interval: 10,
},
{
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 300,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
},
},
}
for testIndex, testCase := range testCases {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, nil)
var totalUpdatesCount int
provUpdates := make(chan []*targetgroup.Group)
for _, up := range testCase.updates {
go newMockDiscoveryProvider(up).Run(ctx, provUpdates)
if len(up) > 0 {
totalUpdatesCount = totalUpdatesCount + len(up)
}
}
Loop:
for x := 0; x < totalUpdatesCount; x++ {
select {
case <-time.After(10 * time.Second):
t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title)
break Loop
case tgs := <-provUpdates:
discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(testIndex), provider: testCase.title}, tgs)
for _, received := range discoveryManager.allGroups() {
// Need to sort by the Groups source as the received order is not guaranteed.
sort.Sort(byGroupSource(received))
if !reflect.DeepEqual(received, testCase.expectedTargets[x]) {
var receivedFormated string
for _, receivedTargets := range received {
receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets)
}
var expectedFormated string
for _, expectedTargets := range testCase.expectedTargets[x] {
expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets)
}
t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v",
x, testCase.title,
receivedFormated,
expectedFormated)
}
}
}
}
}
}
func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
}
match := false
var mergedTargets string
for _, targetGroup := range tSets[poolKey] {
for _, l := range targetGroup.Targets {
mergedTargets = mergedTargets + " " + l.String()
if l.String() == label {
match = true
}
}
}
if match != present {
msg := ""
if !present {
msg = "not"
}
t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
}
}
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
`
if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, nil)
go discoveryManager.Run()
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", true)
sTwo := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.UnmarshalStrict([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
c = make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "string/0"}, "{__address__=\"bar:9090\"}", false)
}
func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "sd")
if err != nil {
t.Fatalf("error creating temporary file: %v", err)
}
defer os.Remove(tmpFile.Name())
if _, err := tmpFile.Write([]byte(`[{"targets": ["foo:9090"]}]`)); err != nil {
t.Fatalf("error writing temporary file: %v", err)
}
if err := tmpFile.Close(); err != nil {
t.Fatalf("error closing temporary file: %v", err)
}
tmpFile2 := fmt.Sprintf("%s.json", tmpFile.Name())
if err = os.Link(tmpFile.Name(), tmpFile2); err != nil {
t.Fatalf("error linking temporary file: %v", err)
}
defer os.Remove(tmpFile2)
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
file_sd_configs:
- files: ["%s"]
- job_name: 'prometheus2'
file_sd_configs:
- files: ["%s"]
`
sOne = fmt.Sprintf(sOne, tmpFile2, tmpFile2)
if err := yaml.UnmarshalStrict([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, nil)
go discoveryManager.Run()
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "*file.SDConfig/0"}, "{__address__=\"foo:9090\"}", true)
if len(discoveryManager.providers) != 1 {
t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers))
}
}
func TestApplyConfigDoesNotModifyStaticProviderTargets(t *testing.T) {
cfgText := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
- targets: ["baz:9090"]
`
originalConfig := &config.Config{}
if err := yaml.UnmarshalStrict([]byte(cfgText), originalConfig); err != nil {
t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
}
origScrpCfg := originalConfig.ScrapeConfigs[0]
processedConfig := &config.Config{}
if err := yaml.UnmarshalStrict([]byte(cfgText), processedConfig); err != nil {
t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, nil)
go discoveryManager.Run()
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range processedConfig.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
<-discoveryManager.SyncCh()
for _, sdcfg := range c {
if !reflect.DeepEqual(origScrpCfg.ServiceDiscoveryConfig.StaticConfigs, sdcfg.StaticConfigs) {
t.Fatalf("discovery manager modified static config \n expected: %v\n got: %v\n",
origScrpCfg.ServiceDiscoveryConfig.StaticConfigs, sdcfg.StaticConfigs)
}
}
}
type update struct {
targetGroups []targetgroup.Group
interval time.Duration
}
type mockdiscoveryProvider struct {
updates []update
up chan<- []*targetgroup.Group
}
func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider {
tp := mockdiscoveryProvider{
updates: updates,
}
return tp
}
func (tp mockdiscoveryProvider) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
tp.up = up
tp.sendUpdates()
}
func (tp mockdiscoveryProvider) sendUpdates() {
for _, update := range tp.updates {
time.Sleep(update.interval * time.Millisecond)
tgs := make([]*targetgroup.Group, len(update.targetGroups))
for i := range update.targetGroups {
tgs[i] = &update.targetGroups[i]
}
tp.up <- tgs
}
}
// byGroupSource implements sort.Interface so we can sort by the Source field.
type byGroupSource []*targetgroup.Group
func (a byGroupSource) Len() int { return len(a) }
func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source }