Merge pull request #11224 from dekimsey/consul-catalog-filter-support

consul: Initial implemenation of catalog filter support
This commit is contained in:
Bryan Boreham 2024-10-08 12:38:17 +01:00 committed by GitHub
commit 663a66b526
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 37 additions and 5 deletions

View file

@ -113,8 +113,11 @@ type SDConfig struct {
Services []string `yaml:"services,omitempty"` Services []string `yaml:"services,omitempty"`
// A list of tags used to filter instances inside a service. Services must contain all tags in the list. // A list of tags used to filter instances inside a service. Services must contain all tags in the list.
ServiceTags []string `yaml:"tags,omitempty"` ServiceTags []string `yaml:"tags,omitempty"`
// Desired node metadata. // Desired node metadata. As of Consul 1.14, consider `filter` instead.
NodeMeta map[string]string `yaml:"node_meta,omitempty"` NodeMeta map[string]string `yaml:"node_meta,omitempty"`
// Consul filter string
// See https://www.consul.io/api-docs/catalog#filtering-1, for syntax
Filter string `yaml:"filter,omitempty"`
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
} }
@ -174,6 +177,7 @@ type Discovery struct {
watchedServices []string // Set of services which will be discovered. watchedServices []string // Set of services which will be discovered.
watchedTags []string // Tags used to filter instances of a service. watchedTags []string // Tags used to filter instances of a service.
watchedNodeMeta map[string]string watchedNodeMeta map[string]string
watchedFilter string
allowStale bool allowStale bool
refreshInterval time.Duration refreshInterval time.Duration
finalizer func() finalizer func()
@ -218,6 +222,7 @@ func NewDiscovery(conf *SDConfig, logger *slog.Logger, metrics discovery.Discove
watchedServices: conf.Services, watchedServices: conf.Services,
watchedTags: conf.ServiceTags, watchedTags: conf.ServiceTags,
watchedNodeMeta: conf.NodeMeta, watchedNodeMeta: conf.NodeMeta,
watchedFilter: conf.Filter,
allowStale: conf.AllowStale, allowStale: conf.AllowStale,
refreshInterval: time.Duration(conf.RefreshInterval), refreshInterval: time.Duration(conf.RefreshInterval),
clientDatacenter: conf.Datacenter, clientDatacenter: conf.Datacenter,
@ -361,13 +366,14 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// entire list of services. // entire list of services.
func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, lastIndex *uint64, services map[string]func()) { func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, lastIndex *uint64, services map[string]func()) {
catalog := d.client.Catalog() catalog := d.client.Catalog()
d.logger.Debug("Watching services", "tags", strings.Join(d.watchedTags, ",")) d.logger.Debug("Watching services", "tags", strings.Join(d.watchedTags, ","), "filter", d.watchedFilter)
opts := &consul.QueryOptions{ opts := &consul.QueryOptions{
WaitIndex: *lastIndex, WaitIndex: *lastIndex,
WaitTime: watchTimeout, WaitTime: watchTimeout,
AllowStale: d.allowStale, AllowStale: d.allowStale,
NodeMeta: d.watchedNodeMeta, NodeMeta: d.watchedNodeMeta,
Filter: d.watchedFilter,
} }
t0 := time.Now() t0 := time.Now()
srvs, meta, err := catalog.Services(opts.WithContext(ctx)) srvs, meta, err := catalog.Services(opts.WithContext(ctx))

View file

@ -252,6 +252,8 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
case "/v1/catalog/services?index=1&wait=120000ms": case "/v1/catalog/services?index=1&wait=120000ms":
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
response = ServicesTestAnswer response = ServicesTestAnswer
case "/v1/catalog/services?filter=NodeMeta.rack_name+%3D%3D+%222304%22&index=1&wait=120000ms":
response = ServicesTestAnswer
default: default:
t.Errorf("Unhandled consul call: %s", r.URL) t.Errorf("Unhandled consul call: %s", r.URL)
} }
@ -369,6 +371,27 @@ func TestAllOptions(t *testing.T) {
<-ch <-ch
} }
// Watch the test service with a specific tag and node-meta via Filter parameter.
func TestFilterOption(t *testing.T) {
stub, config := newServer(t)
defer stub.Close()
config.Services = []string{"test"}
config.Filter = `NodeMeta.rack_name == "2304"`
config.Token = "fake-token"
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
checkOneTarget(t, <-ch)
cancel()
}
func TestGetDatacenterShouldReturnError(t *testing.T) { func TestGetDatacenterShouldReturnError(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
handler func(http.ResponseWriter, *http.Request) handler func(http.ResponseWriter, *http.Request)

View file

@ -796,14 +796,17 @@ The following meta labels are available on targets during [relabeling](#relabel_
services: services:
[ - <string> ] [ - <string> ]
# See https://www.consul.io/api/catalog.html#list-nodes-for-service to know more # A Consul Filter expression used to filter the catalog results
# about the possible filters that can be used. # See https://www.consul.io/api-docs/catalog#list-services to know more
# about the filter expressions that can be used.
[ filter: <string> ]
# The `tags` and `node_meta` fields are deprecated in Consul in favor of `filter`.
# An optional list of tags used to filter nodes for a given service. Services must contain all tags in the list. # An optional list of tags used to filter nodes for a given service. Services must contain all tags in the list.
tags: tags:
[ - <string> ] [ - <string> ]
# Node metadata key/value pairs to filter nodes for a given service. # Node metadata key/value pairs to filter nodes for a given service. As of Consul 1.14, consider `filter` instead.
[ node_meta: [ node_meta:
[ <string>: <string> ... ] ] [ <string>: <string> ... ] ]