prometheus/discovery/consul/consul_test.go
Daniel Kimsey aa3e58358b consul: Add support for catalog list services filter
This adds support for Consul's Catalog [List Services][^1] API's `filter`
parameter added in 1.14.x. This parameter grants the operator more
flexibility to do server-side filtering of the Catalog, before
Prometheus subscribes for updates. Operators can use this to improve
both the performance of Prometheus's Consul SD and reduce the impact of
enumerating large catalogs.

[^1]: https://developer.hashicorp.com/consul/api-docs/v1.14.x/catalog

Signed-off-by: Daniel Kimsey <dekimsey@protonmail.com>
2024-03-17 20:32:54 -05:00

523 lines
14 KiB
Go

// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consul
import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
// TODO: Add ability to unregister metrics?
func NewTestMetrics(t *testing.T, conf discovery.Config, reg prometheus.Registerer) discovery.DiscovererMetrics {
refreshMetrics := discovery.NewRefreshMetrics(reg)
require.NoError(t, refreshMetrics.Register())
metrics := conf.NewDiscovererMetrics(prometheus.NewRegistry(), refreshMetrics)
require.NoError(t, metrics.Register())
return metrics
}
func TestConfiguredService(t *testing.T) {
conf := &SDConfig{
Services: []string{"configuredServiceName"},
}
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
consulDiscovery, err := NewDiscovery(conf, nil, metrics)
require.NoError(t, err, "when initializing discovery")
require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}),
"Expected service %s to be watched", "configuredServiceName")
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}),
"Expected service %s to not be watched", "nonConfiguredServiceName")
}
func TestConfiguredServiceWithTag(t *testing.T) {
conf := &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http"},
}
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
consulDiscovery, err := NewDiscovery(conf, nil, metrics)
require.NoError(t, err, "when initializing discovery")
require.False(t, consulDiscovery.shouldWatch("configuredServiceName", []string{""}),
"Expected service %s to not be watched without tag", "configuredServiceName")
require.True(t, consulDiscovery.shouldWatch("configuredServiceName", []string{"http"}),
"Expected service %s to be watched with tag %s", "configuredServiceName", "http")
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}),
"Expected service %s to not be watched without tag", "nonConfiguredServiceName")
require.False(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{"http"}),
"Expected service %s to not be watched with tag %s", "nonConfiguredServiceName", "http")
}
func TestConfiguredServiceWithTags(t *testing.T) {
type testcase struct {
// What we've configured to watch.
conf *SDConfig
// The service we're checking if we should watch or not.
serviceName string
serviceTags []string
shouldWatch bool
}
cases := []testcase{
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "configuredServiceName",
serviceTags: []string{""},
shouldWatch: false,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "configuredServiceName",
serviceTags: []string{"http", "v1"},
shouldWatch: true,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "nonConfiguredServiceName",
serviceTags: []string{""},
shouldWatch: false,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "nonConfiguredServiceName",
serviceTags: []string{"http, v1"},
shouldWatch: false,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "configuredServiceName",
serviceTags: []string{"http", "v1", "foo"},
shouldWatch: true,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1", "foo"},
},
serviceName: "configuredServiceName",
serviceTags: []string{"http", "v1", "foo"},
shouldWatch: true,
},
{
conf: &SDConfig{
Services: []string{"configuredServiceName"},
ServiceTags: []string{"http", "v1"},
},
serviceName: "configuredServiceName",
serviceTags: []string{"http", "v1", "v1"},
shouldWatch: true,
},
}
for _, tc := range cases {
metrics := NewTestMetrics(t, tc.conf, prometheus.NewRegistry())
consulDiscovery, err := NewDiscovery(tc.conf, nil, metrics)
require.NoError(t, err, "when initializing discovery")
ret := consulDiscovery.shouldWatch(tc.serviceName, tc.serviceTags)
require.Equal(t, tc.shouldWatch, ret, "Watched service and tags: %s %+v, input was %s %+v",
tc.conf.Services, tc.conf.ServiceTags, tc.serviceName, tc.serviceTags)
}
}
func TestNonConfiguredService(t *testing.T) {
conf := &SDConfig{}
metrics := NewTestMetrics(t, conf, prometheus.NewRegistry())
consulDiscovery, err := NewDiscovery(conf, nil, metrics)
require.NoError(t, err, "when initializing discovery")
require.True(t, consulDiscovery.shouldWatch("nonConfiguredServiceName", []string{""}), "Expected service %s to be watched", "nonConfiguredServiceName")
}
const (
AgentAnswer = `{"Config": {"Datacenter": "test-dc"}}`
ServiceTestAnswer = `
[{
"Node": {
"ID": "b78c2e48-5ef3-1814-31b8-0d880f50471e",
"Node": "node1",
"Address": "1.1.1.1",
"Datacenter": "test-dc",
"TaggedAddresses": {
"lan": "192.168.10.10",
"wan": "10.0.10.10"
},
"Meta": {"rack_name": "2304"},
"CreateIndex": 1,
"ModifyIndex": 1
},
"Service": {
"ID": "test",
"Service": "test",
"Tags": ["tag1"],
"Address": "",
"Meta": {"version":"1.0.0","environment":"staging"},
"Port": 3341,
"Weights": {
"Passing": 1,
"Warning": 1
},
"EnableTagOverride": false,
"ProxyDestination": "",
"Proxy": {},
"Connect": {},
"CreateIndex": 1,
"ModifyIndex": 1
},
"Checks": [{
"Node": "node1",
"CheckID": "serfHealth",
"Name": "Serf Health Status",
"Status": "passing"
}]
}]`
ServicesTestAnswer = `{"test": ["tag1"], "other": ["tag2"]}`
)
func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
// github.com/hashicorp/consul/testutil/ would be nice but it needs a local consul binary.
stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := ""
switch r.URL.String() {
case "/v1/agent/self":
response = AgentAnswer
case "/v1/health/service/test?node-meta=rack_name%3A2304&stale=&tag=tag1&wait=120000ms":
response = ServiceTestAnswer
case "/v1/health/service/test?wait=120000ms":
response = ServiceTestAnswer
case "/v1/health/service/other?wait=120000ms":
response = `[]`
case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms":
response = ServicesTestAnswer
case "/v1/catalog/services?wait=120000ms":
response = ServicesTestAnswer
case "/v1/catalog/services?index=1&node-meta=rack_name%3A2304&stale=&wait=120000ms":
time.Sleep(5 * time.Second)
response = ServicesTestAnswer
case "/v1/catalog/services?index=1&wait=120000ms":
time.Sleep(5 * time.Second)
response = ServicesTestAnswer
case "/v1/catalog/services?filter=NodeMeta.rack_name+%3D%3D+%222304%22&index=1&wait=120000ms":
response = ServicesTestAnswer
default:
t.Errorf("Unhandled consul call: %s", r.URL)
}
w.Header().Add("X-Consul-Index", "1")
w.Write([]byte(response))
}))
stuburl, err := url.Parse(stub.URL)
require.NoError(t, err)
config := &SDConfig{
Server: stuburl.Host,
Token: "fake-token",
RefreshInterval: model.Duration(1 * time.Second),
}
return stub, config
}
func newDiscovery(t *testing.T, config *SDConfig) *Discovery {
logger := log.NewNopLogger()
metrics := NewTestMetrics(t, config, prometheus.NewRegistry())
d, err := NewDiscovery(config, logger, metrics)
require.NoError(t, err)
return d
}
func checkOneTarget(t *testing.T, tg []*targetgroup.Group) {
require.Len(t, tg, 1)
target := tg[0]
require.Equal(t, "test-dc", string(target.Labels["__meta_consul_dc"]))
require.Equal(t, target.Source, string(target.Labels["__meta_consul_service"]))
if target.Source == "test" {
// test service should have one node.
require.NotEmpty(t, target.Targets, "Test service should have one node")
}
}
// Watch all the services in the catalog.
func TestAllServices(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
checkOneTarget(t, <-ch)
checkOneTarget(t, <-ch)
cancel()
<-ch
}
// targetgroup with no targets is emitted if no services were discovered.
func TestNoTargets(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
config.ServiceTags = []string{"missing"}
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
targets := (<-ch)[0].Targets
require.Empty(t, targets)
cancel()
<-ch
}
// Watch only the test service.
func TestOneService(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
config.Services = []string{"test"}
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go d.Run(ctx, ch)
checkOneTarget(t, <-ch)
cancel()
}
// Watch the test service with a specific tag and node-meta.
func TestAllOptions(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
config.Services = []string{"test"}
config.NodeMeta = map[string]string{"rack_name": "2304"}
config.ServiceTags = []string{"tag1"}
config.AllowStale = true
config.Token = "fake-token"
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
checkOneTarget(t, <-ch)
cancel()
<-ch
}
// Watch the test service with a specific tag and node-meta via Filter parameter.
func TestFilterOption(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
config.Services = []string{"test"}
config.Filter = `NodeMeta.rack_name == "2304"`
config.Token = "fake-token"
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
checkOneTarget(t, <-ch)
cancel()
}
func TestGetDatacenterShouldReturnError(t *testing.T) {
for _, tc := range []struct {
handler func(http.ResponseWriter, *http.Request)
errMessage string
}{
{
// Define a handler that will return status 500.
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
},
errMessage: "Unexpected response code: 500 ()",
},
{
// Define a handler that will return incorrect response.
handler: func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"Config": {"Not-Datacenter": "test-dc"}}`))
},
errMessage: "invalid value '<nil>' for Config.Datacenter",
},
} {
stub := httptest.NewServer(http.HandlerFunc(tc.handler))
stuburl, err := url.Parse(stub.URL)
require.NoError(t, err)
config := &SDConfig{
Server: stuburl.Host,
Token: "fake-token",
RefreshInterval: model.Duration(1 * time.Second),
}
defer stub.Close()
d := newDiscovery(t, config)
// Should be empty if not initialized.
require.Equal(t, "", d.clientDatacenter)
err = d.getDatacenter()
// An error should be returned.
require.Equal(t, tc.errMessage, err.Error())
// Should still be empty.
require.Equal(t, "", d.clientDatacenter)
}
}
func TestUnmarshalConfig(t *testing.T) {
unmarshal := func(d []byte) func(interface{}) error {
return func(o interface{}) error {
return yaml.Unmarshal(d, o)
}
}
goodConfig := DefaultSDConfig
goodConfig.Username = "123"
goodConfig.Password = "1234"
goodConfig.HTTPClientConfig = config.HTTPClientConfig{
BasicAuth: &config.BasicAuth{
Username: "123",
Password: "1234",
},
FollowRedirects: true,
EnableHTTP2: true,
}
cases := []struct {
name string
config string
expected SDConfig
errMessage string
}{
{
name: "good",
config: `
server: localhost:8500
username: 123
password: 1234
`,
expected: goodConfig,
},
{
name: "username and password and basic auth configured",
config: `
server: localhost:8500
username: 123
password: 1234
basic_auth:
username: 12345
password: 123456
`,
errMessage: "at most one of consul SD configuration username and password and basic auth can be configured",
},
{
name: "token and authorization configured",
config: `
server: localhost:8500
token: 1234567
authorization:
credentials: 12345678
`,
errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured",
},
{
name: "token and oauth2 configured",
config: `
server: localhost:8500
token: 1234567
oauth2:
client_id: 10
client_secret: 11
token_url: http://example.com
`,
errMessage: "at most one of consul SD token, authorization, or oauth2 can be configured",
},
}
for _, test := range cases {
t.Run(test.name, func(t *testing.T) {
var config SDConfig
err := config.UnmarshalYAML(unmarshal([]byte(test.config)))
if err != nil {
require.EqualError(t, err, test.errMessage)
return
}
require.Empty(t, test.errMessage, "Expected error.")
require.Equal(t, test.expected, config)
})
}
}