vendor: Update consul client

We were so much behind...
This commit is contained in:
beorn7 2016-10-28 15:27:00 +02:00
parent 5754f5237e
commit 43cddad30b
14 changed files with 1015 additions and 109 deletions

View file

@ -4,12 +4,12 @@ Consul API client
This package provides the `api` package which attempts to
provide programmatic access to the full Consul API.
Currently, all of the Consul APIs included in version 0.3 are supported.
Currently, all of the Consul APIs included in version 0.6.0 are supported.
Documentation
=============
The full documentation is available on [Godoc](http://godoc.org/github.com/hashicorp/consul/api)
The full documentation is available on [Godoc](https://godoc.org/github.com/hashicorp/consul/api)
Usage
=====
@ -17,13 +17,18 @@ Usage
Below is an example of using the Consul client:
```go
// Get a new client, with KV endpoints
client, _ := api.NewClient(api.DefaultConfig())
// Get a new client
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
// Get a handle to the KV API
kv := client.KV()
// PUT a new KV pair
p := &api.KVPair{Key: "foo", Value: []byte("test")}
_, err := kv.Put(p, nil)
_, err = kv.Put(p, nil)
if err != nil {
panic(err)
}
@ -36,4 +41,3 @@ if err != nil {
fmt.Printf("KV: %v", pair)
```

View file

@ -18,11 +18,12 @@ type AgentCheck struct {
// AgentService represents a service known to the agent
type AgentService struct {
ID string
Service string
Tags []string
Port int
Address string
ID string
Service string
Tags []string
Port int
Address string
EnableTagOverride bool
}
// AgentMember represents a cluster member known to the agent
@ -42,13 +43,14 @@ type AgentMember struct {
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
ID string `json:",omitempty"`
Name string `json:",omitempty"`
Tags []string `json:",omitempty"`
Port int `json:",omitempty"`
Address string `json:",omitempty"`
EnableTagOverride bool `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
}
// AgentCheckRegistration is used to register a new check
@ -60,16 +62,25 @@ type AgentCheckRegistration struct {
AgentServiceCheck
}
// AgentServiceCheck is used to create an associated
// check for a service
// AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct {
Script string `json:",omitempty"`
Interval string `json:",omitempty"`
Timeout string `json:",omitempty"`
TTL string `json:",omitempty"`
HTTP string `json:",omitempty"`
TCP string `json:",omitempty"`
Status string `json:",omitempty"`
Script string `json:",omitempty"`
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"`
Timeout string `json:",omitempty"`
TTL string `json:",omitempty"`
HTTP string `json:",omitempty"`
TCP string `json:",omitempty"`
Status string `json:",omitempty"`
// In Consul 0.7 and later, checks that are associated with a service
// may also contain this optional DeregisterCriticalServiceAfter field,
// which is a timeout in the same Go time format as Interval and TTL. If
// a check is in the critical state for more than this configured value,
// then its associated service (and all of its associated checks) will
// automatically be deregistered.
DeregisterCriticalServiceAfter string `json:",omitempty"`
}
type AgentServiceChecks []*AgentServiceCheck
@ -194,23 +205,43 @@ func (a *Agent) ServiceDeregister(serviceID string) error {
return nil
}
// PassTTL is used to set a TTL check to the passing state
// PassTTL is used to set a TTL check to the passing state.
//
// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL().
// The client interface will be removed in 0.8 or changed to use
// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9.
func (a *Agent) PassTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "pass")
return a.updateTTL(checkID, note, "pass")
}
// WarnTTL is used to set a TTL check to the warning state
// WarnTTL is used to set a TTL check to the warning state.
//
// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL().
// The client interface will be removed in 0.8 or changed to use
// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9.
func (a *Agent) WarnTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "warn")
return a.updateTTL(checkID, note, "warn")
}
// FailTTL is used to set a TTL check to the failing state
// FailTTL is used to set a TTL check to the failing state.
//
// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL().
// The client interface will be removed in 0.8 or changed to use
// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9.
func (a *Agent) FailTTL(checkID, note string) error {
return a.UpdateTTL(checkID, note, "fail")
return a.updateTTL(checkID, note, "fail")
}
// UpdateTTL is used to update the TTL of a check
func (a *Agent) UpdateTTL(checkID, note, status string) error {
// updateTTL is used to update the TTL of a check. This is the internal
// method that uses the old API that's present in Consul versions prior to
// 0.6.4. Since Consul didn't have an analogous "update" API before it seemed
// ok to break this (former) UpdateTTL in favor of the new UpdateTTL below,
// but keep the old Pass/Warn/Fail methods using the old API under the hood.
//
// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL().
// The client interface will be removed in 0.8 and the server endpoints will
// be removed in 0.9.
func (a *Agent) updateTTL(checkID, note, status string) error {
switch status {
case "pass":
case "warn":
@ -229,6 +260,51 @@ func (a *Agent) UpdateTTL(checkID, note, status string) error {
return nil
}
// checkUpdate is the payload for a PUT for a check update.
type checkUpdate struct {
// Status is one of the api.Health* states: HealthPassing
// ("passing"), HealthWarning ("warning"), or HealthCritical
// ("critical").
Status string
// Output is the information to post to the UI for operators as the
// output of the process that decided to hit the TTL check. This is
// different from the note field that's associated with the check
// itself.
Output string
}
// UpdateTTL is used to update the TTL of a check. This uses the newer API
// that was introduced in Consul 0.6.4 and later. We translate the old status
// strings for compatibility (though a newer version of Consul will still be
// required to use this API).
func (a *Agent) UpdateTTL(checkID, output, status string) error {
switch status {
case "pass", HealthPassing:
status = HealthPassing
case "warn", HealthWarning:
status = HealthWarning
case "fail", HealthCritical:
status = HealthCritical
default:
return fmt.Errorf("Invalid status: %s", status)
}
endpoint := fmt.Sprintf("/v1/agent/check/update/%s", checkID)
r := a.c.newRequest("PUT", endpoint)
r.obj = &checkUpdate{
Status: status,
Output: output,
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// CheckRegister is used to register a new check with
// the local agent
func (a *Agent) CheckRegister(check *AgentCheckRegistration) error {

View file

@ -3,9 +3,11 @@ package api
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -14,6 +16,8 @@ import (
"strconv"
"strings"
"time"
"github.com/hashicorp/go-cleanhttp"
)
// QueryOptions are used to parameterize a query
@ -36,12 +40,18 @@ type QueryOptions struct {
WaitIndex uint64
// WaitTime is used to bound the duration of a wait.
// Defaults to that of the Config, but can be overriden.
// Defaults to that of the Config, but can be overridden.
WaitTime time.Duration
// Token is used to provide a per-request ACL token
// which overrides the agent's default token.
Token string
// Near is used to provide a node name that will sort the results
// in ascending order based on the estimated round trip time from
// that node. Setting this to "_agent" will use the agent's node
// for the sort.
Near string
}
// WriteOptions are used to parameterize a write
@ -70,6 +80,9 @@ type QueryMeta struct {
// How long did the request take
RequestTime time.Duration
// Is address translation enabled for HTTP responses on this agent
AddressTranslationEnabled bool
}
// WriteMeta is used to return meta data about a write
@ -114,12 +127,58 @@ type Config struct {
Token string
}
// DefaultConfig returns a default configuration for the client
// TLSConfig is used to generate a TLSClientConfig that's useful for talking to
// Consul using TLS.
type TLSConfig struct {
// Address is the optional address of the Consul server. The port, if any
// will be removed from here and this will be set to the ServerName of the
// resulting config.
Address string
// CAFile is the optional path to the CA certificate used for Consul
// communication, defaults to the system bundle if not specified.
CAFile string
// CertFile is the optional path to the certificate for Consul
// communication. If this is set then you need to also set KeyFile.
CertFile string
// KeyFile is the optional path to the private key for Consul communication.
// If this is set then you need to also set CertFile.
KeyFile string
// InsecureSkipVerify if set to true will disable TLS host verification.
InsecureSkipVerify bool
}
// DefaultConfig returns a default configuration for the client. By default this
// will pool and reuse idle connections to Consul. If you have a long-lived
// client object, this is the desired behavior and should make the most efficient
// use of the connections to Consul. If you don't reuse a client object , which
// is not recommended, then you may notice idle connections building up over
// time. To avoid this, use the DefaultNonPooledConfig() instead.
func DefaultConfig() *Config {
return defaultConfig(cleanhttp.DefaultPooledTransport)
}
// DefaultNonPooledConfig returns a default configuration for the client which
// does not pool connections. This isn't a recommended configuration because it
// will reconnect to Consul on every request, but this is useful to avoid the
// accumulation of idle connections if you make many client objects during the
// lifetime of your application.
func DefaultNonPooledConfig() *Config {
return defaultConfig(cleanhttp.DefaultTransport)
}
// defaultConfig returns the default configuration for the client, using the
// given function to make the transport.
func defaultConfig(transportFn func() *http.Transport) *Config {
config := &Config{
Address: "127.0.0.1:8500",
Scheme: "http",
HttpClient: http.DefaultClient,
Address: "127.0.0.1:8500",
Scheme: "http",
HttpClient: &http.Client{
Transport: transportFn(),
},
}
if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
@ -164,17 +223,70 @@ func DefaultConfig() *Config {
}
if !doVerify {
config.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
tlsClientConfig, err := SetupTLSConfig(&TLSConfig{
InsecureSkipVerify: true,
})
// We don't expect this to fail given that we aren't
// parsing any of the input, but we panic just in case
// since this doesn't have an error return.
if err != nil {
panic(err)
}
transport := transportFn()
transport.TLSClientConfig = tlsClientConfig
config.HttpClient.Transport = transport
}
}
return config
}
// TLSConfig is used to generate a TLSClientConfig that's useful for talking to
// Consul using TLS.
func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) {
tlsClientConfig := &tls.Config{
InsecureSkipVerify: tlsConfig.InsecureSkipVerify,
}
if tlsConfig.Address != "" {
server := tlsConfig.Address
hasPort := strings.LastIndex(server, ":") > strings.LastIndex(server, "]")
if hasPort {
var err error
server, _, err = net.SplitHostPort(server)
if err != nil {
return nil, err
}
}
tlsClientConfig.ServerName = server
}
if tlsConfig.CertFile != "" && tlsConfig.KeyFile != "" {
tlsCert, err := tls.LoadX509KeyPair(tlsConfig.CertFile, tlsConfig.KeyFile)
if err != nil {
return nil, err
}
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
if tlsConfig.CAFile != "" {
data, err := ioutil.ReadFile(tlsConfig.CAFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file: %v", err)
}
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsClientConfig.RootCAs = caPool
}
return tlsClientConfig, nil
}
// Client provides a client to the Consul API
type Client struct {
config Config
@ -198,12 +310,12 @@ func NewClient(config *Config) (*Client, error) {
}
if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
trans := cleanhttp.DefaultTransport()
trans.Dial = func(_, _ string) (net.Conn, error) {
return net.Dial("unix", parts[1])
}
config.HttpClient = &http.Client{
Transport: &http.Transport{
Dial: func(_, _ string) (net.Conn, error) {
return net.Dial("unix", parts[1])
},
},
Transport: trans,
}
config.Address = parts[1]
}
@ -221,6 +333,7 @@ type request struct {
url *url.URL
params url.Values
body io.Reader
header http.Header
obj interface{}
}
@ -246,13 +359,38 @@ func (r *request) setQueryOptions(q *QueryOptions) {
r.params.Set("wait", durToMsec(q.WaitTime))
}
if q.Token != "" {
r.params.Set("token", q.Token)
r.header.Set("X-Consul-Token", q.Token)
}
if q.Near != "" {
r.params.Set("near", q.Near)
}
}
// durToMsec converts a duration to a millisecond specified string
// durToMsec converts a duration to a millisecond specified string. If the
// user selected a positive value that rounds to 0 ms, then we will use 1 ms
// so they get a short delay, otherwise Consul will translate the 0 ms into
// a huge default delay.
func durToMsec(dur time.Duration) string {
return fmt.Sprintf("%dms", dur/time.Millisecond)
ms := dur / time.Millisecond
if dur > 0 && ms == 0 {
ms = 1
}
return fmt.Sprintf("%dms", ms)
}
// serverError is a string we look for to detect 500 errors.
const serverError = "Unexpected response code: 500"
// IsServerError returns true for 500 errors from the Consul servers, these are
// usually retryable at a later time.
func IsServerError(err error) bool {
if err == nil {
return false
}
// TODO (slackpad) - Make a real error type here instead of using
// a string check.
return strings.Contains(err.Error(), serverError)
}
// setWriteOptions is used to annotate the request with
@ -265,7 +403,7 @@ func (r *request) setWriteOptions(q *WriteOptions) {
r.params.Set("dc", q.Datacenter)
}
if q.Token != "" {
r.params.Set("token", q.Token)
r.header.Set("X-Consul-Token", q.Token)
}
}
@ -292,6 +430,7 @@ func (r *request) toHTTP() (*http.Request, error) {
req.URL.Host = r.url.Host
req.URL.Scheme = r.url.Scheme
req.Host = r.url.Host
req.Header = r.header
// Setup auth
if r.config.HttpAuth != nil {
@ -312,6 +451,7 @@ func (c *Client) newRequest(method, path string) *request {
Path: path,
},
params: make(map[string][]string),
header: make(http.Header),
}
if c.config.Datacenter != "" {
r.params.Set("dc", c.config.Datacenter)
@ -320,7 +460,7 @@ func (c *Client) newRequest(method, path string) *request {
r.params.Set("wait", durToMsec(r.config.WaitTime))
}
if c.config.Token != "" {
r.params.Set("token", r.config.Token)
r.header.Set("X-Consul-Token", r.config.Token)
}
return r
}
@ -405,6 +545,15 @@ func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
default:
q.KnownLeader = false
}
// Parse X-Consul-Translate-Addresses
switch header.Get("X-Consul-Translate-Addresses") {
case "true":
q.AddressTranslationEnabled = true
default:
q.AddressTranslationEnabled = false
}
return nil
}

View file

@ -1,18 +1,21 @@
package api
type Node struct {
Node string
Address string
Node string
Address string
TaggedAddresses map[string]string
}
type CatalogService struct {
Node string
Address string
ServiceID string
ServiceName string
ServiceAddress string
ServiceTags []string
ServicePort int
Node string
Address string
TaggedAddresses map[string]string
ServiceID string
ServiceName string
ServiceAddress string
ServiceTags []string
ServicePort int
ServiceEnableTagOverride bool
}
type CatalogNode struct {
@ -21,11 +24,12 @@ type CatalogNode struct {
}
type CatalogRegistration struct {
Node string
Address string
Datacenter string
Service *AgentService
Check *AgentCheck
Node string
Address string
TaggedAddresses map[string]string
Datacenter string
Service *AgentService
Check *AgentCheck
}
type CatalogDeregistration struct {

66
vendor/github.com/hashicorp/consul/api/coordinate.go generated vendored Normal file
View file

@ -0,0 +1,66 @@
package api
import (
"github.com/hashicorp/serf/coordinate"
)
// CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct {
Node string
Coord *coordinate.Coordinate
}
// CoordinateDatacenterMap represents a datacenter and its associated WAN
// nodes and their associates coordinates.
type CoordinateDatacenterMap struct {
Datacenter string
Coordinates []CoordinateEntry
}
// Coordinate can be used to query the coordinate endpoints
type Coordinate struct {
c *Client
}
// Coordinate returns a handle to the coordinate endpoints
func (c *Client) Coordinate() *Coordinate {
return &Coordinate{c}
}
// Datacenters is used to return the coordinates of all the servers in the WAN
// pool.
func (c *Coordinate) Datacenters() ([]*CoordinateDatacenterMap, error) {
r := c.c.newRequest("GET", "/v1/coordinate/datacenters")
_, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []*CoordinateDatacenterMap
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Nodes is used to return the coordinates of all the nodes in the LAN pool.
func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/coordinate/nodes")
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*CoordinateEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View file

@ -4,6 +4,15 @@ import (
"fmt"
)
const (
// HealthAny is special, and is used as a wild card,
// not as a specific state.
HealthAny = "any"
HealthPassing = "passing"
HealthWarning = "warning"
HealthCritical = "critical"
)
// HealthCheck is used to represent a single check
type HealthCheck struct {
Node string
@ -85,7 +94,7 @@ func (h *Health) Service(service, tag string, passingOnly bool, q *QueryOptions)
r.params.Set("tag", tag)
}
if passingOnly {
r.params.Set("passing", "1")
r.params.Set(HealthPassing, "1")
}
rtt, resp, err := requireOK(h.c.doRequest(r))
if err != nil {
@ -104,15 +113,14 @@ func (h *Health) Service(service, tag string, passingOnly bool, q *QueryOptions)
return out, qm, nil
}
// State is used to retreive all the checks in a given state.
// State is used to retrieve all the checks in a given state.
// The wildcard "any" state can also be used for all checks.
func (h *Health) State(state string, q *QueryOptions) ([]*HealthCheck, *QueryMeta, error) {
switch state {
case "any":
case "warning":
case "critical":
case "passing":
case "unknown":
case HealthAny:
case HealthWarning:
case HealthCritical:
case HealthPassing:
default:
return nil, nil, fmt.Errorf("Unsupported state: %v", state)
}

View file

@ -11,18 +11,77 @@ import (
// KVPair is used to represent a single K/V entry
type KVPair struct {
Key string
// Key is the name of the key. It is also part of the URL path when accessed
// via the API.
Key string
// CreateIndex holds the index corresponding the creation of this KVPair. This
// is a read-only field.
CreateIndex uint64
// ModifyIndex is used for the Check-And-Set operations and can also be fed
// back into the WaitIndex of the QueryOptions in order to perform blocking
// queries.
ModifyIndex uint64
LockIndex uint64
Flags uint64
Value []byte
Session string
// LockIndex holds the index corresponding to a lock on this key, if any. This
// is a read-only field.
LockIndex uint64
// Flags are any user-defined flags on the key. It is up to the implementer
// to check these values, since Consul does not treat them specially.
Flags uint64
// Value is the value for the key. This can be any value, but it will be
// base64 encoded upon transport.
Value []byte
// Session is a string representing the ID of the session. Any other
// interactions with this key over the same session must specify the same
// session ID.
Session string
}
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair
// KVOp constants give possible operations available in a KVTxn.
type KVOp string
const (
KVSet KVOp = "set"
KVDelete = "delete"
KVDeleteCAS = "delete-cas"
KVDeleteTree = "delete-tree"
KVCAS = "cas"
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVGetTree = "get-tree"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)
// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb string
Key string
Value []byte
Flags uint64
Index uint64
Session string
}
// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp
// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}
// KV is used to manipulate the K/V API
type KV struct {
c *Client
@ -33,7 +92,8 @@ func (c *Client) KV() *KV {
return &KV{c}
}
// Get is used to lookup a single key
// Get is used to lookup a single key. The returned pointer
// to the KVPair will be nil if the key does not exist.
func (k *KV) Get(key string, q *QueryOptions) (*KVPair, *QueryMeta, error) {
resp, qm, err := k.getInternal(key, nil, q)
if err != nil {
@ -143,7 +203,7 @@ func (k *KV) CAS(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
return k.put(p.Key, params, p.Value, q)
}
// Acquire is used for a lock acquisiiton operation. The Key,
// Acquire is used for a lock acquisition operation. The Key,
// Flags, Value and Session are respected. Returns true
// on success or false on failures.
func (k *KV) Acquire(p *KVPair, q *WriteOptions) (bool, *WriteMeta, error) {
@ -238,3 +298,122 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}
// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KV *KVTxnOp
}
// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp
// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
}
// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult
// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}
// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError
// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}
// Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setQueryOptions(q)
// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvOp := range txn {
ops = append(ops, &TxnOp{KV: kvOp})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}
// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}

View file

@ -22,9 +22,16 @@ const (
// DefaultLockRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
// is in effect, we do not hot loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second
// DefaultMonitorRetryTime is how long we wait after a failed monitor check
// of a lock (500 response code). This allows the monitor to ride out brief
// periods of unavailability, subject to the MonitorRetries setting in the
// lock options which is by default set to 0, disabling this feature. This
// affects locks and semaphores.
DefaultMonitorRetryTime = 2 * time.Second
// LockFlagValue is a magic flag we set to indicate a key
// is being used for a lock. It is used to detect a potential
// conflict with a semaphore.
@ -49,7 +56,7 @@ var (
)
// Lock is used to implement client-side leader election. It is follows the
// algorithm as described here: https://consul.io/docs/guides/leader-election.html.
// algorithm as described here: https://www.consul.io/docs/guides/leader-election.html.
type Lock struct {
c *Client
opts *LockOptions
@ -62,11 +69,16 @@ type Lock struct {
// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
Key string // Must be set and have write permissions
Value []byte // Optional, value to associate with the lock
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
Key string // Must be set and have write permissions
Value []byte // Optional, value to associate with the lock
Session string // Optional, created if not specified
SessionOpts *SessionEntry // Optional, options to use when creating a session
SessionName string // Optional, defaults to DefaultLockSessionName (ignored if SessionOpts is given)
SessionTTL string // Optional, defaults to DefaultLockSessionTTL (ignored if SessionOpts is given)
MonitorRetries int // Optional, defaults to 0 which means no retries
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime
LockTryOnce bool // Optional, defaults to false which means try forever
}
// LockKey returns a handle to a lock struct which can be used
@ -96,6 +108,12 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime
}
if opts.LockWaitTime == 0 {
opts.LockWaitTime = DefaultLockWaitTime
}
l := &Lock{
c: c,
opts: opts,
@ -146,9 +164,11 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Setup the query options
kv := l.c.KV()
qOpts := &QueryOptions{
WaitTime: DefaultLockWaitTime,
WaitTime: l.opts.LockWaitTime,
}
start := time.Now()
attempts := 0
WAIT:
// Check if we should quit
select {
@ -157,6 +177,17 @@ WAIT:
default:
}
// Handle the one-shot mode.
if l.opts.LockTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}
qOpts.WaitTime -= elapsed
}
attempts++
// Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil {
@ -299,9 +330,12 @@ func (l *Lock) Destroy() error {
// createSession is used to create a new managed session
func (l *Lock) createSession() (string, error) {
session := l.c.Session()
se := &SessionEntry{
Name: l.opts.SessionName,
TTL: l.opts.SessionTTL,
se := l.opts.SessionOpts
if se == nil {
se = &SessionEntry{
Name: l.opts.SessionName,
TTL: l.opts.SessionTTL,
}
}
id, _, err := session.Create(se, nil)
if err != nil {
@ -327,8 +361,20 @@ func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
retries := l.opts.MonitorRetries
RETRY:
pair, meta, err := kv.Get(l.opts.Key, opts)
if err != nil {
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry
// counter if service is restored.
if retries > 0 && IsServerError(err) {
time.Sleep(l.opts.MonitorRetryTime)
retries--
opts.WaitIndex = 0
goto RETRY
}
return
}
if pair != nil && pair.Session == session {

81
vendor/github.com/hashicorp/consul/api/operator.go generated vendored Normal file
View file

@ -0,0 +1,81 @@
package api
// Operator can be used to perform low-level operator tasks for Consul.
type Operator struct {
c *Client
}
// Operator returns a handle to the operator endpoints.
func (c *Client) Operator() *Operator {
return &Operator{c}
}
// RaftServer has information about a server in the Raft configuration.
type RaftServer struct {
// ID is the unique ID for the server. These are currently the same
// as the address, but they will be changed to a real GUID in a future
// release of Consul.
ID string
// Node is the node name of the server, as known by Consul, or this
// will be set to "(unknown)" otherwise.
Node string
// Address is the IP:port of the server, used for Raft communications.
Address string
// Leader is true if this server is the current cluster leader.
Leader bool
// Voter is true if this server has a vote in the cluster. This might
// be false if the server is staging and still coming online, or if
// it's a non-voting server, which will be added in a future release of
// Consul.
Voter bool
}
// RaftConfigration is returned when querying for the current Raft configuration.
type RaftConfiguration struct {
// Servers has the list of servers in the Raft configuration.
Servers []*RaftServer
// Index has the Raft index of this configuration.
Index uint64
}
// RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out RaftConfiguration
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}
// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port".
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
r.setWriteOptions(q)
// TODO (slackpad) Currently we made address a query parameter. Once
// IDs are in place this will be DELETE /v1/operator/raft/peer/<id>.
r.params.Set("address", string(address))
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

View file

@ -0,0 +1,194 @@
package api
// QueryDatacenterOptions sets options about how we fail over if there are no
// healthy nodes in the local datacenter.
type QueryDatacenterOptions struct {
// NearestN is set to the number of remote datacenters to try, based on
// network coordinates.
NearestN int
// Datacenters is a fixed list of datacenters to try after NearestN. We
// never try a datacenter multiple times, so those are subtracted from
// this list before proceeding.
Datacenters []string
}
// QueryDNSOptions controls settings when query results are served over DNS.
type QueryDNSOptions struct {
// TTL is the time to live for the served DNS results.
TTL string
}
// ServiceQuery is used to query for a set of healthy nodes offering a specific
// service.
type ServiceQuery struct {
// Service is the service to query.
Service string
// Near allows baking in the name of a node to automatically distance-
// sort from. The magic "_agent" value is supported, which sorts near
// the agent which initiated the request by default.
Near string
// Failover controls what we do if there are no healthy nodes in the
// local datacenter.
Failover QueryDatacenterOptions
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
OnlyPassing bool
// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "!" then
// it is disallowed.
Tags []string
}
// QueryTemplate carries the arguments for creating a templated query.
type QueryTemplate struct {
// Type specifies the type of the query template. Currently only
// "name_prefix_match" is supported. This field is required.
Type string
// Regexp allows specifying a regex pattern to match against the name
// of the query being executed.
Regexp string
}
// PrepatedQueryDefinition defines a complete prepared query.
type PreparedQueryDefinition struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string
// Name is an optional friendly name for the query supplied by the
// user. NOTE - if this feature is used then it will reduce the security
// of any read ACL associated with this query/service since this name
// can be used to locate nodes with supplying any ACL.
Name string
// Session is an optional session to tie this query's lifetime to. If
// this is omitted then the query will not expire.
Session string
// Token is the ACL token used when the query was created, and it is
// used when a query is subsequently executed. This token, or a token
// with management privileges, must be used to change the query later.
Token string
// Service defines a service query (leaving things open for other types
// later).
Service ServiceQuery
// DNS has options that control how the results of this query are
// served over DNS.
DNS QueryDNSOptions
// Template is used to pass through the arguments for creating a
// prepared query with an attached template. If a template is given,
// interpolations are possible in other struct fields.
Template QueryTemplate
}
// PreparedQueryExecuteResponse has the results of executing a query.
type PreparedQueryExecuteResponse struct {
// Service is the service that was queried.
Service string
// Nodes has the nodes that were output by the query.
Nodes []ServiceEntry
// DNS has the options for serving these results over DNS.
DNS QueryDNSOptions
// Datacenter is the datacenter that these results came from.
Datacenter string
// Failovers is a count of how many times we had to query a remote
// datacenter.
Failovers int
}
// PreparedQuery can be used to query the prepared query endpoints.
type PreparedQuery struct {
c *Client
}
// PreparedQuery returns a handle to the prepared query endpoints.
func (c *Client) PreparedQuery() *PreparedQuery {
return &PreparedQuery{c}
}
// Create makes a new prepared query. The ID of the new query is returned.
func (c *PreparedQuery) Create(query *PreparedQueryDefinition, q *WriteOptions) (string, *WriteMeta, error) {
r := c.c.newRequest("POST", "/v1/query")
r.setWriteOptions(q)
r.obj = query
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Update makes updates to an existing prepared query.
func (c *PreparedQuery) Update(query *PreparedQueryDefinition, q *WriteOptions) (*WriteMeta, error) {
return c.c.write("/v1/query/"+query.ID, query, nil, q)
}
// List is used to fetch all the prepared queries (always requires a management
// token).
func (c *PreparedQuery) List(q *QueryOptions) ([]*PreparedQueryDefinition, *QueryMeta, error) {
var out []*PreparedQueryDefinition
qm, err := c.c.query("/v1/query", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Get is used to fetch a specific prepared query.
func (c *PreparedQuery) Get(queryID string, q *QueryOptions) ([]*PreparedQueryDefinition, *QueryMeta, error) {
var out []*PreparedQueryDefinition
qm, err := c.c.query("/v1/query/"+queryID, &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Delete is used to delete a specific prepared query.
func (c *PreparedQuery) Delete(queryID string, q *QueryOptions) (*QueryMeta, error) {
r := c.c.newRequest("DELETE", "/v1/query/"+queryID)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
return qm, nil
}
// Execute is used to execute a specific prepared query. You can execute using
// a query ID or name.
func (c *PreparedQuery) Execute(queryIDOrName string, q *QueryOptions) (*PreparedQueryExecuteResponse, *QueryMeta, error) {
var out *PreparedQueryExecuteResponse
qm, err := c.c.query("/v1/query/"+queryIDOrName+"/execute", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

View file

@ -63,12 +63,16 @@ type Semaphore struct {
// SemaphoreOptions is used to parameterize the Semaphore
type SemaphoreOptions struct {
Prefix string // Must be set and have write permissions
Limit int // Must be set, and be positive
Value []byte // Optional, value to associate with the contender entry
Session string // OPtional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
Prefix string // Must be set and have write permissions
Limit int // Must be set, and be positive
Value []byte // Optional, value to associate with the contender entry
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
MonitorRetries int // Optional, defaults to 0 which means no retries
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
SemaphoreTryOnce bool // Optional, defaults to false which means try forever
}
// semaphoreLock is written under the DefaultSemaphoreKey and
@ -115,6 +119,12 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime
}
if opts.SemaphoreWaitTime == 0 {
opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime
}
s := &Semaphore{
c: c,
opts: opts,
@ -123,7 +133,7 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
}
// Acquire attempts to reserve a slot in the semaphore, blocking until
// success, interrupted via the stopCh or an error is encounted.
// success, interrupted via the stopCh or an error is encountered.
// Providing a non-nil stopCh can be used to abort the attempt.
// On success, a channel is returned that represents our slot.
// This channel could be closed at any time due to session invalidation,
@ -172,9 +182,11 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Setup the query options
qOpts := &QueryOptions{
WaitTime: DefaultSemaphoreWaitTime,
WaitTime: s.opts.SemaphoreWaitTime,
}
start := time.Now()
attempts := 0
WAIT:
// Check if we should quit
select {
@ -183,6 +195,17 @@ WAIT:
default:
}
// Handle the one-shot mode.
if s.opts.SemaphoreTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}
qOpts.WaitTime -= elapsed
}
attempts++
// Read the prefix
pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
if err != nil {
@ -460,8 +483,20 @@ func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
kv := s.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
retries := s.opts.MonitorRetries
RETRY:
pairs, meta, err := kv.List(s.opts.Prefix, opts)
if err != nil {
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry
// counter if service is restored.
if retries > 0 && IsServerError(err) {
time.Sleep(s.opts.MonitorRetryTime)
retries--
opts.WaitIndex = 0
goto RETRY
}
return
}
lockPair := s.findLock(pairs)

View file

@ -1,6 +1,7 @@
package api
import (
"errors"
"fmt"
"time"
)
@ -16,6 +17,8 @@ const (
SessionBehaviorDelete = "delete"
)
var ErrSessionExpired = errors.New("session expired")
// SessionEntry represents a session in consul
type SessionEntry struct {
CreateIndex uint64
@ -102,7 +105,7 @@ func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta,
return out.ID, wm, nil
}
// Destroy invalides a given session
// Destroy invalidates a given session
func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q)
if err != nil {
@ -113,11 +116,26 @@ func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
// Renew renews the TTL on a given session
func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) {
var entries []*SessionEntry
wm, err := s.c.write("/v1/session/renew/"+id, nil, &entries, q)
r := s.c.newRequest("PUT", "/v1/session/renew/"+id)
r.setWriteOptions(q)
rtt, resp, err := s.c.doRequest(r)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
if resp.StatusCode == 404 {
return nil, wm, nil
} else if resp.StatusCode != 200 {
return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
}
var entries []*SessionEntry
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
if len(entries) > 0 {
return entries[0], wm, nil
}
@ -149,9 +167,7 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d
continue
}
if entry == nil {
waitDur = time.Second
lastErr = fmt.Errorf("No SessionEntry returned")
continue
return ErrSessionExpired
}
// Handle the server updating the TTL

47
vendor/github.com/hashicorp/consul/api/snapshot.go generated vendored Normal file
View file

@ -0,0 +1,47 @@
package api
import (
"io"
)
// Snapshot can be used to query the /v1/snapshot endpoint to take snapshots of
// Consul's internal state and restore snapshots for disaster recovery.
type Snapshot struct {
c *Client
}
// Snapshot returns a handle that exposes the snapshot endpoints.
func (c *Client) Snapshot() *Snapshot {
return &Snapshot{c}
}
// Save requests a new snapshot and provides an io.ReadCloser with the snapshot
// data to save. If this doesn't return an error, then it's the responsibility
// of the caller to close it. Only a subset of the QueryOptions are supported:
// Datacenter, AllowStale, and Token.
func (s *Snapshot) Save(q *QueryOptions) (io.ReadCloser, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/snapshot")
r.setQueryOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return nil, nil, err
}
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
return resp.Body, qm, nil
}
// Restore streams in an existing snapshot and attempts to restore it.
func (s *Snapshot) Restore(q *WriteOptions, in io.Reader) error {
r := s.c.newRequest("PUT", "/v1/snapshot")
r.body = in
r.setWriteOptions(q)
_, _, err := requireOK(s.c.doRequest(r))
if err != nil {
return err
}
return nil
}

5
vendor/vendor.json vendored
View file

@ -141,9 +141,10 @@
"revisionTime": "2016-05-29T05:00:41Z"
},
{
"checksumSHA1": "LclVLJYrBi03PBjsVPpgoMbUDQ8=",
"path": "github.com/hashicorp/consul/api",
"revision": "34f98f7bdf2eec7517e3aac44691566963152721",
"revisionTime": "2015-09-08T11:01:49-04:00"
"revision": "daacc4be8bee214e3fc4b32a6dd385f5ef1b4c36",
"revisionTime": "2016-10-28T04:06:46Z"
},
{
"path": "github.com/influxdb/influxdb/client",