Merge pull request #698 from prometheus/fabxc/consul_sd

Add initial implementation for SD via Consul.
This commit is contained in:
Julius Volz 2015-05-20 12:43:17 +02:00
commit 984c2ae6ea
28 changed files with 5732 additions and 0 deletions

4
Godeps/Godeps.json generated
View file

@ -19,6 +19,10 @@
"ImportPath": "github.com/golang/protobuf/proto",
"Rev": "655cdfa588ea190e901bc5590e65d5621688847c"
},
{
"ImportPath": "github.com/hashicorp/consul/api",
"Rev": "9fb235a98d8e88f7857b21bb2dd3efc428c01427",
},
{
"ImportPath": "github.com/matttproud/golang_protobuf_extensions/pbutil",
"Rev": "fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a"

View file

@ -0,0 +1,39 @@
Consul API client
=================
This package provides the `api` package which attempts to
provide programmatic access to the full Consul API.
Currently, all of the Consul APIs included in version 0.3 are supported.
Documentation
=============
The full documentation is available on [Godoc](http://godoc.org/github.com/hashicorp/consul/api)
Usage
=====
Below is an example of using the Consul client:
```go
// Get a new client, with KV endpoints
client, _ := api.NewClient(api.DefaultConfig())
kv := client.KV()
// PUT a new KV pair
p := &api.KVPair{Key: "foo", Value: []byte("test")}
_, err := kv.Put(p, nil)
if err != nil {
panic(err)
}
// Lookup the pair
pair, _, err := kv.Get("foo", nil)
if err != nil {
panic(err)
}
fmt.Printf("KV: %v", pair)
```

View file

@ -0,0 +1,140 @@
package api
const (
// ACLCLientType is the client type token
ACLClientType = "client"
// ACLManagementType is the management type token
ACLManagementType = "management"
)
// ACLEntry is used to represent an ACL entry
type ACLEntry struct {
CreateIndex uint64
ModifyIndex uint64
ID string
Name string
Type string
Rules string
}
// ACL can be used to query the ACL endpoints
type ACL struct {
c *Client
}
// ACL returns a handle to the ACL endpoints
func (c *Client) ACL() *ACL {
return &ACL{c}
}
// Create is used to generate a new token with the given parameters
func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/create")
r.setWriteOptions(q)
r.obj = acl
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Update is used to update the rules of an existing token
func (a *ACL) Update(acl *ACLEntry, q *WriteOptions) (*WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/update")
r.setWriteOptions(q)
r.obj = acl
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
return wm, nil
}
// Destroy is used to destroy a given ACL token ID
func (a *ACL) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/destroy/"+id)
r.setWriteOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
return wm, nil
}
// Clone is used to return a new token cloned from an existing one
func (a *ACL) Clone(id string, q *WriteOptions) (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/clone/"+id)
r.setWriteOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Info is used to query for information about an ACL token
func (a *ACL) Info(id string, q *QueryOptions) (*ACLEntry, *QueryMeta, error) {
r := a.c.newRequest("GET", "/v1/acl/info/"+id)
r.setQueryOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*ACLEntry
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
if len(entries) > 0 {
return entries[0], qm, nil
}
return nil, qm, nil
}
// List is used to get all the ACL tokens
func (a *ACL) List(q *QueryOptions) ([]*ACLEntry, *QueryMeta, error) {
r := a.c.newRequest("GET", "/v1/acl/list")
r.setQueryOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*ACLEntry
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
return entries, qm, nil
}

View file

@ -0,0 +1,152 @@
package api
import (
"os"
"testing"
)
// ROOT is a management token for the tests
var CONSUL_ROOT string
func init() {
CONSUL_ROOT = os.Getenv("CONSUL_ROOT")
}
func TestACL_CreateDestroy(t *testing.T) {
t.Parallel()
if CONSUL_ROOT == "" {
t.SkipNow()
}
c, s := makeClient(t)
defer s.Stop()
c.config.Token = CONSUL_ROOT
acl := c.ACL()
ae := ACLEntry{
Name: "API test",
Type: ACLClientType,
Rules: `key "" { policy = "deny" }`,
}
id, wm, err := acl.Create(&ae, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if wm.RequestTime == 0 {
t.Fatalf("bad: %v", wm)
}
if id == "" {
t.Fatalf("invalid: %v", id)
}
ae2, _, err := acl.Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ae2.Name != ae.Name || ae2.Type != ae.Type || ae2.Rules != ae.Rules {
t.Fatalf("Bad: %#v", ae2)
}
wm, err = acl.Destroy(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if wm.RequestTime == 0 {
t.Fatalf("bad: %v", wm)
}
}
func TestACL_CloneDestroy(t *testing.T) {
t.Parallel()
if CONSUL_ROOT == "" {
t.SkipNow()
}
c, s := makeClient(t)
defer s.Stop()
c.config.Token = CONSUL_ROOT
acl := c.ACL()
id, wm, err := acl.Clone(CONSUL_ROOT, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if wm.RequestTime == 0 {
t.Fatalf("bad: %v", wm)
}
if id == "" {
t.Fatalf("invalid: %v", id)
}
wm, err = acl.Destroy(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if wm.RequestTime == 0 {
t.Fatalf("bad: %v", wm)
}
}
func TestACL_Info(t *testing.T) {
t.Parallel()
if CONSUL_ROOT == "" {
t.SkipNow()
}
c, s := makeClient(t)
defer s.Stop()
c.config.Token = CONSUL_ROOT
acl := c.ACL()
ae, qm, err := acl.Info(CONSUL_ROOT, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if qm.LastIndex == 0 {
t.Fatalf("bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("bad: %v", qm)
}
if ae == nil || ae.ID != CONSUL_ROOT || ae.Type != ACLManagementType {
t.Fatalf("bad: %#v", ae)
}
}
func TestACL_List(t *testing.T) {
t.Parallel()
if CONSUL_ROOT == "" {
t.SkipNow()
}
c, s := makeClient(t)
defer s.Stop()
c.config.Token = CONSUL_ROOT
acl := c.ACL()
acls, qm, err := acl.List(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(acls) < 2 {
t.Fatalf("bad: %v", acls)
}
if qm.LastIndex == 0 {
t.Fatalf("bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("bad: %v", qm)
}
}

View file

@ -0,0 +1,334 @@
package api
import (
"fmt"
)
// AgentCheck represents a check known to the agent
type AgentCheck struct {
Node string
CheckID string
Name string
Status string
Notes string
Output string
ServiceID string
ServiceName string
}
// AgentService represents a service known to the agent
type AgentService struct {
ID string
Service string
Tags []string
Port int
Address string
}
// AgentMember represents a cluster member known to the agent
type AgentMember struct {
Name string
Addr string
Port uint16
Tags map[string]string
Status int
ProtocolMin uint8
ProtocolMax uint8
ProtocolCur uint8
DelegateMin uint8
DelegateMax uint8
DelegateCur uint8
}
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
}
// AgentCheckRegistration is used to register a new check
type AgentCheckRegistration struct {
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Notes string `json:",omitempty"`
ServiceID string `json:",omitempty"`
AgentServiceCheck
}
// AgentServiceCheck is used to create an associated
// check for a service
type AgentServiceCheck struct {
Script string `json:",omitempty"`
Interval string `json:",omitempty"`
Timeout string `json:",omitempty"`
TTL string `json:",omitempty"`
HTTP string `json:",omitempty"`
Status string `json:",omitempty"`
}
type AgentServiceChecks []*AgentServiceCheck
// Agent can be used to query the Agent endpoints
type Agent struct {
c *Client
// cache the node name
nodeName string
}
// Agent returns a handle to the agent endpoints
func (c *Client) Agent() *Agent {
return &Agent{c: c}
}
// Self is used to query the agent we are speaking to for
// information about itself
func (a *Agent) Self() (map[string]map[string]interface{}, error) {
r := a.c.newRequest("GET", "/v1/agent/self")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out map[string]map[string]interface{}
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// NodeName is used to get the node name of the agent
func (a *Agent) NodeName() (string, error) {
if a.nodeName != "" {
return a.nodeName, nil
}
info, err := a.Self()
if err != nil {
return "", err
}
name := info["Config"]["NodeName"].(string)
a.nodeName = name
return name, nil
}
// Checks returns the locally registered checks
func (a *Agent) Checks() (map[string]*AgentCheck, error) {
r := a.c.newRequest("GET", "/v1/agent/checks")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out map[string]*AgentCheck
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Services returns the locally registered services
func (a *Agent) Services() (map[string]*AgentService, error) {
r := a.c.newRequest("GET", "/v1/agent/services")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out map[string]*AgentService
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Members returns the known gossip members. The WAN
// flag can be used to query a server for WAN members.
func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
r := a.c.newRequest("GET", "/v1/agent/members")
if wan {
r.params.Set("wan", "1")
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []*AgentMember
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// ServiceRegister is used to register a new service with
// the local agent
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {
r := a.c.newRequest("PUT", "/v1/agent/service/register")
r.obj = service
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// ServiceDeregister is used to deregister a service with
// the local agent
func (a *Agent) ServiceDeregister(serviceID string) error {
r := a.c.newRequest("PUT", "/v1/agent/service/deregister/"+serviceID)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// PassTTL is used to set a TTL check to the passing state
func (a *Agent) PassTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "pass")
}
// WarnTTL is used to set a TTL check to the warning state
func (a *Agent) WarnTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "warn")
}
// FailTTL is used to set a TTL check to the failing state
func (a *Agent) FailTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "fail")
}
// UpdateTTL is used to update the TTL of a check
func (a *Agent) UpdateTTL(checkID, note, status string) error {
switch status {
case "pass":
case "warn":
case "fail":
default:
return fmt.Errorf("Invalid status: %s", status)
}
endpoint := fmt.Sprintf("/v1/agent/check/%s/%s", status, checkID)
r := a.c.newRequest("PUT", endpoint)
r.params.Set("note", note)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// CheckRegister is used to register a new check with
// the local agent
func (a *Agent) CheckRegister(check *AgentCheckRegistration) error {
r := a.c.newRequest("PUT", "/v1/agent/check/register")
r.obj = check
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// CheckDeregister is used to deregister a check with
// the local agent
func (a *Agent) CheckDeregister(checkID string) error {
r := a.c.newRequest("PUT", "/v1/agent/check/deregister/"+checkID)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// Join is used to instruct the agent to attempt a join to
// another cluster member
func (a *Agent) Join(addr string, wan bool) error {
r := a.c.newRequest("PUT", "/v1/agent/join/"+addr)
if wan {
r.params.Set("wan", "1")
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// ForceLeave is used to have the agent eject a failed node
func (a *Agent) ForceLeave(node string) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// EnableServiceMaintenance toggles service maintenance mode on
// for the given service ID.
func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error {
r := a.c.newRequest("PUT", "/v1/agent/service/maintenance/"+serviceID)
r.params.Set("enable", "true")
r.params.Set("reason", reason)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// DisableServiceMaintenance toggles service maintenance mode off
// for the given service ID.
func (a *Agent) DisableServiceMaintenance(serviceID string) error {
r := a.c.newRequest("PUT", "/v1/agent/service/maintenance/"+serviceID)
r.params.Set("enable", "false")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// EnableNodeMaintenance toggles node maintenance mode on for the
// agent we are connected to.
func (a *Agent) EnableNodeMaintenance(reason string) error {
r := a.c.newRequest("PUT", "/v1/agent/maintenance")
r.params.Set("enable", "true")
r.params.Set("reason", reason)
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// DisableNodeMaintenance toggles node maintenance mode off for the
// agent we are connected to.
func (a *Agent) DisableNodeMaintenance() error {
r := a.c.newRequest("PUT", "/v1/agent/maintenance")
r.params.Set("enable", "false")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

View file

@ -0,0 +1,524 @@
package api
import (
"strings"
"testing"
)
func TestAgent_Self(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
info, err := agent.Self()
if err != nil {
t.Fatalf("err: %v", err)
}
name := info["Config"]["NodeName"]
if name == "" {
t.Fatalf("bad: %v", info)
}
}
func TestAgent_Members(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
members, err := agent.Members(false)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(members) != 1 {
t.Fatalf("bad: %v", members)
}
}
func TestAgent_Services(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo"]; !ok {
t.Fatalf("missing service: %v", services)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["service:foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
// Checks should default to critical
if chk.Status != "critical" {
t.Fatalf("Bad: %#v", chk)
}
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_Services_CheckPassing(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
Status: "passing",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo"]; !ok {
t.Fatalf("missing service: %v", services)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["service:foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
if chk.Status != "passing" {
t.Fatalf("Bad: %#v", chk)
}
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_Services_CheckBadStatus(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Check: &AgentServiceCheck{
TTL: "15s",
Status: "fluffy",
},
}
if err := agent.ServiceRegister(reg); err == nil {
t.Fatalf("bad status accepted")
}
}
func TestAgent_ServiceAddress(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg1 := &AgentServiceRegistration{
Name: "foo1",
Port: 8000,
Address: "192.168.0.42",
}
reg2 := &AgentServiceRegistration{
Name: "foo2",
Port: 8000,
}
if err := agent.ServiceRegister(reg1); err != nil {
t.Fatalf("err: %v", err)
}
if err := agent.ServiceRegister(reg2); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo1"]; !ok {
t.Fatalf("missing service: %v", services)
}
if _, ok := services["foo2"]; !ok {
t.Fatalf("missing service: %v", services)
}
if services["foo1"].Address != "192.168.0.42" {
t.Fatalf("missing Address field in service foo1: %v", services)
}
if services["foo2"].Address != "" {
t.Fatalf("missing Address field in service foo2: %v", services)
}
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_Services_MultipleChecks(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Checks: AgentServiceChecks{
&AgentServiceCheck{
TTL: "15s",
},
&AgentServiceCheck{
TTL: "30s",
},
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services["foo"]; !ok {
t.Fatalf("missing service: %v", services)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := checks["service:foo:1"]; !ok {
t.Fatalf("missing check: %v", checks)
}
if _, ok := checks["service:foo:2"]; !ok {
t.Fatalf("missing check: %v", checks)
}
}
func TestAgent_SetTTLStatus(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentServiceRegistration{
Name: "foo",
Check: &AgentServiceCheck{
TTL: "15s",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
if err := agent.WarnTTL("service:foo", "test"); err != nil {
t.Fatalf("err: %v", err)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["service:foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
if chk.Status != "warning" {
t.Fatalf("Bad: %#v", chk)
}
if chk.Output != "test" {
t.Fatalf("Bad: %#v", chk)
}
if err := agent.ServiceDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_Checks(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentCheckRegistration{
Name: "foo",
}
reg.TTL = "15s"
if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
if chk.Status != "critical" {
t.Fatalf("check not critical: %v", chk)
}
if err := agent.CheckDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_CheckStartPassing(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
reg := &AgentCheckRegistration{
Name: "foo",
AgentServiceCheck: AgentServiceCheck{
Status: "passing",
},
}
reg.TTL = "15s"
if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
chk, ok := checks["foo"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
if chk.Status != "passing" {
t.Fatalf("check not passing: %v", chk)
}
if err := agent.CheckDeregister("foo"); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_Checks_serviceBound(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
// First register a service
serviceReg := &AgentServiceRegistration{
Name: "redis",
}
if err := agent.ServiceRegister(serviceReg); err != nil {
t.Fatalf("err: %v", err)
}
// Register a check bound to the service
reg := &AgentCheckRegistration{
Name: "redischeck",
ServiceID: "redis",
}
reg.TTL = "15s"
if err := agent.CheckRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
check, ok := checks["redischeck"]
if !ok {
t.Fatalf("missing check: %v", checks)
}
if check.ServiceID != "redis" {
t.Fatalf("missing service association for check: %v", check)
}
}
func TestAgent_Join(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
info, err := agent.Self()
if err != nil {
t.Fatalf("err: %v", err)
}
// Join ourself
addr := info["Config"]["AdvertiseAddr"].(string)
err = agent.Join(addr, false)
if err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_ForceLeave(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
// Eject somebody
err := agent.ForceLeave("foo")
if err != nil {
t.Fatalf("err: %v", err)
}
}
func TestServiceMaintenance(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
// First register a service
serviceReg := &AgentServiceRegistration{
Name: "redis",
}
if err := agent.ServiceRegister(serviceReg); err != nil {
t.Fatalf("err: %v", err)
}
// Enable maintenance mode
if err := agent.EnableServiceMaintenance("redis", "broken"); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure a critical check was added
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %v", err)
}
found := false
for _, check := range checks {
if strings.Contains(check.CheckID, "maintenance") {
found = true
if check.Status != "critical" || check.Notes != "broken" {
t.Fatalf("bad: %#v", checks)
}
}
}
if !found {
t.Fatalf("bad: %#v", checks)
}
// Disable maintenance mode
if err := agent.DisableServiceMaintenance("redis"); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the critical health check was removed
checks, err = agent.Checks()
if err != nil {
t.Fatalf("err: %s", err)
}
for _, check := range checks {
if strings.Contains(check.CheckID, "maintenance") {
t.Fatalf("should have removed health check")
}
}
}
func TestNodeMaintenance(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
// Enable maintenance mode
if err := agent.EnableNodeMaintenance("broken"); err != nil {
t.Fatalf("err: %s", err)
}
// Check that a critical check was added
checks, err := agent.Checks()
if err != nil {
t.Fatalf("err: %s", err)
}
found := false
for _, check := range checks {
if strings.Contains(check.CheckID, "maintenance") {
found = true
if check.Status != "critical" || check.Notes != "broken" {
t.Fatalf("bad: %#v", checks)
}
}
}
if !found {
t.Fatalf("bad: %#v", checks)
}
// Disable maintenance mode
if err := agent.DisableNodeMaintenance(); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the check was removed
checks, err = agent.Checks()
if err != nil {
t.Fatalf("err: %s", err)
}
for _, check := range checks {
if strings.Contains(check.CheckID, "maintenance") {
t.Fatalf("should have removed health check")
}
}
}

View file

@ -0,0 +1,442 @@
package api
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
)
// QueryOptions are used to parameterize a query
type QueryOptions struct {
// Providing a datacenter overwrites the DC provided
// by the Config
Datacenter string
// AllowStale allows any Consul server (non-leader) to service
// a read. This allows for lower latency and higher throughput
AllowStale bool
// RequireConsistent forces the read to be fully consistent.
// This is more expensive but prevents ever performing a stale
// read.
RequireConsistent bool
// WaitIndex is used to enable a blocking query. Waits
// until the timeout or the next index is reached
WaitIndex uint64
// WaitTime is used to bound the duration of a wait.
// Defaults to that of the Config, but can be overriden.
WaitTime time.Duration
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string
}
// WriteOptions are used to parameterize a write
type WriteOptions struct {
// Providing a datacenter overwrites the DC provided
// by the Config
Datacenter string
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string
}
// QueryMeta is used to return meta data about a query
type QueryMeta struct {
// LastIndex. This can be used as a WaitIndex to perform
// a blocking query
LastIndex uint64
// Time of last contact from the leader for the
// server servicing the request
LastContact time.Duration
// Is there a known leader
KnownLeader bool
// How long did the request take
RequestTime time.Duration
}
// WriteMeta is used to return meta data about a write
type WriteMeta struct {
// How long did the request take
RequestTime time.Duration
}
// HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication
type HttpBasicAuth struct {
// Username to use for HTTP Basic Authentication
Username string
// Password to use for HTTP Basic Authentication
Password string
}
// Config is used to configure the creation of a client
type Config struct {
// Address is the address of the Consul server
Address string
// Scheme is the URI scheme for the Consul server
Scheme string
// Datacenter to use. If not provided, the default agent datacenter is used.
Datacenter string
// HttpClient is the client to use. Default will be
// used if not provided.
HttpClient *http.Client
// HttpAuth is the auth info to use for http access.
HttpAuth *HttpBasicAuth
// WaitTime limits how long a Watch will block. If not provided,
// the agent default values will be used.
WaitTime time.Duration
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string
}
// DefaultConfig returns a default configuration for the client
func DefaultConfig() *Config {
config := &Config{
Address: "127.0.0.1:8500",
Scheme: "http",
HttpClient: http.DefaultClient,
}
if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
config.Address = addr
}
if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
config.Token = token
}
if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" {
var username, password string
if strings.Contains(auth, ":") {
split := strings.SplitN(auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = auth
}
config.HttpAuth = &HttpBasicAuth{
Username: username,
Password: password,
}
}
if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" {
enabled, err := strconv.ParseBool(ssl)
if err != nil {
log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err)
}
if enabled {
config.Scheme = "https"
}
}
if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" {
doVerify, err := strconv.ParseBool(verify)
if err != nil {
log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err)
}
if !doVerify {
config.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
}
}
return config
}
// Client provides a client to the Consul API
type Client struct {
config Config
}
// NewClient returns a new client
func NewClient(config *Config) (*Client, error) {
// bootstrap the config
defConfig := DefaultConfig()
if len(config.Address) == 0 {
config.Address = defConfig.Address
}
if len(config.Scheme) == 0 {
config.Scheme = defConfig.Scheme
}
if config.HttpClient == nil {
config.HttpClient = defConfig.HttpClient
}
if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
config.HttpClient = &http.Client{
Transport: &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.Dial("unix", parts[1])
},
},
}
config.Address = parts[1]
}
client := &Client{
config: *config,
}
return client, nil
}
// request is used to help build up a request
type request struct {
config *Config
method string
url *url.URL
params url.Values
body io.Reader
obj interface{}
}
// setQueryOptions is used to annotate the request with
// additional query options
func (r *request) setQueryOptions(q *QueryOptions) {
if q == nil {
return
}
if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter)
}
if q.AllowStale {
r.params.Set("stale", "")
}
if q.RequireConsistent {
r.params.Set("consistent", "")
}
if q.WaitIndex != 0 {
r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
}
if q.WaitTime != 0 {
r.params.Set("wait", durToMsec(q.WaitTime))
}
if q.Token != "" {
r.params.Set("token", q.Token)
}
}
// durToMsec converts a duration to a millisecond specified string
func durToMsec(dur time.Duration) string {
return fmt.Sprintf("%dms", dur/time.Millisecond)
}
// setWriteOptions is used to annotate the request with
// additional write options
func (r *request) setWriteOptions(q *WriteOptions) {
if q == nil {
return
}
if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter)
}
if q.Token != "" {
r.params.Set("token", q.Token)
}
}
// toHTTP converts the request to an HTTP request
func (r *request) toHTTP() (*http.Request, error) {
// Encode the query parameters
r.url.RawQuery = r.params.Encode()
// Check if we should encode the body
if r.body == nil && r.obj != nil {
if b, err := encodeBody(r.obj); err != nil {
return nil, err
} else {
r.body = b
}
}
// Create the HTTP request
req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
if err != nil {
return nil, err
}
req.URL.Host = r.url.Host
req.URL.Scheme = r.url.Scheme
req.Host = r.url.Host
// Setup auth
if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
}
return req, nil
}
// newRequest is used to create a new request
func (c *Client) newRequest(method, path string) *request {
r := &request{
config: &c.config,
method: method,
url: &url.URL{
Scheme: c.config.Scheme,
Host: c.config.Address,
Path: path,
},
params: make(map[string][]string),
}
if c.config.Datacenter != "" {
r.params.Set("dc", c.config.Datacenter)
}
if c.config.WaitTime != 0 {
r.params.Set("wait", durToMsec(r.config.WaitTime))
}
if c.config.Token != "" {
r.params.Set("token", r.config.Token)
}
return r
}
// doRequest runs a request with our client
func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
req, err := r.toHTTP()
if err != nil {
return 0, nil, err
}
start := time.Now()
resp, err := c.config.HttpClient.Do(req)
diff := time.Now().Sub(start)
return diff, resp, err
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Consul conventions.
func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}
// write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Consul conventions.
func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
r := c.newRequest("PUT", endpoint)
r.setWriteOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
if out != nil {
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
}
return wm, nil
}
// parseQueryMeta is used to help parse query meta-data
func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
header := resp.Header
// Parse the X-Consul-Index
index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64)
if err != nil {
return fmt.Errorf("Failed to parse X-Consul-Index: %v", err)
}
q.LastIndex = index
// Parse the X-Consul-LastContact
last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64)
if err != nil {
return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err)
}
q.LastContact = time.Duration(last) * time.Millisecond
// Parse the X-Consul-KnownLeader
switch header.Get("X-Consul-KnownLeader") {
case "true":
q.KnownLeader = true
default:
q.KnownLeader = false
}
return nil
}
// decodeBody is used to JSON decode a body
func decodeBody(resp *http.Response, out interface{}) error {
dec := json.NewDecoder(resp.Body)
return dec.Decode(out)
}
// encodeBody is used to encode a request body
func encodeBody(obj interface{}) (io.Reader, error) {
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
if err := enc.Encode(obj); err != nil {
return nil, err
}
return buf, nil
}
// requireOK is used to wrap doRequest and check for a 200
func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
if e != nil {
if resp != nil {
resp.Body.Close()
}
return d, nil, e
}
if resp.StatusCode != 200 {
var buf bytes.Buffer
io.Copy(&buf, resp.Body)
resp.Body.Close()
return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes())
}
return d, resp, nil
}

View file

@ -0,0 +1,242 @@
package api
import (
crand "crypto/rand"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"runtime"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
)
type configCallback func(c *Config)
func makeClient(t *testing.T) (*Client, *testutil.TestServer) {
return makeClientWithConfig(t, nil, nil)
}
func makeClientWithConfig(
t *testing.T,
cb1 configCallback,
cb2 testutil.ServerConfigCallback) (*Client, *testutil.TestServer) {
// Make client config
conf := DefaultConfig()
if cb1 != nil {
cb1(conf)
}
// Create server
server := testutil.NewTestServerConfig(t, cb2)
conf.Address = server.HTTPAddr
// Create client
client, err := NewClient(conf)
if err != nil {
t.Fatalf("err: %v", err)
}
return client, server
}
func testKey() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("Failed to read random bytes: %v", err))
}
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}
func TestDefaultConfig_env(t *testing.T) {
t.Parallel()
addr := "1.2.3.4:5678"
token := "abcd1234"
auth := "username:password"
os.Setenv("CONSUL_HTTP_ADDR", addr)
defer os.Setenv("CONSUL_HTTP_ADDR", "")
os.Setenv("CONSUL_HTTP_TOKEN", token)
defer os.Setenv("CONSUL_HTTP_TOKEN", "")
os.Setenv("CONSUL_HTTP_AUTH", auth)
defer os.Setenv("CONSUL_HTTP_AUTH", "")
os.Setenv("CONSUL_HTTP_SSL", "1")
defer os.Setenv("CONSUL_HTTP_SSL", "")
os.Setenv("CONSUL_HTTP_SSL_VERIFY", "0")
defer os.Setenv("CONSUL_HTTP_SSL_VERIFY", "")
config := DefaultConfig()
if config.Address != addr {
t.Errorf("expected %q to be %q", config.Address, addr)
}
if config.Token != token {
t.Errorf("expected %q to be %q", config.Token, token)
}
if config.HttpAuth == nil {
t.Fatalf("expected HttpAuth to be enabled")
}
if config.HttpAuth.Username != "username" {
t.Errorf("expected %q to be %q", config.HttpAuth.Username, "username")
}
if config.HttpAuth.Password != "password" {
t.Errorf("expected %q to be %q", config.HttpAuth.Password, "password")
}
if config.Scheme != "https" {
t.Errorf("expected %q to be %q", config.Scheme, "https")
}
if !config.HttpClient.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify {
t.Errorf("expected SSL verification to be off")
}
}
func TestSetQueryOptions(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
r := c.newRequest("GET", "/v1/kv/foo")
q := &QueryOptions{
Datacenter: "foo",
AllowStale: true,
RequireConsistent: true,
WaitIndex: 1000,
WaitTime: 100 * time.Second,
Token: "12345",
}
r.setQueryOptions(q)
if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params)
}
if _, ok := r.params["stale"]; !ok {
t.Fatalf("bad: %v", r.params)
}
if _, ok := r.params["consistent"]; !ok {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("index") != "1000" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("wait") != "100000ms" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("token") != "12345" {
t.Fatalf("bad: %v", r.params)
}
}
func TestSetWriteOptions(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
r := c.newRequest("GET", "/v1/kv/foo")
q := &WriteOptions{
Datacenter: "foo",
Token: "23456",
}
r.setWriteOptions(q)
if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("token") != "23456" {
t.Fatalf("bad: %v", r.params)
}
}
func TestRequestToHTTP(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
r := c.newRequest("DELETE", "/v1/kv/foo")
q := &QueryOptions{
Datacenter: "foo",
}
r.setQueryOptions(q)
req, err := r.toHTTP()
if err != nil {
t.Fatalf("err: %v", err)
}
if req.Method != "DELETE" {
t.Fatalf("bad: %v", req)
}
if req.URL.RequestURI() != "/v1/kv/foo?dc=foo" {
t.Fatalf("bad: %v", req)
}
}
func TestParseQueryMeta(t *testing.T) {
t.Parallel()
resp := &http.Response{
Header: make(map[string][]string),
}
resp.Header.Set("X-Consul-Index", "12345")
resp.Header.Set("X-Consul-LastContact", "80")
resp.Header.Set("X-Consul-KnownLeader", "true")
qm := &QueryMeta{}
if err := parseQueryMeta(resp, qm); err != nil {
t.Fatalf("err: %v", err)
}
if qm.LastIndex != 12345 {
t.Fatalf("Bad: %v", qm)
}
if qm.LastContact != 80*time.Millisecond {
t.Fatalf("Bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("Bad: %v", qm)
}
}
func TestAPI_UnixSocket(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
t.SkipNow()
}
tempDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tempDir)
socket := filepath.Join(tempDir, "test.sock")
c, s := makeClientWithConfig(t, func(c *Config) {
c.Address = "unix://" + socket
}, func(c *testutil.TestServerConfig) {
c.Addresses = &testutil.TestAddressConfig{
HTTP: "unix://" + socket,
}
})
defer s.Stop()
agent := c.Agent()
info, err := agent.Self()
if err != nil {
t.Fatalf("err: %s", err)
}
if info["Config"]["NodeName"] == "" {
t.Fatalf("bad: %v", info)
}
}

View file

@ -0,0 +1,182 @@
package api
type Node struct {
Node string
Address string
}
type CatalogService struct {
Node string
Address string
ServiceID string
ServiceName string
ServiceAddress string
ServiceTags []string
ServicePort int
}
type CatalogNode struct {
Node *Node
Services map[string]*AgentService
}
type CatalogRegistration struct {
Node string
Address string
Datacenter string
Service *AgentService
Check *AgentCheck
}
type CatalogDeregistration struct {
Node string
Address string
Datacenter string
ServiceID string
CheckID string
}
// Catalog can be used to query the Catalog endpoints
type Catalog struct {
c *Client
}
// Catalog returns a handle to the catalog endpoints
func (c *Client) Catalog() *Catalog {
return &Catalog{c}
}
func (c *Catalog) Register(reg *CatalogRegistration, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", "/v1/catalog/register")
r.setWriteOptions(q)
r.obj = reg
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
return wm, nil
}
func (c *Catalog) Deregister(dereg *CatalogDeregistration, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", "/v1/catalog/deregister")
r.setWriteOptions(q)
r.obj = dereg
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
return wm, nil
}
// Datacenters is used to query for all the known datacenters
func (c *Catalog) Datacenters() ([]string, error) {
r := c.c.newRequest("GET", "/v1/catalog/datacenters")
_, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []string
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Nodes is used to query all the known nodes
func (c *Catalog) Nodes(q *QueryOptions) ([]*Node, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/catalog/nodes")
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*Node
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Services is used to query for all known services
func (c *Catalog) Services(q *QueryOptions) (map[string][]string, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/catalog/services")
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out map[string][]string
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Service is used to query catalog entries for a given service
func (c *Catalog) Service(service, tag string, q *QueryOptions) ([]*CatalogService, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/catalog/service/"+service)
r.setQueryOptions(q)
if tag != "" {
r.params.Set("tag", tag)
}
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*CatalogService
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Node is used to query for service information about a single node
func (c *Catalog) Node(node string, q *QueryOptions) (*CatalogNode, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/catalog/node/"+node)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out *CatalogNode
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View file

@ -0,0 +1,279 @@
package api
import (
"fmt"
"testing"
"github.com/hashicorp/consul/testutil"
)
func TestCatalog_Datacenters(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
datacenters, err := catalog.Datacenters()
if err != nil {
return false, err
}
if len(datacenters) == 0 {
return false, fmt.Errorf("Bad: %v", datacenters)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Nodes(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
nodes, meta, err := catalog.Nodes(nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(nodes) == 0 {
return false, fmt.Errorf("Bad: %v", nodes)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Services(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Services(nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Service(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Service("consul", "", nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Node(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
name, _ := c.Agent().NodeName()
testutil.WaitForResult(func() (bool, error) {
info, meta, err := catalog.Node(name, nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(info.Services) == 0 {
return false, fmt.Errorf("Bad: %v", info)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Registration(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
catalog := c.Catalog()
service := &AgentService{
ID: "redis1",
Service: "redis",
Tags: []string{"master", "v1"},
Port: 8000,
}
check := &AgentCheck{
Node: "foobar",
CheckID: "service:redis1",
Name: "Redis health check",
Notes: "Script based health check",
Status: "passing",
ServiceID: "redis1",
}
reg := &CatalogRegistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
Service: service,
Check: check,
}
testutil.WaitForResult(func() (bool, error) {
if _, err := catalog.Register(reg, nil); err != nil {
return false, err
}
node, _, err := catalog.Node("foobar", nil)
if err != nil {
return false, err
}
if _, ok := node.Services["redis1"]; !ok {
return false, fmt.Errorf("missing service: redis1")
}
health, _, err := c.Health().Node("foobar", nil)
if err != nil {
return false, err
}
if health[0].CheckID != "service:redis1" {
return false, fmt.Errorf("missing checkid service:redis1")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Test catalog deregistration of the previously registered service
dereg := &CatalogDeregistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
ServiceID: "redis1",
}
if _, err := catalog.Deregister(dereg, nil); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
node, _, err := catalog.Node("foobar", nil)
if err != nil {
return false, err
}
if _, ok := node.Services["redis1"]; ok {
return false, fmt.Errorf("ServiceID:redis1 is not deregistered")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Test deregistration of the previously registered check
dereg = &CatalogDeregistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
CheckID: "service:redis1",
}
if _, err := catalog.Deregister(dereg, nil); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
health, _, err := c.Health().Node("foobar", nil)
if err != nil {
return false, err
}
if len(health) != 0 {
return false, fmt.Errorf("CheckID:service:redis1 is not deregistered")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
// Test node deregistration of the previously registered node
dereg = &CatalogDeregistration{
Datacenter: "dc1",
Node: "foobar",
Address: "192.168.10.10",
}
if _, err := catalog.Deregister(dereg, nil); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
node, _, err := catalog.Node("foobar", nil)
if err != nil {
return false, err
}
if node != nil {
return false, fmt.Errorf("node is not deregistered: %v", node)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

View file

@ -0,0 +1,104 @@
package api
import (
"bytes"
"strconv"
)
// Event can be used to query the Event endpoints
type Event struct {
c *Client
}
// UserEvent represents an event that was fired by the user
type UserEvent struct {
ID string
Name string
Payload []byte
NodeFilter string
ServiceFilter string
TagFilter string
Version int
LTime uint64
}
// Event returns a handle to the event endpoints
func (c *Client) Event() *Event {
return &Event{c}
}
// Fire is used to fire a new user event. Only the Name, Payload and Filters
// are respected. This returns the ID or an associated error. Cross DC requests
// are supported.
func (e *Event) Fire(params *UserEvent, q *WriteOptions) (string, *WriteMeta, error) {
r := e.c.newRequest("PUT", "/v1/event/fire/"+params.Name)
r.setWriteOptions(q)
if params.NodeFilter != "" {
r.params.Set("node", params.NodeFilter)
}
if params.ServiceFilter != "" {
r.params.Set("service", params.ServiceFilter)
}
if params.TagFilter != "" {
r.params.Set("tag", params.TagFilter)
}
if params.Payload != nil {
r.body = bytes.NewReader(params.Payload)
}
rtt, resp, err := requireOK(e.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out UserEvent
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// List is used to get the most recent events an agent has received.
// This list can be optionally filtered by the name. This endpoint supports
// quasi-blocking queries. The index is not monotonic, nor does it provide provide
// LastContact or KnownLeader.
func (e *Event) List(name string, q *QueryOptions) ([]*UserEvent, *QueryMeta, error) {
r := e.c.newRequest("GET", "/v1/event/list")
r.setQueryOptions(q)
if name != "" {
r.params.Set("name", name)
}
rtt, resp, err := requireOK(e.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*UserEvent
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
return entries, qm, nil
}
// IDToIndex is a bit of a hack. This simulates the index generation to
// convert an event ID into a WaitIndex.
func (e *Event) IDToIndex(uuid string) uint64 {
lower := uuid[0:8] + uuid[9:13] + uuid[14:18]
upper := uuid[19:23] + uuid[24:36]
lowVal, err := strconv.ParseUint(lower, 16, 64)
if err != nil {
panic("Failed to convert " + lower)
}
highVal, err := strconv.ParseUint(upper, 16, 64)
if err != nil {
panic("Failed to convert " + upper)
}
return lowVal ^ highVal
}

View file

@ -0,0 +1,49 @@
package api
import (
"testing"
"github.com/hashicorp/consul/testutil"
)
func TestEvent_FireList(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
event := c.Event()
params := &UserEvent{Name: "foo"}
id, meta, err := event.Fire(params, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
if id == "" {
t.Fatalf("invalid: %v", id)
}
var events []*UserEvent
var qm *QueryMeta
testutil.WaitForResult(func() (bool, error) {
events, qm, err = event.List("", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
return len(events) > 0, err
}, func(err error) {
t.Fatalf("err: %#v", err)
})
if events[len(events)-1].ID != id {
t.Fatalf("bad: %#v", events)
}
if qm.LastIndex != event.IDToIndex(id) {
t.Fatalf("Bad: %#v", qm)
}
}

View file

@ -0,0 +1,136 @@
package api
import (
"fmt"
)
// HealthCheck is used to represent a single check
type HealthCheck struct {
Node string
CheckID string
Name string
Status string
Notes string
Output string
ServiceID string
ServiceName string
}
// ServiceEntry is used for the health service endpoint
type ServiceEntry struct {
Node *Node
Service *AgentService
Checks []*HealthCheck
}
// Health can be used to query the Health endpoints
type Health struct {
c *Client
}
// Health returns a handle to the health endpoints
func (c *Client) Health() *Health {
return &Health{c}
}
// Node is used to query for checks belonging to a given node
func (h *Health) Node(node string, q *QueryOptions) ([]*HealthCheck, *QueryMeta, error) {
r := h.c.newRequest("GET", "/v1/health/node/"+node)
r.setQueryOptions(q)
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*HealthCheck
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Checks is used to return the checks associated with a service
func (h *Health) Checks(service string, q *QueryOptions) ([]*HealthCheck, *QueryMeta, error) {
r := h.c.newRequest("GET", "/v1/health/checks/"+service)
r.setQueryOptions(q)
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*HealthCheck
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Service is used to query health information along with service info
// for a given service. It can optionally do server-side filtering on a tag
// or nodes with passing health checks only.
func (h *Health) Service(service, tag string, passingOnly bool, q *QueryOptions) ([]*ServiceEntry, *QueryMeta, error) {
r := h.c.newRequest("GET", "/v1/health/service/"+service)
r.setQueryOptions(q)
if tag != "" {
r.params.Set("tag", tag)
}
if passingOnly {
r.params.Set("passing", "1")
}
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*ServiceEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// State is used to retreive all the checks in a given state.
// The wildcard "any" state can also be used for all checks.
func (h *Health) State(state string, q *QueryOptions) ([]*HealthCheck, *QueryMeta, error) {
switch state {
case "any":
case "warning":
case "critical":
case "passing":
case "unknown":
default:
return nil, nil, fmt.Errorf("Unsupported state: %v", state)
}
r := h.c.newRequest("GET", "/v1/health/state/"+state)
r.setQueryOptions(q)
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*HealthCheck
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View file

@ -0,0 +1,125 @@
package api
import (
"fmt"
"testing"
"github.com/hashicorp/consul/testutil"
)
func TestHealth_Node(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
health := c.Health()
info, err := agent.Self()
if err != nil {
t.Fatalf("err: %v", err)
}
name := info["Config"]["NodeName"].(string)
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.Node(name, nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_Checks(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
health := c.Health()
// Make a service with a check
reg := &AgentServiceRegistration{
Name: "foo",
Check: &AgentServiceCheck{
TTL: "15s",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
defer agent.ServiceDeregister("foo")
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.Checks("foo", nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_Service(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
// consul service should always exist...
checks, meta, err := health.Service("consul", "", true, nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_State(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.State("any", nil)
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

View file

@ -0,0 +1,236 @@
package api
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"strings"
)
// KVPair is used to represent a single K/V entry
type KVPair struct {
Key string
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Flags uint64
Value []byte
Session string
}
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair
// KV is used to manipulate the K/V API
type KV struct {
c *Client
}
// KV is used to return a handle to the K/V apis
func (c *Client) KV() *KV {
return &KV{c}
}
// Get is used to lookup a single key
func (k *KV) Get(key string, q *QueryOptions) (*KVPair, *QueryMeta, error) {
resp, qm, err := k.getInternal(key, nil, q)
if err != nil {
return nil, nil, err
}
if resp == nil {
return nil, qm, nil
}
defer resp.Body.Close()
var entries []*KVPair
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
if len(entries) > 0 {
return entries[0], qm, nil
}
return nil, qm, nil
}
// List is used to lookup all keys under a prefix
func (k *KV) List(prefix string, q *QueryOptions) (KVPairs, *QueryMeta, error) {
resp, qm, err := k.getInternal(prefix, map[string]string{"recurse": ""}, q)
if err != nil {
return nil, nil, err
}
if resp == nil {
return nil, qm, nil
}
defer resp.Body.Close()
var entries []*KVPair
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
return entries, qm, nil
}
// Keys is used to list all the keys under a prefix. Optionally,
// a separator can be used to limit the responses.
func (k *KV) Keys(prefix, separator string, q *QueryOptions) ([]string, *QueryMeta, error) {
params := map[string]string{"keys": ""}
if separator != "" {
params["separator"] = separator
}
resp, qm, err := k.getInternal(prefix, params, q)
if err != nil {
return nil, nil, err
}
if resp == nil {
return nil, qm, nil
}
defer resp.Body.Close()
var entries []string
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
return entries, qm, nil
}
func (k *KV) getInternal(key string, params map[string]string, q *QueryOptions) (*http.Response, *QueryMeta, error) {
r := k.c.newRequest("GET", "/v1/kv/"+key)
r.setQueryOptions(q)
for param, val := range params {
r.params.Set(param, val)
}
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return nil, nil, err
}
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == 404 {
resp.Body.Close()
return nil, qm, nil
} else if resp.StatusCode != 200 {
resp.Body.Close()
return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
}
return resp, qm, nil
}
// Put is used to write a new value. Only the
// Key, Flags and Value is respected.
func (k *KV) Put(p *KVPair, q *WriteOptions) (*WriteMeta, error) {
params := make(map[string]string, 1)
if p.Flags != 0 {
params["flags"] = strconv.FormatUint(p.Flags, 10)
}
_, wm, err := k.put(p.Key, params, p.Value, q)
return wm, err
}
// CAS is used for a Check-And-Set operation. The Key,
// ModifyIndex, Flags and Value are respected. Returns true
// on success or false on failures.
func (k *KV) CAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
params := make(map[string]string, 2)
if p.Flags != 0 {
params["flags"] = strconv.FormatUint(p.Flags, 10)
}
params["cas"] = strconv.FormatUint(p.ModifyIndex, 10)
return k.put(p.Key, params, p.Value, q)
}
// Acquire is used for a lock acquisiiton operation. The Key,
// Flags, Value and Session are respected. Returns true
// on success or false on failures.
func (k *KV) Acquire(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
params := make(map[string]string, 2)
if p.Flags != 0 {
params["flags"] = strconv.FormatUint(p.Flags, 10)
}
params["acquire"] = p.Session
return k.put(p.Key, params, p.Value, q)
}
// Release is used for a lock release operation. The Key,
// Flags, Value and Session are respected. Returns true
// on success or false on failures.
func (k *KV) Release(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
params := make(map[string]string, 2)
if p.Flags != 0 {
params["flags"] = strconv.FormatUint(p.Flags, 10)
}
params["release"] = p.Session
return k.put(p.Key, params, p.Value, q)
}
func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOptions) (bool, *WriteMeta, error) {
r := k.c.newRequest("PUT", "/v1/kv/"+key)
r.setWriteOptions(q)
for param, val := range params {
r.params.Set(param, val)
}
r.body = bytes.NewReader(body)
rtt, resp, err := requireOK(k.c.doRequest(r))
if err != nil {
return false, nil, err
}
defer resp.Body.Close()
qm := &WriteMeta{}
qm.RequestTime = rtt
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}
// Delete is used to delete a single key
func (k *KV) Delete(key string, w *WriteOptions) (*WriteMeta, error) {
_, qm, err := k.deleteInternal(key, nil, w)
return qm, err
}
// DeleteCAS is used for a Delete Check-And-Set operation. The Key
// and ModifyIndex are respected. Returns true on success or false on failures.
func (k *KV) DeleteCAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
params := map[string]string{
"cas": strconv.FormatUint(p.ModifyIndex, 10),
}
return k.deleteInternal(p.Key, params, q)
}
// DeleteTree is used to delete all keys under a prefix
func (k *KV) DeleteTree(prefix string, w *WriteOptions) (*WriteMeta, error) {
_, qm, err := k.deleteInternal(prefix, map[string]string{"recurse": ""}, w)
return qm, err
}
func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOptions) (bool, *WriteMeta, error) {
r := k.c.newRequest("DELETE", "/v1/kv/"+key)
r.setWriteOptions(q)
for param, val := range params {
r.params.Set(param, val)
}
rtt, resp, err := requireOK(k.c.doRequest(r))
if err != nil {
return false, nil, err
}
defer resp.Body.Close()
qm := &WriteMeta{}
qm.RequestTime = rtt
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}

View file

@ -0,0 +1,439 @@
package api
import (
"bytes"
"path"
"testing"
"time"
)
func TestClientPutGetDelete(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Get a get without a key
key := testKey()
pair, _, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair != nil {
t.Fatalf("unexpected value: %#v", pair)
}
// Put the key
value := []byte("test")
p := &KVPair{Key: key, Flags: 42, Value: value}
if _, err := kv.Put(p, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Get should work
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if !bytes.Equal(pair.Value, value) {
t.Fatalf("unexpected value: %#v", pair)
}
if pair.Flags != 42 {
t.Fatalf("unexpected value: %#v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Delete
if _, err := kv.Delete(key, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Get should fail
pair, _, err = kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair != nil {
t.Fatalf("unexpected value: %#v", pair)
}
}
func TestClient_List_DeleteRecurse(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Generate some test keys
prefix := testKey()
var keys []string
for i := 0; i < 100; i++ {
keys = append(keys, path.Join(prefix, testKey()))
}
// Set values
value := []byte("test")
for _, key := range keys {
p := &KVPair{Key: key, Value: value}
if _, err := kv.Put(p, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// List the values
pairs, meta, err := kv.List(prefix, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(pairs) != len(keys) {
t.Fatalf("got %d keys", len(pairs))
}
for _, pair := range pairs {
if !bytes.Equal(pair.Value, value) {
t.Fatalf("unexpected value: %#v", pair)
}
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Delete all
if _, err := kv.DeleteTree(prefix, nil); err != nil {
t.Fatalf("err: %v", err)
}
// List the values
pairs, _, err = kv.List(prefix, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(pairs) != 0 {
t.Fatalf("got %d keys", len(pairs))
}
}
func TestClient_DeleteCAS(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Put the key
key := testKey()
value := []byte("test")
p := &KVPair{Key: key, Value: value}
if work, _, err := kv.CAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("CAS failure")
}
// Get should work
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// CAS update with bad index
p.ModifyIndex = 1
if work, _, err := kv.DeleteCAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if work {
t.Fatalf("unexpected CAS")
}
// CAS update with valid index
p.ModifyIndex = meta.LastIndex
if work, _, err := kv.DeleteCAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("unexpected CAS failure")
}
}
func TestClient_CAS(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Put the key
key := testKey()
value := []byte("test")
p := &KVPair{Key: key, Value: value}
if work, _, err := kv.CAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("CAS failure")
}
// Get should work
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// CAS update with bad index
newVal := []byte("foo")
p.Value = newVal
p.ModifyIndex = 1
if work, _, err := kv.CAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if work {
t.Fatalf("unexpected CAS")
}
// CAS update with valid index
p.ModifyIndex = meta.LastIndex
if work, _, err := kv.CAS(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("unexpected CAS failure")
}
}
func TestClient_WatchGet(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Get a get without a key
key := testKey()
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair != nil {
t.Fatalf("unexpected value: %#v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Put the key
value := []byte("test")
go func() {
kv := c.KV()
time.Sleep(100 * time.Millisecond)
p := &KVPair{Key: key, Flags: 42, Value: value}
if _, err := kv.Put(p, nil); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Get should work
options := &QueryOptions{WaitIndex: meta.LastIndex}
pair, meta2, err := kv.Get(key, options)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if !bytes.Equal(pair.Value, value) {
t.Fatalf("unexpected value: %#v", pair)
}
if pair.Flags != 42 {
t.Fatalf("unexpected value: %#v", pair)
}
if meta2.LastIndex <= meta.LastIndex {
t.Fatalf("unexpected value: %#v", meta2)
}
}
func TestClient_WatchList(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Get a get without a key
prefix := testKey()
key := path.Join(prefix, testKey())
pairs, meta, err := kv.List(prefix, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(pairs) != 0 {
t.Fatalf("unexpected value: %#v", pairs)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Put the key
value := []byte("test")
go func() {
kv := c.KV()
time.Sleep(100 * time.Millisecond)
p := &KVPair{Key: key, Flags: 42, Value: value}
if _, err := kv.Put(p, nil); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Get should work
options := &QueryOptions{WaitIndex: meta.LastIndex}
pairs, meta2, err := kv.List(prefix, options)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(pairs) != 1 {
t.Fatalf("expected value: %#v", pairs)
}
if !bytes.Equal(pairs[0].Value, value) {
t.Fatalf("unexpected value: %#v", pairs)
}
if pairs[0].Flags != 42 {
t.Fatalf("unexpected value: %#v", pairs)
}
if meta2.LastIndex <= meta.LastIndex {
t.Fatalf("unexpected value: %#v", meta2)
}
}
func TestClient_Keys_DeleteRecurse(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
kv := c.KV()
// Generate some test keys
prefix := testKey()
var keys []string
for i := 0; i < 100; i++ {
keys = append(keys, path.Join(prefix, testKey()))
}
// Set values
value := []byte("test")
for _, key := range keys {
p := &KVPair{Key: key, Value: value}
if _, err := kv.Put(p, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// List the values
out, meta, err := kv.Keys(prefix, "", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != len(keys) {
t.Fatalf("got %d keys", len(out))
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Delete all
if _, err := kv.DeleteTree(prefix, nil); err != nil {
t.Fatalf("err: %v", err)
}
// List the values
out, _, err = kv.Keys(prefix, "", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != 0 {
t.Fatalf("got %d keys", len(out))
}
}
func TestClient_AcquireRelease(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
kv := c.KV()
// Make a session
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
// Acquire the key
key := testKey()
value := []byte("test")
p := &KVPair{Key: key, Value: value, Session: id}
if work, _, err := kv.Acquire(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("Lock failure")
}
// Get should work
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
// Release
if work, _, err := kv.Release(p, nil); err != nil {
t.Fatalf("err: %v", err)
} else if !work {
t.Fatalf("Release fail")
}
// Get should work
pair, meta, err = kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != "" {
t.Fatalf("Expected unlock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}

View file

@ -0,0 +1,326 @@
package api
import (
"fmt"
"sync"
"time"
)
const (
// DefaultLockSessionName is the Session Name we assign if none is provided
DefaultLockSessionName = "Consul API Lock"
// DefaultLockSessionTTL is the default session TTL if no Session is provided
// when creating a new Lock. This is used because we do not have another
// other check to depend upon.
DefaultLockSessionTTL = "15s"
// DefaultLockWaitTime is how long we block for at a time to check if lock
// acquisition is possible. This affects the minimum time it takes to cancel
// a Lock acquisition.
DefaultLockWaitTime = 15 * time.Second
// DefaultLockRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second
// LockFlagValue is a magic flag we set to indicate a key
// is being used for a lock. It is used to detect a potential
// conflict with a semaphore.
LockFlagValue = 0x2ddccbc058a50c18
)
var (
// ErrLockHeld is returned if we attempt to double lock
ErrLockHeld = fmt.Errorf("Lock already held")
// ErrLockNotHeld is returned if we attempt to unlock a lock
// that we do not hold.
ErrLockNotHeld = fmt.Errorf("Lock not held")
// ErrLockInUse is returned if we attempt to destroy a lock
// that is in use.
ErrLockInUse = fmt.Errorf("Lock in use")
// ErrLockConflict is returned if the flags on a key
// used for a lock do not match expectation
ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
)
// Lock is used to implement client-side leader election. It is follows the
// algorithm as described here: https://consul.io/docs/guides/leader-election.html.
type Lock struct {
c *Client
opts *LockOptions
isHeld bool
sessionRenew chan struct{}
lockSession string
l sync.Mutex
}
// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
Key string // Must be set and have write permissions
Value []byte // Optional, value to associate with the lock
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
}
// LockKey returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockKey(key string) (*Lock, error) {
opts := &LockOptions{
Key: key,
}
return c.LockOpts(opts)
}
// LockOpts returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
if opts.Key == "" {
return nil, fmt.Errorf("missing key")
}
if opts.SessionName == "" {
opts.SessionName = DefaultLockSessionName
}
if opts.SessionTTL == "" {
opts.SessionTTL = DefaultLockSessionTTL
} else {
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
l := &Lock{
c: c,
opts: opts,
}
return l, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Providing a non-nil stopCh can be used to abort the lock attempt.
// Returns a channel that is closed if our lock is lost or an error.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the lock is held until Unlock() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the lock being lost.
func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Hold the lock as we try to acquire
l.l.Lock()
defer l.l.Unlock()
// Check if we already hold the lock
if l.isHeld {
return nil, ErrLockHeld
}
// Check if we need to create a session first
l.lockSession = l.opts.Session
if l.lockSession == "" {
if s, err := l.createSession(); err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
l.sessionRenew = make(chan struct{})
l.lockSession = s
session := l.c.Session()
go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
}
}
// Setup the query options
kv := l.c.KV()
qOpts := &QueryOptions{
WaitTime: DefaultLockWaitTime,
}
WAIT:
// Check if we should quit
select {
case <-stopCh:
return nil, nil
default:
}
// Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
if pair != nil && pair.Flags != LockFlagValue {
return nil, ErrLockConflict
}
locked := false
if pair != nil && pair.Session == l.lockSession {
goto HELD
}
if pair != nil && pair.Session != "" {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
}
// Try to acquire the lock
pair = l.lockEntry(l.lockSession)
locked, _, err = kv.Acquire(pair, nil)
if err != nil {
return nil, fmt.Errorf("failed to acquire lock: %v", err)
}
// Handle the case of not getting the lock
if !locked {
select {
case <-time.After(DefaultLockRetryTime):
goto WAIT
case <-stopCh:
return nil, nil
}
}
HELD:
// Watch to ensure we maintain leadership
leaderCh := make(chan struct{})
go l.monitorLock(l.lockSession, leaderCh)
// Set that we own the lock
l.isHeld = true
// Locked! All done
return leaderCh, nil
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *Lock) Unlock() error {
// Hold the lock as we try to release
l.l.Lock()
defer l.l.Unlock()
// Ensure the lock is actually held
if !l.isHeld {
return ErrLockNotHeld
}
// Set that we no longer own the lock
l.isHeld = false
// Stop the session renew
if l.sessionRenew != nil {
defer func() {
close(l.sessionRenew)
l.sessionRenew = nil
}()
}
// Get the lock entry, and clear the lock session
lockEnt := l.lockEntry(l.lockSession)
l.lockSession = ""
// Release the lock explicitly
kv := l.c.KV()
_, _, err := kv.Release(lockEnt, nil)
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}
return nil
}
// Destroy is used to cleanup the lock entry. It is not necessary
// to invoke. It will fail if the lock is in use.
func (l *Lock) Destroy() error {
// Hold the lock as we try to release
l.l.Lock()
defer l.l.Unlock()
// Check if we already hold the lock
if l.isHeld {
return ErrLockHeld
}
// Look for an existing lock
kv := l.c.KV()
pair, _, err := kv.Get(l.opts.Key, nil)
if err != nil {
return fmt.Errorf("failed to read lock: %v", err)
}
// Nothing to do if the lock does not exist
if pair == nil {
return nil
}
// Check for possible flag conflict
if pair.Flags != LockFlagValue {
return ErrLockConflict
}
// Check if it is in use
if pair.Session != "" {
return ErrLockInUse
}
// Attempt the delete
didRemove, _, err := kv.DeleteCAS(pair, nil)
if err != nil {
return fmt.Errorf("failed to remove lock: %v", err)
}
if !didRemove {
return ErrLockInUse
}
return nil
}
// createSession is used to create a new managed session
func (l *Lock) createSession() (string, error) {
session := l.c.Session()
se := &SessionEntry{
Name: l.opts.SessionName,
TTL: l.opts.SessionTTL,
}
id, _, err := session.Create(se, nil)
if err != nil {
return "", err
}
return id, nil
}
// lockEntry returns a formatted KVPair for the lock
func (l *Lock) lockEntry(session string) *KVPair {
return &KVPair{
Key: l.opts.Key,
Value: l.opts.Value,
Session: session,
Flags: LockFlagValue,
}
}
// monitorLock is a long running routine to monitor a lock ownership
// It closes the stopCh if we lose our leadership.
func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
defer close(stopCh)
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
pair, meta, err := kv.Get(l.opts.Key, opts)
if err != nil {
return
}
if pair != nil && pair.Session == session {
opts.WaitIndex = meta.LastIndex
goto WAIT
}
}

View file

@ -0,0 +1,363 @@
package api
import (
"log"
"sync"
"testing"
"time"
)
func TestLock_LockUnlock(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Initial unlock should fail
err = lock.Unlock()
if err != ErrLockNotHeld {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
// Double lock should fail
_, err = lock.Lock(nil)
if err != ErrLockHeld {
t.Fatalf("err: %v", err)
}
// Should be leader
select {
case <-leaderCh:
t.Fatalf("should be leader")
default:
}
// Initial unlock should work
err = lock.Unlock()
if err != nil {
t.Fatalf("err: %v", err)
}
// Double unlock should fail
err = lock.Unlock()
if err != ErrLockNotHeld {
t.Fatalf("err: %v", err)
}
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_ForceInvalidate(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
go func() {
// Nuke the session, simulator an operator invalidation
// or a health check failure
session := c.Session()
session.Destroy(lock.lockSession, nil)
}()
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_DeleteKey(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_Contend(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
wg := &sync.WaitGroup{}
acquired := make([]bool, 3)
for idx := range acquired {
wg.Add(1)
go func(idx int) {
defer wg.Done()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work eventually, will contend
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
log.Printf("Contender %d acquired", idx)
// Set acquired and then leave
acquired[idx] = true
}(idx)
}
// Wait for termination
doneCh := make(chan struct{})
go func() {
wg.Wait()
close(doneCh)
}()
// Wait for everybody to get a turn
select {
case <-doneCh:
case <-time.After(3 * DefaultLockRetryTime):
t.Fatalf("timeout")
}
for idx, did := range acquired {
if !did {
t.Fatalf("contender %d never acquired", idx)
}
}
}
func TestLock_Destroy(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
// Destroy should fail
if err := lock.Destroy(); err != ErrLockHeld {
t.Fatalf("err: %v", err)
}
// Should be able to release
err = lock.Unlock()
if err != nil {
t.Fatalf("err: %v", err)
}
// Acquire with a different lock
l2, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err = l2.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
// Destroy should still fail
if err := lock.Destroy(); err != ErrLockInUse {
t.Fatalf("err: %v", err)
}
// Should relese
err = l2.Unlock()
if err != nil {
t.Fatalf("err: %v", err)
}
// Destroy should work
err = lock.Destroy()
if err != nil {
t.Fatalf("err: %v", err)
}
// Double destroy should work
err = l2.Destroy()
if err != nil {
t.Fatalf("err: %v", err)
}
}
func TestLock_Conflict(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/lock/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not hold")
}
defer sema.Release()
lock, err := c.LockKey("test/lock/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
_, err = lock.Lock(nil)
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with semaphore
err = lock.Destroy()
if err != ErrLockConflict {
t.Fatalf("err: %v", err)
}
}
func TestLock_ReclaimLock(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session, _, err := c.Session().Create(&SessionEntry{}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
lock, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session})
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
l2, err := c.LockOpts(&LockOptions{Key: "test/lock", Session: session})
if err != nil {
t.Fatalf("err: %v", err)
}
reclaimed := make(chan (<-chan struct{}), 1)
go func() {
l2Ch, err := l2.Lock(nil)
if err != nil {
t.Fatalf("not locked: %v", err)
}
reclaimed <- l2Ch
}()
// Should reclaim the lock
var leader2Ch <-chan struct{}
select {
case leader2Ch = <-reclaimed:
case <-time.After(time.Second):
t.Fatalf("should have locked")
}
// unlock should work
err = l2.Unlock()
if err != nil {
t.Fatalf("err: %v", err)
}
//Both locks should see the unlock
select {
case <-leader2Ch:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}

View file

@ -0,0 +1,24 @@
package api
// Raw can be used to do raw queries against custom endpoints
type Raw struct {
c *Client
}
// Raw returns a handle to query endpoints
func (c *Client) Raw() *Raw {
return &Raw{c}
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Consul conventions.
func (raw *Raw) Query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
return raw.c.query(endpoint, out, q)
}
// Write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Consul conventions.
func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
return raw.c.write(endpoint, in, out, q)
}

View file

@ -0,0 +1,482 @@
package api
import (
"encoding/json"
"fmt"
"path"
"sync"
"time"
)
const (
// DefaultSemaphoreSessionName is the Session Name we assign if none is provided
DefaultSemaphoreSessionName = "Consul API Semaphore"
// DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided
// when creating a new Semaphore. This is used because we do not have another
// other check to depend upon.
DefaultSemaphoreSessionTTL = "15s"
// DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore
// acquisition is possible. This affects the minimum time it takes to cancel
// a Semaphore acquisition.
DefaultSemaphoreWaitTime = 15 * time.Second
// DefaultSemaphoreRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultSemaphoreRetryTime = 5 * time.Second
// DefaultSemaphoreKey is the key used within the prefix to
// use for coordination between all the contenders.
DefaultSemaphoreKey = ".lock"
// SemaphoreFlagValue is a magic flag we set to indicate a key
// is being used for a semaphore. It is used to detect a potential
// conflict with a lock.
SemaphoreFlagValue = 0xe0f69a2baa414de0
)
var (
// ErrSemaphoreHeld is returned if we attempt to double lock
ErrSemaphoreHeld = fmt.Errorf("Semaphore already held")
// ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore
// that we do not hold.
ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held")
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
// that is in use.
ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")
// ErrSemaphoreConflict is returned if the flags on a key
// used for a semaphore do not match expectation
ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use")
)
// Semaphore is used to implement a distributed semaphore
// using the Consul KV primitives.
type Semaphore struct {
c *Client
opts *SemaphoreOptions
isHeld bool
sessionRenew chan struct{}
lockSession string
l sync.Mutex
}
// SemaphoreOptions is used to parameterize the Semaphore
type SemaphoreOptions struct {
Prefix string // Must be set and have write permissions
Limit int // Must be set, and be positive
Value []byte // Optional, value to associate with the contender entry
Session string // OPtional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
}
// semaphoreLock is written under the DefaultSemaphoreKey and
// is used to coordinate between all the contenders.
type semaphoreLock struct {
// Limit is the integer limit of holders. This is used to
// verify that all the holders agree on the value.
Limit int
// Holders is a list of all the semaphore holders.
// It maps the session ID to true. It is used as a set effectively.
Holders map[string]bool
}
// SemaphorePrefix is used to created a Semaphore which will operate
// at the given KV prefix and uses the given limit for the semaphore.
// The prefix must have write privileges, and the limit must be agreed
// upon by all contenders.
func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) {
opts := &SemaphoreOptions{
Prefix: prefix,
Limit: limit,
}
return c.SemaphoreOpts(opts)
}
// SemaphoreOpts is used to create a Semaphore with the given options.
// The prefix must have write privileges, and the limit must be agreed
// upon by all contenders. If a Session is not provided, one will be created.
func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
if opts.Prefix == "" {
return nil, fmt.Errorf("missing prefix")
}
if opts.Limit <= 0 {
return nil, fmt.Errorf("semaphore limit must be positive")
}
if opts.SessionName == "" {
opts.SessionName = DefaultSemaphoreSessionName
}
if opts.SessionTTL == "" {
opts.SessionTTL = DefaultSemaphoreSessionTTL
} else {
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
s := &Semaphore{
c: c,
opts: opts,
}
return s, nil
}
// Acquire attempts to reserve a slot in the semaphore, blocking until
// success, interrupted via the stopCh or an error is encounted.
// Providing a non-nil stopCh can be used to abort the attempt.
// On success, a channel is returned that represents our slot.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the slot is held until Release() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the session being lost.
func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Hold the lock as we try to acquire
s.l.Lock()
defer s.l.Unlock()
// Check if we already hold the semaphore
if s.isHeld {
return nil, ErrSemaphoreHeld
}
// Check if we need to create a session first
s.lockSession = s.opts.Session
if s.lockSession == "" {
if sess, err := s.createSession(); err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
s.sessionRenew = make(chan struct{})
s.lockSession = sess
session := s.c.Session()
go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !s.isHeld {
close(s.sessionRenew)
s.sessionRenew = nil
}
}()
}
}
// Create the contender entry
kv := s.c.KV()
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), nil)
if err != nil || !made {
return nil, fmt.Errorf("failed to make contender entry: %v", err)
}
// Setup the query options
qOpts := &QueryOptions{
WaitTime: DefaultSemaphoreWaitTime,
}
WAIT:
// Check if we should quit
select {
case <-stopCh:
return nil, nil
default:
}
// Read the prefix
pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read prefix: %v", err)
}
// Decode the lock
lockPair := s.findLock(pairs)
if lockPair.Flags != SemaphoreFlagValue {
return nil, ErrSemaphoreConflict
}
lock, err := s.decodeLock(lockPair)
if err != nil {
return nil, err
}
// Verify we agree with the limit
if lock.Limit != s.opts.Limit {
return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)",
lock.Limit, s.opts.Limit)
}
// Prune the dead holders
s.pruneDeadHolders(lock, pairs)
// Check if the lock is held
if len(lock.Holders) >= lock.Limit {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
}
// Create a new lock with us as a holder
lock.Holders[s.lockSession] = true
newLock, err := s.encodeLock(lock, lockPair.ModifyIndex)
if err != nil {
return nil, err
}
// Attempt the acquisition
didSet, _, err := kv.CAS(newLock, nil)
if err != nil {
return nil, fmt.Errorf("failed to update lock: %v", err)
}
if !didSet {
// Update failed, could have been a race with another contender,
// retry the operation
goto WAIT
}
// Watch to ensure we maintain ownership of the slot
lockCh := make(chan struct{})
go s.monitorLock(s.lockSession, lockCh)
// Set that we own the lock
s.isHeld = true
// Acquired! All done
return lockCh, nil
}
// Release is used to voluntarily give up our semaphore slot. It is
// an error to call this if the semaphore has not been acquired.
func (s *Semaphore) Release() error {
// Hold the lock as we try to release
s.l.Lock()
defer s.l.Unlock()
// Ensure the lock is actually held
if !s.isHeld {
return ErrSemaphoreNotHeld
}
// Set that we no longer own the lock
s.isHeld = false
// Stop the session renew
if s.sessionRenew != nil {
defer func() {
close(s.sessionRenew)
s.sessionRenew = nil
}()
}
// Get and clear the lock session
lockSession := s.lockSession
s.lockSession = ""
// Remove ourselves as a lock holder
kv := s.c.KV()
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
READ:
pair, _, err := kv.Get(key, nil)
if err != nil {
return err
}
if pair == nil {
pair = &KVPair{}
}
lock, err := s.decodeLock(pair)
if err != nil {
return err
}
// Create a new lock without us as a holder
if _, ok := lock.Holders[lockSession]; ok {
delete(lock.Holders, lockSession)
newLock, err := s.encodeLock(lock, pair.ModifyIndex)
if err != nil {
return err
}
// Swap the locks
didSet, _, err := kv.CAS(newLock, nil)
if err != nil {
return fmt.Errorf("failed to update lock: %v", err)
}
if !didSet {
goto READ
}
}
// Destroy the contender entry
contenderKey := path.Join(s.opts.Prefix, lockSession)
if _, err := kv.Delete(contenderKey, nil); err != nil {
return err
}
return nil
}
// Destroy is used to cleanup the semaphore entry. It is not necessary
// to invoke. It will fail if the semaphore is in use.
func (s *Semaphore) Destroy() error {
// Hold the lock as we try to acquire
s.l.Lock()
defer s.l.Unlock()
// Check if we already hold the semaphore
if s.isHeld {
return ErrSemaphoreHeld
}
// List for the semaphore
kv := s.c.KV()
pairs, _, err := kv.List(s.opts.Prefix, nil)
if err != nil {
return fmt.Errorf("failed to read prefix: %v", err)
}
// Find the lock pair, bail if it doesn't exist
lockPair := s.findLock(pairs)
if lockPair.ModifyIndex == 0 {
return nil
}
if lockPair.Flags != SemaphoreFlagValue {
return ErrSemaphoreConflict
}
// Decode the lock
lock, err := s.decodeLock(lockPair)
if err != nil {
return err
}
// Prune the dead holders
s.pruneDeadHolders(lock, pairs)
// Check if there are any holders
if len(lock.Holders) > 0 {
return ErrSemaphoreInUse
}
// Attempt the delete
didRemove, _, err := kv.DeleteCAS(lockPair, nil)
if err != nil {
return fmt.Errorf("failed to remove semaphore: %v", err)
}
if !didRemove {
return ErrSemaphoreInUse
}
return nil
}
// createSession is used to create a new managed session
func (s *Semaphore) createSession() (string, error) {
session := s.c.Session()
se := &SessionEntry{
Name: s.opts.SessionName,
TTL: s.opts.SessionTTL,
Behavior: SessionBehaviorDelete,
}
id, _, err := session.Create(se, nil)
if err != nil {
return "", err
}
return id, nil
}
// contenderEntry returns a formatted KVPair for the contender
func (s *Semaphore) contenderEntry(session string) *KVPair {
return &KVPair{
Key: path.Join(s.opts.Prefix, session),
Value: s.opts.Value,
Session: session,
Flags: SemaphoreFlagValue,
}
}
// findLock is used to find the KV Pair which is used for coordination
func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
for _, pair := range pairs {
if pair.Key == key {
return pair
}
}
return &KVPair{Flags: SemaphoreFlagValue}
}
// decodeLock is used to decode a semaphoreLock from an
// entry in Consul
func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) {
// Handle if there is no lock
if pair == nil || pair.Value == nil {
return &semaphoreLock{
Limit: s.opts.Limit,
Holders: make(map[string]bool),
}, nil
}
l := &semaphoreLock{}
if err := json.Unmarshal(pair.Value, l); err != nil {
return nil, fmt.Errorf("lock decoding failed: %v", err)
}
return l, nil
}
// encodeLock is used to encode a semaphoreLock into a KVPair
// that can be PUT
func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) {
enc, err := json.Marshal(l)
if err != nil {
return nil, fmt.Errorf("lock encoding failed: %v", err)
}
pair := &KVPair{
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
Value: enc,
Flags: SemaphoreFlagValue,
ModifyIndex: oldIndex,
}
return pair, nil
}
// pruneDeadHolders is used to remove all the dead lock holders
func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) {
// Gather all the live holders
alive := make(map[string]struct{}, len(pairs))
for _, pair := range pairs {
if pair.Session != "" {
alive[pair.Session] = struct{}{}
}
}
// Remove any holders that are dead
for holder := range lock.Holders {
if _, ok := alive[holder]; !ok {
delete(lock.Holders, holder)
}
}
}
// monitorLock is a long running routine to monitor a semaphore ownership
// It closes the stopCh if we lose our slot.
func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
defer close(stopCh)
kv := s.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
pairs, meta, err := kv.List(s.opts.Prefix, opts)
if err != nil {
return
}
lockPair := s.findLock(pairs)
lock, err := s.decodeLock(lockPair)
if err != nil {
return
}
s.pruneDeadHolders(lock, pairs)
if _, ok := lock.Holders[session]; ok {
opts.WaitIndex = meta.LastIndex
goto WAIT
}
}

View file

@ -0,0 +1,313 @@
package api
import (
"log"
"sync"
"testing"
"time"
)
func TestSemaphore_AcquireRelease(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Initial release should fail
err = sema.Release()
if err != ErrSemaphoreNotHeld {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not hold")
}
// Double lock should fail
_, err = sema.Acquire(nil)
if err != ErrSemaphoreHeld {
t.Fatalf("err: %v", err)
}
// Should be held
select {
case <-lockCh:
t.Fatalf("should be held")
default:
}
// Initial release should work
err = sema.Release()
if err != nil {
t.Fatalf("err: %v", err)
}
// Double unlock should fail
err = sema.Release()
if err != ErrSemaphoreNotHeld {
t.Fatalf("err: %v", err)
}
// Should lose resource
select {
case <-lockCh:
case <-time.After(time.Second):
t.Fatalf("should not be held")
}
}
func TestSemaphore_ForceInvalidate(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not acquired")
}
defer sema.Release()
go func() {
// Nuke the session, simulator an operator invalidation
// or a health check failure
session := c.Session()
session.Destroy(sema.lockSession, nil)
}()
// Should loose slot
select {
case <-lockCh:
case <-time.After(time.Second):
t.Fatalf("should not be locked")
}
}
func TestSemaphore_DeleteKey(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not locked")
}
defer sema.Release()
go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.DeleteTree("test/semaphore", nil)
}()
// Should loose leadership
select {
case <-lockCh:
case <-time.After(time.Second):
t.Fatalf("should not be locked")
}
}
func TestSemaphore_Contend(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
wg := &sync.WaitGroup{}
acquired := make([]bool, 4)
for idx := range acquired {
wg.Add(1)
go func(idx int) {
defer wg.Done()
sema, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work eventually, will contend
lockCh, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if lockCh == nil {
t.Fatalf("not locked")
}
defer sema.Release()
log.Printf("Contender %d acquired", idx)
// Set acquired and then leave
acquired[idx] = true
}(idx)
}
// Wait for termination
doneCh := make(chan struct{})
go func() {
wg.Wait()
close(doneCh)
}()
// Wait for everybody to get a turn
select {
case <-doneCh:
case <-time.After(3 * DefaultLockRetryTime):
t.Fatalf("timeout")
}
for idx, did := range acquired {
if !did {
t.Fatalf("contender %d never acquired", idx)
}
}
}
func TestSemaphore_BadLimit(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/semaphore", 0)
if err == nil {
t.Fatalf("should error")
}
sema, err = c.SemaphorePrefix("test/semaphore", 1)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = sema2.Acquire(nil)
if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" {
t.Fatalf("err: %v", err)
}
}
func TestSemaphore_Destroy(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
sema, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
_, err = sema2.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Destroy should fail, still held
if err := sema.Destroy(); err != ErrSemaphoreHeld {
t.Fatalf("err: %v", err)
}
err = sema.Release()
if err != nil {
t.Fatalf("err: %v", err)
}
// Destroy should fail, still in use
if err := sema.Destroy(); err != ErrSemaphoreInUse {
t.Fatalf("err: %v", err)
}
err = sema2.Release()
if err != nil {
t.Fatalf("err: %v", err)
}
// Destroy should work
if err := sema.Destroy(); err != nil {
t.Fatalf("err: %v", err)
}
// Destroy should work
if err := sema2.Destroy(); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestSemaphore_Conflict(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
lock, err := c.LockKey("test/sema/.lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
sema, err := c.SemaphorePrefix("test/sema/", 2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
_, err = sema.Acquire(nil)
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
// Should conflict with lock
err = sema.Destroy()
if err != ErrSemaphoreConflict {
t.Fatalf("err: %v", err)
}
}

View file

@ -0,0 +1,201 @@
package api
import (
"fmt"
"time"
)
const (
// SessionBehaviorRelease is the default behavior and causes
// all associated locks to be released on session invalidation.
SessionBehaviorRelease = "release"
// SessionBehaviorDelete is new in Consul 0.5 and changes the
// behavior to delete all associated locks on session invalidation.
// It can be used in a way similar to Ephemeral Nodes in ZooKeeper.
SessionBehaviorDelete = "delete"
)
// SessionEntry represents a session in consul
type SessionEntry struct {
CreateIndex uint64
ID string
Name string
Node string
Checks []string
LockDelay time.Duration
Behavior string
TTL string
}
// Session can be used to query the Session endpoints
type Session struct {
c *Client
}
// Session returns a handle to the session endpoints
func (c *Client) Session() *Session {
return &Session{c}
}
// CreateNoChecks is like Create but is used specifically to create
// a session with no associated health checks.
func (s *Session) CreateNoChecks(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
body := make(map[string]interface{})
body["Checks"] = []string{}
if se != nil {
if se.Name != "" {
body["Name"] = se.Name
}
if se.Node != "" {
body["Node"] = se.Node
}
if se.LockDelay != 0 {
body["LockDelay"] = durToMsec(se.LockDelay)
}
if se.Behavior != "" {
body["Behavior"] = se.Behavior
}
if se.TTL != "" {
body["TTL"] = se.TTL
}
}
return s.create(body, q)
}
// Create makes a new session. Providing a session entry can
// customize the session. It can also be nil to use defaults.
func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
var obj interface{}
if se != nil {
body := make(map[string]interface{})
obj = body
if se.Name != "" {
body["Name"] = se.Name
}
if se.Node != "" {
body["Node"] = se.Node
}
if se.LockDelay != 0 {
body["LockDelay"] = durToMsec(se.LockDelay)
}
if len(se.Checks) > 0 {
body["Checks"] = se.Checks
}
if se.Behavior != "" {
body["Behavior"] = se.Behavior
}
if se.TTL != "" {
body["TTL"] = se.TTL
}
}
return s.create(obj, q)
}
func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) {
var out struct{ ID string }
wm, err := s.c.write("/v1/session/create", obj, &out, q)
if err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Destroy invalides a given session
func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q)
if err != nil {
return nil, err
}
return wm, nil
}
// Renew renews the TTL on a given session
func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) {
var entries []*SessionEntry
wm, err := s.c.write("/v1/session/renew/"+id, nil, &entries, q)
if err != nil {
return nil, nil, err
}
if len(entries) > 0 {
return entries[0], wm, nil
}
return nil, wm, nil
}
// RenewPeriodic is used to periodically invoke Session.Renew on a
// session until a doneCh is closed. This is meant to be used in a long running
// goroutine to ensure a session stays valid.
func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh chan struct{}) error {
ttl, err := time.ParseDuration(initialTTL)
if err != nil {
return err
}
waitDur := ttl / 2
lastRenewTime := time.Now()
var lastErr error
for {
if time.Since(lastRenewTime) > ttl {
return lastErr
}
select {
case <-time.After(waitDur):
entry, _, err := s.Renew(id, q)
if err != nil {
waitDur = time.Second
lastErr = err
continue
}
if entry == nil {
waitDur = time.Second
lastErr = fmt.Errorf("No SessionEntry returned")
continue
}
// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)
waitDur = ttl / 2
lastRenewTime = time.Now()
case <-doneCh:
// Attempt a session destroy
s.Destroy(id, q)
return nil
}
}
}
// Info looks up a single session
func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) {
var entries []*SessionEntry
qm, err := s.c.query("/v1/session/info/"+id, &entries, q)
if err != nil {
return nil, nil, err
}
if len(entries) > 0 {
return entries[0], qm, nil
}
return nil, qm, nil
}
// List gets sessions for a node
func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
var entries []*SessionEntry
qm, err := s.c.query("/v1/session/node/"+node, &entries, q)
if err != nil {
return nil, nil, err
}
return entries, qm, nil
}
// List gets all active sessions
func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
var entries []*SessionEntry
qm, err := s.c.query("/v1/session/list", &entries, q)
if err != nil {
return nil, nil, err
}
return entries, qm, nil
}

View file

@ -0,0 +1,205 @@
package api
import (
"testing"
)
func TestSession_CreateDestroy(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
id, meta, err := session.Create(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
if id == "" {
t.Fatalf("invalid: %v", id)
}
meta, err = session.Destroy(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
}
func TestSession_CreateRenewDestroy(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
se := &SessionEntry{
TTL: "10s",
}
id, meta, err := session.Create(se, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
if id == "" {
t.Fatalf("invalid: %v", id)
}
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
renew, meta, err := session.Renew(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if meta.RequestTime == 0 {
t.Fatalf("bad: %v", meta)
}
if renew == nil {
t.Fatalf("should get session")
}
if renew.ID != id {
t.Fatalf("should have matching id")
}
if renew.TTL != "10s" {
t.Fatalf("should get session with TTL")
}
}
func TestSession_Info(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
id, _, err := session.Create(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
info, qm, err := session.Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if qm.LastIndex == 0 {
t.Fatalf("bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("bad: %v", qm)
}
if info == nil {
t.Fatalf("should get session")
}
if info.CreateIndex == 0 {
t.Fatalf("bad: %v", info)
}
if info.ID != id {
t.Fatalf("bad: %v", info)
}
if info.Name != "" {
t.Fatalf("bad: %v", info)
}
if info.Node == "" {
t.Fatalf("bad: %v", info)
}
if len(info.Checks) == 0 {
t.Fatalf("bad: %v", info)
}
if info.LockDelay == 0 {
t.Fatalf("bad: %v", info)
}
if info.Behavior != "release" {
t.Fatalf("bad: %v", info)
}
if info.TTL != "" {
t.Fatalf("bad: %v", info)
}
}
func TestSession_Node(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
id, _, err := session.Create(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
info, qm, err := session.Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
sessions, qm, err := session.Node(info.Node, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("bad: %v", sessions)
}
if qm.LastIndex == 0 {
t.Fatalf("bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("bad: %v", qm)
}
}
func TestSession_List(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
session := c.Session()
id, _, err := session.Create(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)
sessions, qm, err := session.List(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions) != 1 {
t.Fatalf("bad: %v", sessions)
}
if qm.LastIndex == 0 {
t.Fatalf("bad: %v", qm)
}
if !qm.KnownLeader {
t.Fatalf("bad: %v", qm)
}
}

View file

@ -0,0 +1,43 @@
package api
// Status can be used to query the Status endpoints
type Status struct {
c *Client
}
// Status returns a handle to the status endpoints
func (c *Client) Status() *Status {
return &Status{c}
}
// Leader is used to query for a known leader
func (s *Status) Leader() (string, error) {
r := s.c.newRequest("GET", "/v1/status/leader")
_, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return "", err
}
defer resp.Body.Close()
var leader string
if err := decodeBody(resp, &leader); err != nil {
return "", err
}
return leader, nil
}
// Peers is used to query for a known raft peers
func (s *Status) Peers() ([]string, error) {
r := s.c.newRequest("GET", "/v1/status/peers")
_, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var peers []string
if err := decodeBody(resp, &peers); err != nil {
return nil, err
}
return peers, nil
}

View file

@ -0,0 +1,37 @@
package api
import (
"testing"
)
func TestStatusLeader(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
status := c.Status()
leader, err := status.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if leader == "" {
t.Fatalf("Expected leader")
}
}
func TestStatusPeers(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
status := c.Status()
peers, err := status.Peers()
if err != nil {
t.Fatalf("err: %v", err)
}
if len(peers) == 0 {
t.Fatalf("Expected peers ")
}
}

View file

@ -78,6 +78,12 @@ var (
DefaultFileSDConfig = DefaultedFileSDConfig{
RefreshInterval: Duration(30 * time.Second),
}
// The default Consul SD configuration.
DefaultConsulSDConfig = DefaultedConsulSDConfig{
TagSeparator: ",",
Scheme: "http",
}
)
// Config is the top-level configuration for Prometheus's config files.
@ -200,6 +206,8 @@ type DefaultedScrapeConfig struct {
DNSSDConfigs []*DNSSDConfig `yaml:"dns_sd_configs,omitempty"`
// List of file service discovery configurations.
FileSDConfigs []*FileSDConfig `yaml:"file_sd_configs,omitempty"`
// List of Consul service discovery configurations.
ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"`
// List of relabel configurations.
RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"`
}
@ -340,6 +348,40 @@ type DefaultedFileSDConfig struct {
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
}
// ConsulSDConfig is the configuration for Consul service discovery.
type ConsulSDConfig struct {
// DefaultedConsulSDConfig contains the actual fields for ConsulSDConfig.
DefaultedConsulSDConfig `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaller interface.
func (c *ConsulSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
c.DefaultedConsulSDConfig = DefaultConsulSDConfig
err := unmarshal(&c.DefaultedConsulSDConfig)
if err != nil {
return err
}
if strings.TrimSpace(c.Server) == "" {
return fmt.Errorf("Consul SD configuration requires a server address")
}
if len(c.Services) == 0 {
return fmt.Errorf("Consul SD configuration requires at least one service name")
}
return nil
}
// DefaultedConsulSDConfig is a proxy type for ConsulSDConfig.
type DefaultedConsulSDConfig struct {
Server string `yaml:"server"`
Token string `yaml:"token"`
Datacenter string `yaml:"datacenter"`
TagSeparator string `yaml:"tag_separator"`
Scheme string `yaml:"scheme"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Services []string `yaml:"services"`
}
// RelabelAction is the action to be performed on relabeling.
type RelabelAction string

View file

@ -0,0 +1,266 @@
package discovery
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/golang/glog"
consul "github.com/hashicorp/consul/api"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
const (
consulSourcePrefix = "consul"
consulWatchTimeout = 30 * time.Second
consulRetryInterval = 15 * time.Second
// ConsuleNodeLabel is the name for the label containing a target's node name.
ConsulNodeLabel = clientmodel.MetaLabelPrefix + "consul_node"
// ConsulTagsLabel is the name of the label containing the tags assigned to the target.
ConsulTagsLabel = clientmodel.MetaLabelPrefix + "consul_tags"
// ConsulServiceLabel is the name of the label containing the service name.
ConsulServiceLabel = clientmodel.MetaLabelPrefix + "consul_service"
// ConsulDCLabel is the name of the label containing the datacenter ID.
ConsulDCLabel = clientmodel.MetaLabelPrefix + "consul_dc"
)
// ConsulDiscovery retrieves target information from a Consul server
// and updates them via watches.
type ConsulDiscovery struct {
client *consul.Client
clientConf *consul.Config
tagSeparator string
scrapedServices map[string]struct{}
mu sync.RWMutex
services map[string]*consulService
runDone, srvsDone chan struct{}
}
// consulService contains data belonging to the same service.
type consulService struct {
name string
tgroup *config.TargetGroup
lastIndex uint64
removed bool
running bool
done chan struct{}
}
// NewConsulDiscovery returns a new ConsulDiscovery for the given config.
func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery {
clientConf := &consul.Config{
Address: conf.Server,
Scheme: conf.Scheme,
Datacenter: conf.Datacenter,
Token: conf.Token,
HttpAuth: &consul.HttpBasicAuth{
Username: conf.Username,
Password: conf.Password,
},
}
client, err := consul.NewClient(clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.NewConsulDiscovery: %s", err))
}
cd := &ConsulDiscovery{
client: client,
clientConf: clientConf,
tagSeparator: conf.TagSeparator,
runDone: make(chan struct{}),
srvsDone: make(chan struct{}, 1),
scrapedServices: map[string]struct{}{},
services: map[string]*consulService{},
}
for _, name := range conf.Services {
cd.scrapedServices[name] = struct{}{}
}
return cd
}
// Sources implements the TargetProvider interface.
func (cd *ConsulDiscovery) Sources() []string {
clientConf := *cd.clientConf
clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second}
client, err := consul.NewClient(&clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err))
}
srvs, _, err := client.Catalog().Services(nil)
if err != nil {
glog.Errorf("Error refreshing service list: %s", err)
return nil
}
cd.mu.Lock()
defer cd.mu.Unlock()
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; ok {
srcs = append(srcs, consulSourcePrefix+":"+name)
}
}
return srcs
}
// Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
defer close(ch)
update := make(chan *consulService, 10)
go cd.watchServices(update)
for {
select {
case <-cd.runDone:
return
case srv := <-update:
if srv.removed {
ch <- &config.TargetGroup{Source: consulSourcePrefix + ":" + srv.name}
break
}
// Launch watcher for the service.
if !srv.running {
go cd.watchService(srv, ch)
srv.running = true
}
}
}
}
// Stop implements the TargetProvider interface.
func (cd *ConsulDiscovery) Stop() {
glog.V(1).Infof("Stopping Consul service discovery for %s", cd.clientConf.Address)
// The lock prevents Run from terminating while the watchers attempt
// to send on their channels.
cd.mu.Lock()
defer cd.mu.Unlock()
// The watching goroutines will terminate after their next watch timeout.
// As this can take long, the channel is buffered and we do not wait.
for _, srv := range cd.services {
srv.done <- struct{}{}
}
cd.srvsDone <- struct{}{}
// Terminate Run.
cd.runDone <- struct{}{}
glog.V(1).Infof("Consul service discovery for %s stopped.", cd.clientConf.Address)
}
// watchServices retrieves updates from Consul's services endpoint and sends
// potential updates to the update channel.
func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
var lastIndex uint64
for {
catalog := cd.client.Catalog()
srvs, meta, err := catalog.Services(&consul.QueryOptions{
RequireConsistent: false,
WaitIndex: lastIndex,
})
if err != nil {
glog.Errorf("Error refreshing service list: %s", err)
<-time.After(consulRetryInterval)
continue
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == lastIndex {
continue
}
lastIndex = meta.LastIndex
cd.mu.Lock()
select {
case <-cd.srvsDone:
return
default:
// Continue.
}
// Check for new services.
for name := range srvs {
if _, ok := cd.scrapedServices[name]; !ok {
continue
}
srv, ok := cd.services[name]
if !ok {
srv = &consulService{
name: name,
tgroup: &config.TargetGroup{},
done: make(chan struct{}, 1),
}
srv.tgroup.Source = consulSourcePrefix + ":" + name
cd.services[name] = srv
}
srv.tgroup.Labels = clientmodel.LabelSet{
ConsulServiceLabel: clientmodel.LabelValue(name),
ConsulDCLabel: clientmodel.LabelValue(cd.clientConf.Datacenter),
}
update <- srv
}
// Check for removed services.
for name, srv := range cd.services {
if _, ok := srvs[name]; !ok {
srv.removed = true
update <- srv
srv.done <- struct{}{}
delete(cd.services, name)
}
}
cd.mu.Unlock()
}
}
// watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) {
catalog := cd.client.Catalog()
for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
WaitIndex: srv.lastIndex,
WaitTime: consulWatchTimeout,
})
if err != nil {
glog.Errorf("Error refreshing service %s: %s", srv.name, err)
<-time.After(consulRetryInterval)
continue
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == srv.lastIndex {
continue
}
srv.lastIndex = meta.LastIndex
srv.tgroup.Targets = make([]clientmodel.LabelSet, 0, len(nodes))
for _, node := range nodes {
addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort)
tags := strings.Join(node.ServiceTags, cd.tagSeparator)
srv.tgroup.Targets = append(srv.tgroup.Targets, clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(addr),
ConsulNodeLabel: clientmodel.LabelValue(node.Node),
ConsulTagsLabel: clientmodel.LabelValue(tags),
})
}
cd.mu.Lock()
select {
case <-srv.done:
return
default:
// Continue.
}
ch <- srv.tgroup
cd.mu.Unlock()
}
}

View file

@ -361,6 +361,9 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
for _, c := range cfg.FileSDConfigs {
providers = append(providers, discovery.NewFileDiscovery(c))
}
for _, c := range cfg.ConsulSDConfigs {
providers = append(providers, discovery.NewConsulDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
}