Add support for Zookeeper Serversets for service discovery (SD).

It can discover an entire tree of serversets, or just one.
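For context, each serverset member znode holds JSON in the conventional Finagle serverset layout; below is a sketch of the shape this discovery mechanism is expected to parse (the field names follow that convention and are an assumption, not code from this commit):

type serversetEndpoint struct {
	Host string `json:"host"`
	Port int    `json:"port"`
}
type serversetMember struct {
	ServiceEndpoint     serversetEndpoint            `json:"serviceEndpoint"`
	AdditionalEndpoints map[string]serversetEndpoint `json:"additionalEndpoints"`
	Status              string                       `json:"status"`
}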
Brian Brazil 2015-06-09 12:25:30 +01:00
parent 95bd64c770
commit 4d895242f9
22 changed files with 4343 additions and 0 deletions

Godeps/Godeps.json (generated)

@@ -70,6 +70,10 @@
"ImportPath": "github.com/prometheus/procfs",
"Rev": "ee2372b58cee877abe07cde670d04d3b3bac5ee6"
},
{
"ImportPath": "github.com/samuel/go-zookeeper/zk",
"Rev": "c86eba8e7e95efab81f6c0455332e49d39aed12f"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "315fcfb05d4d46d4354b313d146ef688dda272a9"


@@ -0,0 +1,25 @@
Copyright (c) 2013, Samuel Stauffer <samuel@descolada.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


@@ -0,0 +1,166 @@
package zk
import (
"fmt"
"strings"
"testing"
"time"
)
type logWriter struct {
t *testing.T
p string
}
func (lw logWriter) Write(b []byte) (int, error) {
lw.t.Logf("%s%s", lw.p, string(b))
return len(b), nil
}
func TestBasicCluster(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk1, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk1.Close()
zk2, err := ts.Connect(1)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk2.Close()
time.Sleep(time.Second * 5)
if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
if by, _, err := zk2.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}
func TestClientClusterFailover(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, evCh, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
hasSession := make(chan string, 1)
go func() {
for ev := range evCh {
if ev.Type == EventSession && ev.State == StateHasSession {
select {
case hasSession <- ev.Server:
default:
}
}
}
}()
waitSession := func() string {
select {
case srv := <-hasSession:
return srv
case <-time.After(time.Second * 8):
t.Fatal("Failed to connect and get a session")
}
return ""
}
srv := waitSession()
if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
stopped := false
for _, s := range ts.Servers {
if strings.HasSuffix(srv, fmt.Sprintf(":%d", s.Port)) {
s.Srv.Stop()
stopped = true
break
}
}
if !stopped {
t.Fatal("Failed to stop server")
}
waitSession()
if by, _, err := zk.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}
func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
timeout := time.After(30 * time.Second)
CONNECTED:
for {
select {
case ev := <-zk.eventChan:
if ev.State == StateConnected {
break CONNECTED
}
case <-timeout:
zk.Close()
t.Fatal("Timeout")
}
}
zk.Close()
for {
select {
case _, ok := <-zk.eventChan:
if !ok {
return
}
case <-timeout:
t.Fatal("Timeout")
}
}
}
func TestBadSession(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
zk.conn.Close()
time.Sleep(time.Millisecond * 100)
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
}


@@ -0,0 +1,871 @@
// Package zk is a native Go client library for the ZooKeeper orchestration service.
package zk
/*
TODO:
* make sure a ping response comes back in a reasonable time
Possible watcher events:
* Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
*/
import (
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// ErrNoServer indicates that an operation cannot be completed
// because attempts to connect to all servers in the list failed.
var ErrNoServer = errors.New("zk: could not connect to a server")
// ErrInvalidPath indicates that an operation was being attempted on
// an invalid path. (e.g. empty path)
var ErrInvalidPath = errors.New("zk: invalid path")
// DefaultLogger uses the stdlib log package for logging.
var DefaultLogger = defaultLogger{}
const (
bufferSize = 1536 * 1024
eventChanSize = 6
sendChanSize = 16
protectedPrefix = "_c_"
)
type watchType int
const (
watchTypeData = iota
watchTypeExist = iota
watchTypeChild = iota
)
type watchPathType struct {
path string
wType watchType
}
type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
// Logger is an interface that can be implemented to provide custom log output.
type Logger interface {
Printf(string, ...interface{})
}
type Conn struct {
lastZxid int64
sessionID int64
state State // must be 32-bit aligned
xid uint32
timeout int32 // session timeout in milliseconds
passwd []byte
dialer Dialer
servers []string
serverIndex int // remember last server that was tried during connect to round-robin attempts to servers
lastServerIndex int // index of the last server that was successfully connected to and authenticated with
conn net.Conn
eventChan chan Event
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
sendChan chan *request
requests map[int32]*request // Xid -> pending request
requestsLock sync.Mutex
watchers map[watchPathType][]chan Event
watchersLock sync.Mutex
// Debug (used by unit tests)
reconnectDelay time.Duration
logger Logger
}
type request struct {
xid int32
opcode int32
pkt interface{}
recvStruct interface{}
recvChan chan response
// Because sending and receiving happen in separate go routines, there's
// a possible race condition when creating watches from outside the read
// loop. We must ensure that a watcher gets added to the list synchronously
// with the response from the server on any request that creates a watch.
// In order to not hard-code the watch logic for each opcode in the recv
// loop, the caller can use recvFunc to insert some synchronous code
// after a response.
recvFunc func(*request, *responseHeader, error)
}
type response struct {
zxid int64
err error
}
type Event struct {
Type EventType
State State
Path string // For non-session events, the path of the watched node.
Err error
Server string // For connection events
}
// Connect establishes a new connection to a pool of zookeeper servers
// using the default net.Dialer. See ConnectWithDialer for further
// information about session timeout.
func Connect(servers []string, sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
return ConnectWithDialer(servers, sessionTimeout, nil)
}
// ConnectWithDialer establishes a new connection to a pool of zookeeper
// servers. The provided session timeout sets the amount of time for which
// a session is considered valid after losing connection to a server. Within
// the session timeout it's possible to reestablish a connection to a different
server and keep the same session. This means any ephemeral nodes and
watches are maintained.
func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
if len(servers) == 0 {
return nil, nil, errors.New("zk: server list must not be empty")
}
recvTimeout := sessionTimeout * 2 / 3
srvs := make([]string, len(servers))
for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
}
// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
ec := make(chan Event, eventChanSize)
if dialer == nil {
dialer = net.DialTimeout
}
conn := Conn{
dialer: dialer,
servers: srvs,
serverIndex: 0,
lastServerIndex: -1,
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
recvTimeout: recvTimeout,
pingInterval: recvTimeout / 2,
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
timeout: int32(sessionTimeout.Nanoseconds() / 1e6),
logger: DefaultLogger,
// Debug
reconnectDelay: 0,
}
go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return &conn, ec, nil
}
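// Usage sketch (illustrative, not part of the upstream file): connect to a
// server assumed to be at 127.0.0.1:2181 and drain session events.
//
//	conn, events, err := Connect([]string{"127.0.0.1:2181"}, 15*time.Second)
//	if err != nil {
//		// handle error
//	}
//	defer conn.Close()
//	go func() {
//		for ev := range events {
//			fmt.Printf("session event: %v (%v)\n", ev.Type, ev.State)
//		}
//	}()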
func (c *Conn) Close() {
close(c.shouldQuit)
select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-time.After(time.Second):
}
}
// State returns the current state of the connection.
func (c *Conn) State() State {
return State(atomic.LoadInt32((*int32)(&c.state)))
}
// SetLogger sets the logger to be used for printing errors.
// Logger is an interface provided by this package.
func (c *Conn) SetLogger(l Logger) {
c.logger = l
}
func (c *Conn) setState(state State) {
atomic.StoreInt32((*int32)(&c.state), int32(state))
select {
case c.eventChan <- Event{Type: EventSession, State: state, Server: c.servers[c.serverIndex]}:
default:
// panic("zk: event channel full - it must be monitored and never allowed to be full")
}
}
func (c *Conn) connect() error {
c.setState(StateConnecting)
for {
c.serverIndex = (c.serverIndex + 1) % len(c.servers)
if c.serverIndex == c.lastServerIndex {
c.flushUnsentRequests(ErrNoServer)
select {
case <-time.After(time.Second):
// pass
case <-c.shouldQuit:
c.setState(StateDisconnected)
c.flushUnsentRequests(ErrClosing)
return ErrClosing
}
} else if c.lastServerIndex < 0 {
// lastServerIndex defaults to -1 to avoid a delay on the initial connect
c.lastServerIndex = 0
}
zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout)
if err == nil {
c.conn = zkConn
c.setState(StateConnected)
return nil
}
c.logger.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err)
}
}
func (c *Conn) loop() {
for {
if err := c.connect(); err != nil {
// c.Close() was called
return
}
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.conn.Close()
case err == nil:
c.lastServerIndex = c.serverIndex
closeChan := make(chan struct{}) // channel to tell the send loop to stop
var wg sync.WaitGroup
wg.Add(1)
go func() {
c.sendLoop(c.conn, closeChan)
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
}()
wg.Add(1)
go func() {
err = c.recvLoop(c.conn)
if err == nil {
panic("zk: recvLoop should never return nil error")
}
close(closeChan) // tell send loop to exit
wg.Done()
}()
wg.Wait()
}
c.setState(StateDisconnected)
// Yeesh
if err != io.EOF && err != ErrSessionExpired && !strings.Contains(err.Error(), "use of closed network connection") {
c.logger.Printf(err.Error())
}
select {
case <-c.shouldQuit:
c.flushRequests(ErrClosing)
return
default:
}
if err != ErrSessionExpired {
err = ErrConnectionClosed
}
c.flushRequests(err)
if c.reconnectDelay > 0 {
select {
case <-c.shouldQuit:
return
case <-time.After(c.reconnectDelay):
}
}
}
}
func (c *Conn) flushUnsentRequests(err error) {
for {
select {
default:
return
case req := <-c.sendChan:
req.recvChan <- response{-1, err}
}
}
}
// Send error to all pending requests and clear request map
func (c *Conn) flushRequests(err error) {
c.requestsLock.Lock()
for _, req := range c.requests {
req.recvChan <- response{-1, err}
}
c.requests = make(map[int32]*request)
c.requestsLock.Unlock()
}
// Send error to all watchers and clear watchers map
func (c *Conn) invalidateWatches(err error) {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
if len(c.watchers) > 0 {
for pathType, watchers := range c.watchers {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
for _, ch := range watchers {
ch <- ev
close(ch)
}
}
c.watchers = make(map[watchPathType][]chan Event)
}
}
func (c *Conn) sendSetWatches() {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
if len(c.watchers) == 0 {
return
}
req := &setWatchesRequest{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
}
n := 0
for pathType, watchers := range c.watchers {
if len(watchers) == 0 {
continue
}
switch pathType.wType {
case watchTypeData:
req.DataWatches = append(req.DataWatches, pathType.path)
case watchTypeExist:
req.ExistWatches = append(req.ExistWatches, pathType.path)
case watchTypeChild:
req.ChildWatches = append(req.ChildWatches, pathType.path)
}
n++
}
if n == 0 {
return
}
go func() {
res := &setWatchesResponse{}
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
c.logger.Printf("Failed to set previous watches: %s", err.Error())
}
}()
}
func (c *Conn) authenticate() error {
buf := make([]byte, 256)
// connect request
n, err := encodePacket(buf[4:], &connectRequest{
ProtocolVersion: protocolVersion,
LastZxidSeen: c.lastZxid,
TimeOut: c.timeout,
SessionID: c.sessionID,
Passwd: c.passwd,
})
if err != nil {
return err
}
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = c.conn.Write(buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
return err
}
c.sendSetWatches()
// connect response
// package length
c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = io.ReadFull(c.conn, buf[:4])
c.conn.SetReadDeadline(time.Time{})
if err != nil {
// Sometimes zookeeper just drops the connection on invalid session data.
// We prefer to drop the session and start from scratch when that happens
// instead of dropping into a loop of connect/disconnect attempts.
c.sessionID = 0
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
buf = make([]byte, blen)
}
_, err = io.ReadFull(c.conn, buf[:blen])
if err != nil {
return err
}
r := connectResponse{}
_, err = decodePacket(buf[:blen], &r)
if err != nil {
return err
}
if r.SessionID == 0 {
c.sessionID = 0
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
if c.sessionID != r.SessionID {
atomic.StoreUint32(&c.xid, 0)
}
c.timeout = r.TimeOut
c.sessionID = r.SessionID
c.passwd = r.Passwd
c.setState(StateHasSession)
return nil
}
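// Note (editorial): every packet on this connection, including the connect
// handshake above, is framed as a 4-byte big-endian length prefix followed
// by the encoded body; sendLoop and recvLoop below use the same framing.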
func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
pingTicker := time.NewTicker(c.pingInterval)
defer pingTicker.Stop()
buf := make([]byte, bufferSize)
for {
select {
case req := <-c.sendChan:
header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(buf[4:], header)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n2, err := encodePacket(buf[4+n:], req.pkt)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n += n2
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.requestsLock.Lock()
select {
case <-closeChan:
req.recvChan <- response{-1, ErrConnectionClosed}
c.requestsLock.Unlock()
return ErrConnectionClosed
default:
}
c.requests[req.xid] = req
c.requestsLock.Unlock()
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
conn.Close()
return err
}
case <-pingTicker.C:
n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
if err != nil {
panic("zk: opPing should never fail to serialize")
}
binary.BigEndian.PutUint32(buf[:4], uint32(n))
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
if err != nil {
conn.Close()
return err
}
case <-closeChan:
return nil
}
}
}
func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, bufferSize)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
return err
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
buf = make([]byte, blen)
}
_, err = io.ReadFull(conn, buf[:blen])
conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
res := responseHeader{}
_, err = decodePacket(buf[:16], &res)
if err != nil {
return err
}
if res.Xid == -1 {
res := &watcherEvent{}
_, err := decodePacket(buf[16:16+blen], res)
if err != nil {
return err
}
ev := Event{
Type: res.Type,
State: res.State,
Path: res.Path,
Err: nil,
}
select {
case c.eventChan <- ev:
default:
}
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
wTypes = append(wTypes, watchTypeExist)
case EventNodeDeleted, EventNodeDataChanged:
wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
case EventNodeChildrenChanged:
wTypes = append(wTypes, watchTypeChild)
}
c.watchersLock.Lock()
for _, t := range wTypes {
wpt := watchPathType{res.Path, t}
if watchers := c.watchers[wpt]; len(watchers) > 0 {
for _, ch := range watchers {
ch <- ev
close(ch)
}
delete(c.watchers, wpt)
}
}
c.watchersLock.Unlock()
} else if res.Xid == -2 {
// Ping response. Ignore.
} else if res.Xid < 0 {
c.logger.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
} else {
if res.Zxid > 0 {
c.lastZxid = res.Zxid
}
c.requestsLock.Lock()
req, ok := c.requests[res.Xid]
if ok {
delete(c.requests, res.Xid)
}
c.requestsLock.Unlock()
if !ok {
c.logger.Printf("Response for unknown request with xid %d", res.Xid)
} else {
if res.Err != 0 {
err = res.Err.toError()
} else {
_, err = decodePacket(buf[16:16+blen], req.recvStruct)
}
if req.recvFunc != nil {
req.recvFunc(req, &res, err)
}
req.recvChan <- response{res.Zxid, err}
if req.opcode == opClose {
return io.EOF
}
}
}
}
}
func (c *Conn) nextXid() int32 {
return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff)
}
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
ch := make(chan Event, 1)
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
return ch
}
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
c.sendChan <- rq
return rq.recvChan
}
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
r := <-c.queueRequest(opcode, req, res, recvFunc)
return r.zxid, r.err
}
func (c *Conn) AddAuth(scheme string, auth []byte) error {
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
return err
}
func (c *Conn) Children(path string) ([]string, *Stat, error) {
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
return res.Children, &res.Stat, err
}
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeChild)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Children, &res.Stat, ech, err
}
func (c *Conn) Get(path string) ([]byte, *Stat, error) {
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
return res.Data, &res.Stat, err
}
// GetW returns the contents of a znode and sets a watch
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeData)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Data, &res.Stat, ech, err
}
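// Watch usage sketch (illustrative): a watch channel fires exactly once and
// is then closed, so callers re-arm it after each event. "/some/znode" is a
// hypothetical path.
//
//	data, _, ch, err := c.GetW("/some/znode")
//	if err == nil {
//		ev := <-ch // next data change, deletion, or EventNotWatching
//		_, _ = data, ev
//	}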
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
if path == "" {
return nil, ErrInvalidPath
}
res := &setDataResponse{}
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
res := &createResponse{}
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
return res.Path, err
}
// CreateProtectedEphemeralSequential fixes a race condition if the server crashes
// after it creates the node. On reconnect the session may still be valid so the
// ephemeral node still exists. Therefore, on reconnect we need to check if a node
// with a GUID generated on create exists.
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
var guid [16]byte
_, err := io.ReadFull(rand.Reader, guid[:16])
if err != nil {
return "", err
}
guidStr := fmt.Sprintf("%x", guid)
parts := strings.Split(path, "/")
parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1])
rootPath := strings.Join(parts[:len(parts)-1], "/")
protectedPath := strings.Join(parts, "/")
var newPath string
for i := 0; i < 3; i++ {
newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
switch err {
case ErrSessionExpired:
// No need to search for the node since it can't exist. Just try again.
case ErrConnectionClosed:
children, _, err := c.Children(rootPath)
if err != nil {
return "", err
}
for _, p := range children {
parts := strings.Split(p, "/")
if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) {
if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr {
return rootPath + "/" + p, nil
}
}
}
case nil:
return newPath, nil
default:
return "", err
}
}
return "", err
}
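// Illustrative call (paths hypothetical): the final path component is
// rewritten to "_c_<guid>-<name>" and the server appends a sequence number.
//
//	name, err := c.CreateProtectedEphemeralSequential("/workers/worker", nil, WorldACL(PermAll))
//	// name is then e.g. "/workers/_c_<guid>-worker0000000007"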
func (c *Conn) Delete(path string, version int32) error {
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
return err
}
func (c *Conn) Exists(path string) (bool, *Stat, error) {
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
exists := true
if err == ErrNoNode {
exists = false
err = nil
}
return exists, &res.Stat, err
}
func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeData)
} else if err == ErrNoNode {
ech = c.addWatcher(path, watchTypeExist)
}
})
exists := true
if err == ErrNoNode {
exists = false
err = nil
}
if err != nil {
return false, nil, nil, err
}
return exists, &res.Stat, ech, err
}
func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
res := &getAclResponse{}
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
return res.Acl, &res.Stat, err
}
func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
res := &setAclResponse{}
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Sync(path string) (string, error) {
res := &syncResponse{}
_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
return res.Path, err
}
type MultiResponse struct {
Stat *Stat
String string
}
// Multi executes multiple ZooKeeper operations or none of them. The provided
// ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or
// *CheckVersionRequest.
func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
req := &multiRequest{
Ops: make([]multiRequestOp, 0, len(ops)),
DoneHeader: multiHeader{Type: -1, Done: true, Err: -1},
}
for _, op := range ops {
var opCode int32
switch op.(type) {
case *CreateRequest:
opCode = opCreate
case *SetDataRequest:
opCode = opSetData
case *DeleteRequest:
opCode = opDelete
case *CheckVersionRequest:
opCode = opCheck
default:
return nil, fmt.Errorf("unknown operation type %T", op)
}
req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op})
}
res := &multiResponse{}
_, err := c.request(opMulti, req, res, nil)
mr := make([]MultiResponse, len(res.Ops))
for i, op := range res.Ops {
mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
}
return mr, err
}
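// Multi usage sketch (illustrative; paths hypothetical): both operations
// commit atomically or neither does.
//
//	ops := []interface{}{
//		&CreateRequest{Path: "/a", Data: []byte("x"), Acl: WorldACL(PermAll)},
//		&SetDataRequest{Path: "/b", Data: []byte("y"), Version: -1},
//	}
//	res, err := c.Multi(ops...)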


@@ -0,0 +1,242 @@
package zk
import (
"errors"
)
const (
protocolVersion = 0
DefaultPort = 2181
)
const (
opNotify = 0
opCreate = 1
opDelete = 2
opExists = 3
opGetData = 4
opSetData = 5
opGetAcl = 6
opSetAcl = 7
opGetChildren = 8
opSync = 9
opPing = 11
opGetChildren2 = 12
opCheck = 13
opMulti = 14
opClose = -11
opSetAuth = 100
opSetWatches = 101
// Not in protocol, used internally
opWatcherEvent = -2
)
const (
EventNodeCreated = EventType(1)
EventNodeDeleted = EventType(2)
EventNodeDataChanged = EventType(3)
EventNodeChildrenChanged = EventType(4)
EventSession = EventType(-1)
EventNotWatching = EventType(-2)
)
var (
eventNames = map[EventType]string{
EventNodeCreated: "EventNodeCreated",
EventNodeDeleted: "EventNodeDeleted",
EventNodeDataChanged: "EventNodeDataChanged",
EventNodeChildrenChanged: "EventNodeChildrenChanged",
EventSession: "EventSession",
EventNotWatching: "EventNotWatching",
}
)
const (
StateUnknown = State(-1)
StateDisconnected = State(0)
StateConnecting = State(1)
StateSyncConnected = State(3)
StateAuthFailed = State(4)
StateConnectedReadOnly = State(5)
StateSaslAuthenticated = State(6)
StateExpired = State(-112)
// StateAuthFailed = State(-113)
StateConnected = State(100)
StateHasSession = State(101)
)
const (
FlagEphemeral = 1
FlagSequence = 2
)
var (
stateNames = map[State]string{
StateUnknown: "StateUnknown",
StateDisconnected: "StateDisconnected",
StateSyncConnected: "StateSyncConnected",
StateConnectedReadOnly: "StateConnectedReadOnly",
StateSaslAuthenticated: "StateSaslAuthenticated",
StateExpired: "StateExpired",
StateAuthFailed: "StateAuthFailed",
StateConnecting: "StateConnecting",
StateConnected: "StateConnected",
StateHasSession: "StateHasSession",
}
)
type State int32
func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "Unknown"
}
type ErrCode int32
var (
ErrConnectionClosed = errors.New("zk: connection closed")
ErrUnknown = errors.New("zk: unknown error")
ErrAPIError = errors.New("zk: api error")
ErrNoNode = errors.New("zk: node does not exist")
ErrNoAuth = errors.New("zk: not authenticated")
ErrBadVersion = errors.New("zk: version conflict")
ErrNoChildrenForEphemerals = errors.New("zk: ephemeral nodes may not have children")
ErrNodeExists = errors.New("zk: node already exists")
ErrNotEmpty = errors.New("zk: node has children")
ErrSessionExpired = errors.New("zk: session has been expired by the server")
ErrInvalidACL = errors.New("zk: invalid ACL specified")
ErrAuthFailed = errors.New("zk: client authentication failed")
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responses to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")
errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
errNoNode: ErrNoNode,
errNoAuth: ErrNoAuth,
errBadVersion: ErrBadVersion,
errNoChildrenForEphemerals: ErrNoChildrenForEphemerals,
errNodeExists: ErrNodeExists,
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
}
)
func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
return ErrUnknown
}
const (
errOk = 0
// System and server-side errors
errSystemError = -1
errRuntimeInconsistency = -2
errDataInconsistency = -3
errConnectionLoss = -4
errMarshallingError = -5
errUnimplemented = -6
errOperationTimeout = -7
errBadArguments = -8
errInvalidState = -9
// API errors
errAPIError = ErrCode(-100)
errNoNode = ErrCode(-101) // *
errNoAuth = ErrCode(-102)
errBadVersion = ErrCode(-103) // *
errNoChildrenForEphemerals = ErrCode(-108)
errNodeExists = ErrCode(-110) // *
errNotEmpty = ErrCode(-111)
errSessionExpired = ErrCode(-112)
errInvalidCallback = ErrCode(-113)
errInvalidAcl = ErrCode(-114)
errAuthFailed = ErrCode(-115)
errClosing = ErrCode(-116)
errNothing = ErrCode(-117)
errSessionMoved = ErrCode(-118)
)
// Constants for ACL permissions
const (
PermRead = 1 << iota
PermWrite
PermCreate
PermDelete
PermAdmin
PermAll = 0x1f
)
var (
emptyPassword = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
opNames = map[int32]string{
opNotify: "notify",
opCreate: "create",
opDelete: "delete",
opExists: "exists",
opGetData: "getData",
opSetData: "setData",
opGetAcl: "getACL",
opSetAcl: "setACL",
opGetChildren: "getChildren",
opSync: "sync",
opPing: "ping",
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
opWatcherEvent: "watcherEvent",
}
)
type EventType int32
func (t EventType) String() string {
if name := eventNames[t]; name != "" {
return name
}
return "Unknown"
}
// Mode is used to build custom server modes (leader|follower|standalone).
type Mode uint8
func (m Mode) String() string {
if name := modeNames[m]; name != "" {
return name
}
return "unknown"
}
const (
ModeUnknown Mode = iota
ModeLeader Mode = iota
ModeFollower Mode = iota
ModeStandalone Mode = iota
)
var (
modeNames = map[Mode]string{
ModeLeader: "leader",
ModeFollower: "follower",
ModeStandalone: "standalone",
}
)


@@ -0,0 +1,24 @@
package zk
import (
"fmt"
"testing"
)
func TestModeString(t *testing.T) {
if fmt.Sprintf("%v", ModeUnknown) != "unknown" {
t.Errorf("unknown value should be 'unknown'")
}
if fmt.Sprintf("%v", ModeLeader) != "leader" {
t.Errorf("leader value should be 'leader'")
}
if fmt.Sprintf("%v", ModeFollower) != "follower" {
t.Errorf("follower value should be 'follower'")
}
if fmt.Sprintf("%v", ModeStandalone) != "standalone" {
t.Errorf("standlone value should be 'standalone'")
}
}


@@ -0,0 +1,288 @@
package zk
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"math/big"
"net"
"regexp"
"strconv"
"time"
)
// FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output
from the zookeeper instances and parses the output. A slice of *ServerStats structs is returned
// as well as a boolean value to indicate whether this function processed successfully.
//
// If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil,
// then the error happened before we started to obtain 'srvr' values. Otherwise, one of the
// servers had an issue and the "Error" value in the struct should be inspected to determine
// which server had the issue.
func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) {
// different parts of the regular expression that are required to parse the srvr output
var (
zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)`
zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
)
// build the regex from the pieces above
re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))
if err != nil {
return nil, false
}
imOk := true
servers = FormatServers(servers)
ss := make([]*ServerStats, len(servers))
for i := range ss {
response, err := fourLetterWord(servers[i], "srvr", timeout)
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
matches := re.FindAllStringSubmatch(string(response), -1)
if matches == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
match := matches[0][1:]
// determine current server
var srvrMode Mode
switch match[10] {
case "leader":
srvrMode = ModeLeader
case "follower":
srvrMode = ModeFollower
case "standalone":
srvrMode = ModeStandalone
default:
srvrMode = ModeUnknown
}
buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
parsedInt, err := strconv.ParseInt(match[9], 0, 64)
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
// the ZxID value is an int64 with two int32s packed inside
// the high int32 is the epoch (i.e., number of leader elections)
// the low int32 is the counter
epoch := int32(parsedInt >> 32)
counter := int32(parsedInt & 0xFFFFFFFF)
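// e.g. the zxid 0x110a7a8f37 from the accompanying test fixture unpacks
// to epoch 0x11 = 17 and counter 0x0a7a8f37 = 175804215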
// within the regex above, these values must be numerical
// so we can avoid useless checking of the error return value
minLatency, _ := strconv.ParseInt(match[2], 0, 64)
avgLatency, _ := strconv.ParseInt(match[3], 0, 64)
maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
recv, _ := strconv.ParseInt(match[5], 0, 64)
sent, _ := strconv.ParseInt(match[6], 0, 64)
cons, _ := strconv.ParseInt(match[7], 0, 64)
outs, _ := strconv.ParseInt(match[8], 0, 64)
ncnt, _ := strconv.ParseInt(match[11], 0, 64)
ss[i] = &ServerStats{
Sent: sent,
Received: recv,
NodeCount: ncnt,
MinLatency: minLatency,
AvgLatency: avgLatency,
MaxLatency: maxLatency,
Connections: cons,
Outstanding: outs,
Epoch: epoch,
Counter: counter,
BuildTime: buildTime,
Mode: srvrMode,
Version: match[0],
}
}
return ss, imOk
}
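// Usage sketch (illustrative): poll one server and inspect per-server errors.
//
//	stats, ok := FLWSrvr([]string{"127.0.0.1:2181"}, 5*time.Second)
//	if !ok {
//		for _, s := range stats {
//			if s.Error != nil {
//				// inspect s.Error
//			}
//		}
//	}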
// FLWRuok is a FourLetterWord helper function. In particular, this function
// pulls the ruok output from each server.
func FLWRuok(servers []string, timeout time.Duration) []bool {
servers = FormatServers(servers)
oks := make([]bool, len(servers))
for i := range oks {
response, err := fourLetterWord(servers[i], "ruok", timeout)
if err != nil {
continue
}
if len(response) >= 4 && bytes.Equal(response[:4], []byte("imok")) {
oks[i] = true
}
}
return oks
}
// FLWCons is a FourLetterWord helper function. In particular, this function
// pulls the cons output from each server.
//
// As with FLWSrvr, the boolean value indicates whether one of the requests had
// an issue. The Clients struct has an Error value that can be checked.
func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) {
var (
zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
)
re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))
if err != nil {
return nil, false
}
servers = FormatServers(servers)
sc := make([]*ServerClients, len(servers))
imOk := true
for i := range sc {
response, err := fourLetterWord(servers[i], "cons", timeout)
if err != nil {
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}
scan := bufio.NewScanner(bytes.NewReader(response))
var clients []*ServerClient
for scan.Scan() {
line := scan.Bytes()
if len(line) == 0 {
continue
}
m := re.FindAllStringSubmatch(string(line), -1)
if m == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}
match := m[0][1:]
queued, _ := strconv.ParseInt(match[1], 0, 64)
recvd, _ := strconv.ParseInt(match[2], 0, 64)
sent, _ := strconv.ParseInt(match[3], 0, 64)
sid, _ := strconv.ParseInt(match[4], 0, 64)
est, _ := strconv.ParseInt(match[6], 0, 64)
timeout, _ := strconv.ParseInt(match[7], 0, 32)
lresp, _ := strconv.ParseInt(match[10], 0, 64)
llat, _ := strconv.ParseInt(match[11], 0, 32)
minlat, _ := strconv.ParseInt(match[12], 0, 32)
avglat, _ := strconv.ParseInt(match[13], 0, 32)
maxlat, _ := strconv.ParseInt(match[14], 0, 32)
// zookeeper returns a value, '0xffffffffffffffff', as the
// Lzxid for PING requests in the 'cons' output.
// unfortunately, in Go that is an invalid int64 and is not represented
// as -1.
// However, converting the string value to a big.Int and then back to
// an int64 properly sets the value to -1.
lzxid, ok := new(big.Int).SetString(match[9], 0)
var errVal error
if !ok {
errVal = fmt.Errorf("failed to convert lzxid value to big.Int")
imOk = false
}
lcxid, ok := new(big.Int).SetString(match[8], 0)
if !ok && errVal == nil {
errVal = fmt.Errorf("failed to convert lcxid value to big.Int")
imOk = false
}
clients = append(clients, &ServerClient{
Queued: queued,
Received: recvd,
Sent: sent,
SessionID: sid,
Lcxid: lcxid.Int64(),
Lzxid: lzxid.Int64(),
Timeout: int32(timeout),
LastLatency: int32(llat),
MinLatency: int32(minlat),
AvgLatency: int32(avglat),
MaxLatency: int32(maxlat),
Established: time.Unix(est, 0),
LastResponse: time.Unix(lresp, 0),
Addr: match[0],
LastOperation: match[5],
Error: errVal,
})
}
sc[i] = &ServerClients{Clients: clients}
}
return sc, imOk
}
func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
conn, err := net.DialTimeout("tcp", server, timeout)
if err != nil {
return nil, err
}
// the zookeeper server should automatically close this socket
// once the command has been processed, but better safe than sorry
defer conn.Close()
conn.SetWriteDeadline(time.Now().Add(timeout))
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}
conn.SetReadDeadline(time.Now().Add(timeout))
resp, err := ioutil.ReadAll(conn)
if err != nil {
return nil, err
}
return resp, nil
}


@@ -0,0 +1,367 @@
package zk
import (
"net"
"testing"
"time"
)
var (
zkSrvrOut = `Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/1/10
Received: 4207
Sent: 4220
Connections: 81
Outstanding: 1
Zxid: 0x110a7a8f37
Mode: leader
Node count: 306
`
zkConsOut = ` /10.42.45.231:45361[1](queued=0,recved=9435,sent=9457,sid=0x94c2989e04716b5,lop=PING,est=1427238717217,to=20001,lcxid=0x55120915,lzxid=0xffffffffffffffff,lresp=1427259255908,llat=0,minlat=0,avglat=1,maxlat=17)
/10.55.33.98:34342[1](queued=0,recved=9338,sent=9350,sid=0x94c2989e0471731,lop=PING,est=1427238849319,to=20001,lcxid=0x55120944,lzxid=0xffffffffffffffff,lresp=1427259252294,llat=0,minlat=0,avglat=1,maxlat=18)
/10.44.145.114:46556[1](queued=0,recved=109253,sent=109617,sid=0x94c2989e0471709,lop=DELE,est=1427238791305,to=20001,lcxid=0x55139618,lzxid=0x110a7b187d,lresp=1427259257423,llat=2,minlat=0,avglat=1,maxlat=23)
`
)
func TestFLWRuok(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:2181")
if err != nil {
t.Fatalf(err.Error())
}
go tcpServer(l, "")
var oks []bool
var ok bool
oks = FLWRuok([]string{"127.0.0.1"}, time.Second*10)
// close the connection, and pause shortly
// to cheat around a race condition
l.Close()
time.Sleep(time.Millisecond * 1)
if len(oks) == 0 {
t.Errorf("no values returned")
}
ok = oks[0]
if !ok {
t.Errorf("instance should be marked as OK")
}
//
// Confirm that it also returns false for dead instances
//
l, err = net.Listen("tcp", "127.0.0.1:2181")
if err != nil {
t.Fatalf(err.Error())
}
defer l.Close()
go tcpServer(l, "dead")
oks = FLWRuok([]string{"127.0.0.1"}, time.Second*10)
if len(oks) == 0 {
t.Errorf("no values returned")
}
ok = oks[0]
if ok {
t.Errorf("instance should be marked as not OK")
}
}
func TestFLWSrvr(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:2181")
if err != nil {
t.Fatalf(err.Error())
}
defer l.Close()
go tcpServer(l, "")
var statsSlice []*ServerStats
var stats *ServerStats
var ok bool
statsSlice, ok = FLWSrvr([]string{"127.0.0.1:2181"}, time.Second*10)
if !ok {
t.Errorf("failure indicated on 'srvr' parsing")
}
if len(statsSlice) == 0 {
t.Errorf("no *ServerStats instances returned")
}
stats = statsSlice[0]
if stats.Error != nil {
t.Fatalf("error seen in stats: %v", err.Error())
}
if stats.Sent != 4220 {
t.Errorf("Sent != 4220")
}
if stats.Received != 4207 {
t.Errorf("Received != 4207")
}
if stats.NodeCount != 306 {
t.Errorf("NodeCount != 306")
}
if stats.MinLatency != 0 {
t.Errorf("MinLatency != 0")
}
if stats.AvgLatency != 1 {
t.Errorf("AvgLatency != 1")
}
if stats.MaxLatency != 10 {
t.Errorf("MaxLatency != 10")
}
if stats.Connections != 81 {
t.Errorf("Connection != 81")
}
if stats.Outstanding != 1 {
t.Errorf("Outstanding != 1")
}
if stats.Epoch != 17 {
t.Errorf("Epoch != 17")
}
if stats.Counter != 175804215 {
t.Errorf("Counter != 175804215")
}
if stats.Mode != ModeLeader {
t.Errorf("Mode != ModeLeader")
}
if stats.Version != "3.4.6-1569965" {
t.Errorf("Version expected: 3.4.6-1569965")
}
buildTime, err := time.Parse("01/02/2006 15:04 MST", "02/20/2014 09:09 GMT")
if err != nil {
t.Fatalf("failed to parse expected build time: %v", err)
}
if !stats.BuildTime.Equal(buildTime) {
t.Errorf("BuildTime mismatch: got %v, expected %v", stats.BuildTime, buildTime)
}
}
func TestFLWCons(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:2181")
if err != nil {
t.Fatalf(err.Error())
}
defer l.Close()
go tcpServer(l, "")
var clients []*ServerClients
var ok bool
clients, ok = FLWCons([]string{"127.0.0.1"}, time.Second*10)
if !ok {
t.Errorf("failure indicated on 'cons' parsing")
}
if len(clients) == 0 {
t.Errorf("no *ServerClients instances returned")
}
results := []*ServerClient{
&ServerClient{
Queued: 0,
Received: 9435,
Sent: 9457,
SessionID: 669956116721374901,
LastOperation: "PING",
Established: time.Unix(1427238717217, 0),
Timeout: 20001,
Lcxid: 1427245333,
Lzxid: -1,
LastResponse: time.Unix(1427259255908, 0),
LastLatency: 0,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 17,
Addr: "10.42.45.231:45361",
},
&ServerClient{
Queued: 0,
Received: 9338,
Sent: 9350,
SessionID: 669956116721375025,
LastOperation: "PING",
Established: time.Unix(1427238849319, 0),
Timeout: 20001,
Lcxid: 1427245380,
Lzxid: -1,
LastResponse: time.Unix(1427259252294, 0),
LastLatency: 0,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 18,
Addr: "10.55.33.98:34342",
},
&ServerClient{
Queued: 0,
Received: 109253,
Sent: 109617,
SessionID: 669956116721374985,
LastOperation: "DELE",
Established: time.Unix(1427238791305, 0),
Timeout: 20001,
Lcxid: 1427346968,
Lzxid: 73190283389,
LastResponse: time.Unix(1427259257423, 0),
LastLatency: 2,
MinLatency: 0,
AvgLatency: 1,
MaxLatency: 23,
Addr: "10.44.145.114:46556",
},
}
for _, z := range clients {
if z.Error != nil {
t.Errorf("error seen: %v", err.Error())
}
for i, v := range z.Clients {
c := results[i]
if v.Error != nil {
t.Errorf("client error seen: %v", err.Error())
}
if v.Queued != c.Queued {
t.Errorf("Queued value mismatch (%d/%d)", v.Queued, c.Queued)
}
if v.Received != c.Received {
t.Errorf("Received value mismatch (%d/%d)", v.Received, c.Received)
}
if v.Sent != c.Sent {
t.Errorf("Sent value mismatch (%d/%d)", v.Sent, c.Sent)
}
if v.SessionID != c.SessionID {
t.Errorf("SessionID value mismatch (%d/%d)", v.SessionID, c.SessionID)
}
if v.LastOperation != c.LastOperation {
t.Errorf("LastOperation value mismatch ('%v'/'%v')", v.LastOperation, c.LastOperation)
}
if v.Timeout != c.Timeout {
t.Errorf("Timeout value mismatch (%d/%d)", v.Timeout, c.Timeout)
}
if v.Lcxid != c.Lcxid {
t.Errorf("Lcxid value mismatch (%d/%d)", v.Lcxid, c.Lcxid)
}
if v.Lzxid != c.Lzxid {
t.Errorf("Lzxid value mismatch (%d/%d)", v.Lzxid, c.Lzxid)
}
if v.LastLatency != c.LastLatency {
t.Errorf("LastLatency value mismatch (%d/%d)", v.LastLatency, c.LastLatency)
}
if v.MinLatency != c.MinLatency {
t.Errorf("MinLatency value mismatch (%d/%d)", v.MinLatency, c.MinLatency)
}
if v.AvgLatency != c.AvgLatency {
t.Errorf("AvgLatency value mismatch (%d/%d)", v.AvgLatency, c.AvgLatency)
}
if v.MaxLatency != c.MaxLatency {
t.Errorf("MaxLatency value mismatch (%d/%d)", v.MaxLatency, c.MaxLatency)
}
if v.Addr != c.Addr {
t.Errorf("Addr value mismatch ('%v'/'%v')", v.Addr, c.Addr)
}
if !c.Established.Equal(v.Established) {
t.Errorf("Established value mismatch (%v/%v)", c.Established, v.Established)
}
if !c.LastResponse.Equal(v.LastResponse) {
t.Errorf("Established value mismatch (%v/%v)", c.LastResponse, v.LastResponse)
}
}
}
}
func tcpServer(listener net.Listener, thing string) {
for {
conn, err := listener.Accept()
if err != nil {
return
}
go connHandler(conn, thing)
}
}
func connHandler(conn net.Conn, thing string) {
defer conn.Close()
data := make([]byte, 4)
_, err := conn.Read(data)
if err != nil {
return
}
switch string(data) {
case "ruok":
switch thing {
case "dead":
return
default:
conn.Write([]byte("imok"))
}
case "srvr":
switch thing {
case "dead":
return
default:
conn.Write([]byte(zkSrvrOut))
}
case "cons":
switch thing {
case "dead":
return
default:
conn.Write([]byte(zkConsOut))
}
default:
conn.Write([]byte("This ZooKeeper instance is not currently serving requests."))
}
}


@@ -0,0 +1,142 @@
package zk
import (
"errors"
"fmt"
"strconv"
"strings"
)
var (
// ErrDeadlock is returned by Lock when trying to lock twice without unlocking first
ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
// ErrNotLocked is returned by Unlock when trying to release a lock that has not first been acquired.
ErrNotLocked = errors.New("zk: not locked")
)
// Lock is a mutual exclusion lock.
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
// NewLock creates a new lock instance using the provided connection, path, and acl.
// The path must be a node that is only used by this lock. A lock instance starts
// unlocked until Lock() is called.
func NewLock(c *Conn, path string, acl []ACL) *Lock {
return &Lock{
c: c,
path: path,
acl: acl,
}
}
func parseSeq(path string) (int, error) {
parts := strings.Split(path, "-")
return strconv.Atoi(parts[len(parts)-1])
}
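// e.g. a child named "lock-0000000042" (or a protected
// "_c_<guid>-lock-0000000042") parses to sequence number 42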
// Lock attempts to acquire the lock. It will wait to return until the lock
// is acquired or an error occurs. If this instance already has the lock
// then ErrDeadlock is returned.
func (l *Lock) Lock() error {
if l.lockPath != "" {
return ErrDeadlock
}
prefix := fmt.Sprintf("%s/lock-", l.path)
path := ""
var err error
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
for _, p := range parts[1:] {
pth += "/" + p
_, err := l.c.Create(pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return err
}
}
} else if err == nil {
break
} else {
return err
}
}
if err != nil {
return err
}
seq, err := parseSeq(path)
if err != nil {
return err
}
for {
children, _, err := l.c.Children(l.path)
if err != nil {
return err
}
lowestSeq := seq
prevSeq := 0
prevSeqPath := ""
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return err
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
prevSeqPath = p
}
}
if seq == lowestSeq {
// Acquired the lock
break
}
// Wait on the node next in line for the lock
_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return err
} else if err != nil && err == ErrNoNode {
// try again
continue
}
ev := <-ch
if ev.Err != nil {
return ev.Err
}
}
l.seq = seq
l.lockPath = path
return nil
}
// Unlock releases an acquired lock. If the lock is not currently acquired by
// this Lock instance then ErrNotLocked is returned.
func (l *Lock) Unlock() error {
if l.lockPath == "" {
return ErrNotLocked
}
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
l.lockPath = ""
l.seq = 0
return nil
}
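// Usage sketch (illustrative; "/my/lock" is hypothetical):
//
//	l := NewLock(conn, "/my/lock", WorldACL(PermAll))
//	if err := l.Lock(); err != nil {
//		// handle error
//	}
//	defer l.Unlock()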


@@ -0,0 +1,94 @@
package zk
import (
"testing"
"time"
)
func TestLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acls := WorldACL(PermAll)
l := NewLock(zk, "/test", acls)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
val := make(chan int, 3)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
l2 := NewLock(zk, "/test", acls)
go func() {
if err := l2.Lock(); err != nil {
t.Fatal(err)
}
val <- 2
if err := l2.Unlock(); err != nil {
t.Fatal(err)
}
val <- 3
}()
time.Sleep(time.Millisecond * 100)
val <- 1
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
if x := <-val; x != 1 {
t.Fatalf("Expected 1 instead of %d", x)
}
if x := <-val; x != 2 {
t.Fatalf("Expected 2 instead of %d", x)
}
if x := <-val; x != 3 {
t.Fatalf("Expected 3 instead of %d", x)
}
}
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
func TestMultiLevelLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acls := WorldACL(PermAll)
path := "/test-multi-level"
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
l := NewLock(zk, "/test-multi-level/lock", acls)
defer zk.Delete("/test-multi-level", -1) // Clean up what we've created for this test
defer zk.Delete("/test-multi-level/lock", -1)
if err := l.Lock(); err != nil {
t.Fatal(err)
}
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
}


@@ -0,0 +1,119 @@
package zk
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"time"
)
type TestServer struct {
Port int
Path string
Srv *Server
}
type TestCluster struct {
Path string
Servers []TestServer
}
func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
tmpPath, err := ioutil.TempDir("", "gozk")
if err != nil {
return nil, err
}
success := false
startPort := int(rand.Int31n(6000) + 10000)
cluster := &TestCluster{Path: tmpPath}
defer func() {
if !success {
cluster.Stop()
}
}()
for serverN := 0; serverN < size; serverN++ {
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
if err := os.Mkdir(srvPath, 0700); err != nil {
return nil, err
}
port := startPort + serverN*3
cfg := ServerConfig{
ClientPort: port,
DataDir: srvPath,
}
for i := 0; i < size; i++ {
cfg.Servers = append(cfg.Servers, ServerConfigServer{
ID: i + 1,
Host: "127.0.0.1",
PeerPort: startPort + i*3 + 1,
LeaderElectionPort: startPort + i*3 + 2,
})
}
cfgPath := filepath.Join(srvPath, "zoo.cfg")
fi, err := os.Create(cfgPath)
if err != nil {
return nil, err
}
err = cfg.Marshall(fi)
fi.Close()
if err != nil {
return nil, err
}
fi, err = os.Create(filepath.Join(srvPath, "myid"))
if err != nil {
return nil, err
}
_, err = fmt.Fprintf(fi, "%d\n", serverN+1)
fi.Close()
if err != nil {
return nil, err
}
srv := &Server{
ConfigPath: cfgPath,
Stdout: stdout,
Stderr: stderr,
}
if err := srv.Start(); err != nil {
return nil, err
}
cluster.Servers = append(cluster.Servers, TestServer{
Path: srvPath,
Port: cfg.ClientPort,
Srv: srv,
})
}
success = true
time.Sleep(time.Second) // Give the server time to become active. Should probably actually attempt to connect to verify.
return cluster, nil
}
func (ts *TestCluster) Connect(idx int) (*Conn, error) {
zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", ts.Servers[idx].Port)}, time.Second*15)
return zk, err
}
func (ts *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
return ts.ConnectAllTimeout(time.Second * 15)
}
func (ts *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
hosts := make([]string, len(ts.Servers))
for i, srv := range ts.Servers {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
}
zk, ch, err := Connect(hosts, sessionTimeout)
return zk, ch, err
}
func (ts *TestCluster) Stop() error {
for _, srv := range ts.Servers {
srv.Srv.Stop()
}
defer os.RemoveAll(ts.Path)
return nil
}


@@ -0,0 +1,136 @@
package zk
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
)
type ErrMissingServerConfigField string
func (e ErrMissingServerConfigField) Error() string {
return fmt.Sprintf("zk: missing server config field '%s'", string(e))
}
const (
DefaultServerTickTime = 2000
DefaultServerInitLimit = 10
DefaultServerSyncLimit = 5
DefaultServerAutoPurgeSnapRetainCount = 3
DefaultPeerPort = 2888
DefaultLeaderElectionPort = 3888
)
type ServerConfigServer struct {
ID int
Host string
PeerPort int
LeaderElectionPort int
}
type ServerConfig struct {
TickTime int // Number of milliseconds of each tick
InitLimit int // Number of ticks that the initial synchronization phase can take
SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
DataDir string // Directory where the snapshot is stored
ClientPort int // Port at which clients will connect
AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
AutoPurgePurgeInterval int // Purge task interval in hours (0 to disable auto purge)
Servers []ServerConfigServer
}
func (sc ServerConfig) Marshall(w io.Writer) error {
if sc.DataDir == "" {
return ErrMissingServerConfigField("dataDir")
}
fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
if sc.TickTime <= 0 {
sc.TickTime = DefaultServerTickTime
}
fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
if sc.InitLimit <= 0 {
sc.InitLimit = DefaultServerInitLimit
}
fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
if sc.SyncLimit <= 0 {
sc.SyncLimit = DefaultServerSyncLimit
}
fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
if sc.ClientPort <= 0 {
sc.ClientPort = DefaultPort
}
fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
if sc.AutoPurgePurgeInterval > 0 {
if sc.AutoPurgeSnapRetainCount <= 0 {
sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
}
fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
}
if len(sc.Servers) > 0 {
for _, srv := range sc.Servers {
if srv.PeerPort <= 0 {
srv.PeerPort = DefaultPeerPort
}
if srv.LeaderElectionPort <= 0 {
srv.LeaderElectionPort = DefaultLeaderElectionPort
}
fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
}
}
return nil
}
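// For a default single-node config, Marshall emits roughly (sketch; the
// dataDir value is hypothetical):
//
//	dataDir=/tmp/gozk/srv0
//	tickTime=2000
//	initLimit=10
//	syncLimit=5
//	clientPort=2181
//	server.1=127.0.0.1:2888:3888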
var jarSearchPaths = []string{
"zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/share/java/zookeeper-*.jar",
"/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
}
func findZookeeperFatJar() string {
var paths []string
zkPath := os.Getenv("ZOOKEEPER_PATH")
if zkPath == "" {
paths = jarSearchPaths
} else {
paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")}
}
for _, path := range paths {
matches, _ := filepath.Glob(path)
// TODO: could sort by version and pick latest
if len(matches) > 0 {
return matches[0]
}
}
return ""
}
type Server struct {
JarPath string
ConfigPath string
Stdout, Stderr io.Writer
cmd *exec.Cmd
}
func (srv *Server) Start() error {
if srv.JarPath == "" {
srv.JarPath = findZookeeperFatJar()
if srv.JarPath == "" {
return fmt.Errorf("zk: unable to find server jar")
}
}
srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
srv.cmd.Stdout = srv.Stdout
srv.cmd.Stderr = srv.Stderr
return srv.cmd.Start()
}
func (srv *Server) Stop() error {
srv.cmd.Process.Signal(os.Kill)
return srv.cmd.Wait()
}
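
Putting the pieces together, a hedged sketch of the server lifecycle (paths are illustrative and assume the directories already exist):

	func startTestServer() (*Server, error) {
		f, err := os.Create("/tmp/zk-test/zoo.cfg") // illustrative path
		if err != nil {
			return nil, err
		}
		cfg := ServerConfig{DataDir: "/tmp/zk-test/data", ClientPort: 2181}
		if err := cfg.Marshall(f); err != nil {
			f.Close()
			return nil, err
		}
		f.Close()
		srv := &Server{ConfigPath: "/tmp/zk-test/zoo.cfg", Stdout: os.Stdout, Stderr: os.Stderr}
		// Start falls back to findZookeeperFatJar, which honors
		// ZOOKEEPER_PATH before trying the glob list above.
		if err := srv.Start(); err != nil {
			return nil, err
		}
		return srv, nil // callers should defer srv.Stop()
	}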

View file

@ -0,0 +1,640 @@
package zk
import (
"encoding/binary"
"errors"
"log"
"reflect"
"runtime"
"time"
)
var (
ErrUnhandledFieldType = errors.New("zk: unhandled field type")
ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
ErrShortBuffer = errors.New("zk: buffer too small")
)
type defaultLogger struct{}
func (defaultLogger) Printf(format string, a ...interface{}) {
log.Printf(format, a...)
}
type ACL struct {
Perms int32
Scheme string
ID string
}
type Stat struct {
Czxid int64 // The zxid of the change that caused this znode to be created.
Mzxid int64 // The zxid of the change that last modified this znode.
Ctime int64 // The time in milliseconds from epoch when this znode was created.
Mtime int64 // The time in milliseconds from epoch when this znode was last modified.
Version int32 // The number of changes to the data of this znode.
Cversion int32 // The number of changes to the children of this znode.
Aversion int32 // The number of changes to the ACL of this znode.
EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
DataLength int32 // The length of the data field of this znode.
NumChildren int32 // The number of children of this znode.
	Pzxid int64 // The zxid of the change that last modified the children of this znode.
}
// ServerClient is the information for a single Zookeeper client and its session.
// This is used to parse/extract the output of the `cons` command.
type ServerClient struct {
Queued int64
Received int64
Sent int64
SessionID int64
Lcxid int64
Lzxid int64
Timeout int32
LastLatency int32
MinLatency int32
AvgLatency int32
MaxLatency int32
Established time.Time
LastResponse time.Time
Addr string
LastOperation string // maybe?
Error error
}
// ServerClients is a struct for the FLWCons() function. It's used to provide
// the list of Clients.
//
// This is needed because FLWCons() takes multiple servers.
type ServerClients struct {
Clients []*ServerClient
Error error
}
// ServerStats is the information pulled from the Zookeeper `stat` command.
type ServerStats struct {
Sent int64
Received int64
NodeCount int64
MinLatency int64
AvgLatency int64
MaxLatency int64
Connections int64
Outstanding int64
Epoch int32
Counter int32
BuildTime time.Time
Mode Mode
Version string
Error error
}
type requestHeader struct {
Xid int32
Opcode int32
}
type responseHeader struct {
Xid int32
Zxid int64
Err ErrCode
}
type multiHeader struct {
Type int32
Done bool
Err ErrCode
}
type auth struct {
Type int32
Scheme string
Auth []byte
}
// Generic request structs
type pathRequest struct {
Path string
}
type PathVersionRequest struct {
Path string
Version int32
}
type pathWatchRequest struct {
Path string
Watch bool
}
type pathResponse struct {
Path string
}
type statResponse struct {
Stat Stat
}
//
type CheckVersionRequest PathVersionRequest
type closeRequest struct{}
type closeResponse struct{}
type connectRequest struct {
ProtocolVersion int32
LastZxidSeen int64
TimeOut int32
SessionID int64
Passwd []byte
}
type connectResponse struct {
ProtocolVersion int32
TimeOut int32
SessionID int64
Passwd []byte
}
type CreateRequest struct {
Path string
Data []byte
Acl []ACL
Flags int32
}
type createResponse pathResponse
type DeleteRequest PathVersionRequest
type deleteResponse struct{}
type errorResponse struct {
Err int32
}
type existsRequest pathWatchRequest
type existsResponse statResponse
type getAclRequest pathRequest
type getAclResponse struct {
Acl []ACL
Stat Stat
}
type getChildrenRequest pathRequest
type getChildrenResponse struct {
Children []string
}
type getChildren2Request pathWatchRequest
type getChildren2Response struct {
Children []string
Stat Stat
}
type getDataRequest pathWatchRequest
type getDataResponse struct {
Data []byte
Stat Stat
}
type getMaxChildrenRequest pathRequest
type getMaxChildrenResponse struct {
Max int32
}
type getSaslRequest struct {
Token []byte
}
type pingRequest struct{}
type pingResponse struct{}
type setAclRequest struct {
Path string
Acl []ACL
Version int32
}
type setAclResponse statResponse
type SetDataRequest struct {
Path string
Data []byte
Version int32
}
type setDataResponse statResponse
type setMaxChildren struct {
Path string
Max int32
}
type setSaslRequest struct {
Token string
}
type setSaslResponse struct {
Token string
}
type setWatchesRequest struct {
RelativeZxid int64
DataWatches []string
ExistWatches []string
ChildWatches []string
}
type setWatchesResponse struct{}
type syncRequest pathRequest
type syncResponse pathResponse
type setAuthRequest auth
type setAuthResponse struct{}
type multiRequestOp struct {
Header multiHeader
Op interface{}
}
type multiRequest struct {
Ops []multiRequestOp
DoneHeader multiHeader
}
type multiResponseOp struct {
Header multiHeader
String string
Stat *Stat
}
type multiResponse struct {
Ops []multiResponseOp
DoneHeader multiHeader
}
func (r *multiRequest) Encode(buf []byte) (int, error) {
total := 0
for _, op := range r.Ops {
op.Header.Done = false
n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
if err != nil {
return total, err
}
total += n
}
r.DoneHeader.Done = true
n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
if err != nil {
return total, err
}
total += n
return total, nil
}
func (r *multiRequest) Decode(buf []byte) (int, error) {
r.Ops = make([]multiRequestOp, 0)
r.DoneHeader = multiHeader{-1, true, -1}
total := 0
for {
header := &multiHeader{}
n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
if err != nil {
return total, err
}
total += n
if header.Done {
r.DoneHeader = *header
break
}
req := requestStructForOp(header.Type)
if req == nil {
return total, ErrAPIError
}
n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
if err != nil {
return total, err
}
total += n
r.Ops = append(r.Ops, multiRequestOp{*header, req})
}
return total, nil
}
func (r *multiResponse) Decode(buf []byte) (int, error) {
r.Ops = make([]multiResponseOp, 0)
r.DoneHeader = multiHeader{-1, true, -1}
total := 0
for {
header := &multiHeader{}
n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
if err != nil {
return total, err
}
total += n
if header.Done {
r.DoneHeader = *header
break
}
res := multiResponseOp{Header: *header}
var w reflect.Value
switch header.Type {
default:
return total, ErrAPIError
case opCreate:
w = reflect.ValueOf(&res.String)
case opSetData:
res.Stat = new(Stat)
w = reflect.ValueOf(res.Stat)
case opCheck, opDelete:
}
if w.IsValid() {
n, err := decodePacketValue(buf[total:], w)
if err != nil {
return total, err
}
total += n
}
r.Ops = append(r.Ops, res)
}
return total, nil
}
type watcherEvent struct {
Type EventType
State State
Path string
}
type decoder interface {
Decode(buf []byte) (int, error)
}
type encoder interface {
Encode(buf []byte) (int, error)
}
func decodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
err = ErrShortBuffer
} else {
panic(r)
}
}
}()
v := reflect.ValueOf(st)
if v.Kind() != reflect.Ptr || v.IsNil() {
return 0, ErrPtrExpected
}
return decodePacketValue(buf, v)
}
func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
rv := v
kind := v.Kind()
if kind == reflect.Ptr {
if v.IsNil() {
v.Set(reflect.New(v.Type().Elem()))
}
v = v.Elem()
kind = v.Kind()
}
n := 0
switch kind {
default:
return n, ErrUnhandledFieldType
case reflect.Struct:
if de, ok := rv.Interface().(decoder); ok {
return de.Decode(buf)
} else if de, ok := v.Interface().(decoder); ok {
return de.Decode(buf)
} else {
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
n2, err := decodePacketValue(buf[n:], field)
n += n2
if err != nil {
return n, err
}
}
}
case reflect.Bool:
v.SetBool(buf[n] != 0)
n++
case reflect.Int32:
v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
n += 4
case reflect.Int64:
v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
n += 8
case reflect.String:
ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
v.SetString(string(buf[n+4 : n+4+ln]))
n += 4 + ln
case reflect.Slice:
switch v.Type().Elem().Kind() {
default:
count := int(binary.BigEndian.Uint32(buf[n : n+4]))
n += 4
values := reflect.MakeSlice(v.Type(), count, count)
v.Set(values)
for i := 0; i < count; i++ {
n2, err := decodePacketValue(buf[n:], values.Index(i))
n += n2
if err != nil {
return n, err
}
}
case reflect.Uint8:
ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
if ln < 0 {
n += 4
v.SetBytes(nil)
} else {
bytes := make([]byte, ln)
copy(bytes, buf[n+4:n+4+ln])
v.SetBytes(bytes)
n += 4 + ln
}
}
}
return n, nil
}
func encodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
err = ErrShortBuffer
} else {
panic(r)
}
}
}()
v := reflect.ValueOf(st)
if v.Kind() != reflect.Ptr || v.IsNil() {
return 0, ErrPtrExpected
}
return encodePacketValue(buf, v)
}
func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
rv := v
for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
v = v.Elem()
}
n := 0
switch v.Kind() {
default:
return n, ErrUnhandledFieldType
case reflect.Struct:
if en, ok := rv.Interface().(encoder); ok {
return en.Encode(buf)
} else if en, ok := v.Interface().(encoder); ok {
return en.Encode(buf)
} else {
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
n2, err := encodePacketValue(buf[n:], field)
n += n2
if err != nil {
return n, err
}
}
}
case reflect.Bool:
if v.Bool() {
buf[n] = 1
} else {
buf[n] = 0
}
n++
case reflect.Int32:
binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
n += 4
case reflect.Int64:
binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
n += 8
case reflect.String:
str := v.String()
binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
copy(buf[n+4:n+4+len(str)], []byte(str))
n += 4 + len(str)
case reflect.Slice:
switch v.Type().Elem().Kind() {
default:
count := v.Len()
startN := n
n += 4
for i := 0; i < count; i++ {
n2, err := encodePacketValue(buf[n:], v.Index(i))
n += n2
if err != nil {
return n, err
}
}
binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
case reflect.Uint8:
if v.IsNil() {
binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
n += 4
} else {
bytes := v.Bytes()
binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
copy(buf[n+4:n+4+len(bytes)], bytes)
n += 4 + len(bytes)
}
}
}
return n, nil
}
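
To make the wire format concrete, a round-trip sketch (inside the zk package, with fmt imported) using the same header the codec tests below exercise; each int32 field is four big-endian bytes, so {-2, 5} occupies exactly eight:

	buf := make([]byte, 8)
	n, err := encodePacket(buf, &requestHeader{Xid: -2, Opcode: 5})
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", buf[:n]) // ff ff ff fe 00 00 00 05
	var hdr requestHeader
	if _, err := decodePacket(buf[:n], &hdr); err != nil {
		panic(err)
	}
	// hdr is again requestHeader{Xid: -2, Opcode: 5}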
func requestStructForOp(op int32) interface{} {
switch op {
case opClose:
return &closeRequest{}
case opCreate:
return &CreateRequest{}
case opDelete:
return &DeleteRequest{}
case opExists:
return &existsRequest{}
case opGetAcl:
return &getAclRequest{}
case opGetChildren:
return &getChildrenRequest{}
case opGetChildren2:
return &getChildren2Request{}
case opGetData:
return &getDataRequest{}
case opPing:
return &pingRequest{}
case opSetAcl:
return &setAclRequest{}
case opSetData:
return &SetDataRequest{}
case opSetWatches:
return &setWatchesRequest{}
case opSync:
return &syncRequest{}
case opSetAuth:
return &setAuthRequest{}
case opCheck:
return &CheckVersionRequest{}
case opMulti:
return &multiRequest{}
}
return nil
}
func responseStructForOp(op int32) interface{} {
switch op {
case opClose:
return &closeResponse{}
case opCreate:
return &createResponse{}
case opDelete:
return &deleteResponse{}
case opExists:
return &existsResponse{}
case opGetAcl:
return &getAclResponse{}
case opGetChildren:
return &getChildrenResponse{}
case opGetChildren2:
return &getChildren2Response{}
case opGetData:
return &getDataResponse{}
case opPing:
return &pingResponse{}
case opSetAcl:
return &setAclResponse{}
case opSetData:
return &setDataResponse{}
case opSetWatches:
return &setWatchesResponse{}
case opSync:
return &syncResponse{}
case opWatcherEvent:
return &watcherEvent{}
case opSetAuth:
return &setAuthResponse{}
// case opCheck:
// return &checkVersionResponse{}
case opMulti:
return &multiResponse{}
}
return nil
}

View file

@ -0,0 +1,71 @@
package zk
import (
"reflect"
"testing"
)
func TestEncodeDecodePacket(t *testing.T) {
encodeDecodeTest(t, &requestHeader{-2, 5})
encodeDecodeTest(t, &connectResponse{1, 2, 3, nil})
encodeDecodeTest(t, &connectResponse{1, 2, 3, []byte{4, 5, 6}})
encodeDecodeTest(t, &getAclResponse{[]ACL{{12, "s", "anyone"}}, Stat{}})
encodeDecodeTest(t, &getChildrenResponse{[]string{"foo", "bar"}})
encodeDecodeTest(t, &pathWatchRequest{"path", true})
encodeDecodeTest(t, &pathWatchRequest{"path", false})
encodeDecodeTest(t, &CheckVersionRequest{"/", -1})
encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}})
}
func encodeDecodeTest(t *testing.T, r interface{}) {
buf := make([]byte, 1024)
n, err := encodePacket(buf, r)
if err != nil {
t.Errorf("encodePacket returned non-nil error %+v\n", err)
return
}
t.Logf("%+v %x", r, buf[:n])
r2 := reflect.New(reflect.ValueOf(r).Elem().Type()).Interface()
n2, err := decodePacket(buf[:n], r2)
if err != nil {
t.Errorf("decodePacket returned non-nil error %+v\n", err)
return
}
if n != n2 {
t.Errorf("sizes don't match: %d != %d", n, n2)
return
}
if !reflect.DeepEqual(r, r2) {
t.Errorf("results don't match: %+v != %+v", r, r2)
return
}
}
func TestEncodeShortBuffer(t *testing.T) {
buf := make([]byte, 0)
_, err := encodePacket(buf, &requestHeader{1, 2})
if err != ErrShortBuffer {
t.Errorf("encodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err)
return
}
}
func TestDecodeShortBuffer(t *testing.T) {
buf := make([]byte, 0)
_, err := decodePacket(buf, &responseHeader{})
if err != ErrShortBuffer {
t.Errorf("decodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err)
return
}
}
func BenchmarkEncode(b *testing.B) {
buf := make([]byte, 4096)
st := &connectRequest{Passwd: []byte("1234567890")}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if _, err := encodePacket(buf, st); err != nil {
b.Fatal(err)
}
}
}

View file

@ -0,0 +1,148 @@
package zk
import (
"encoding/binary"
"fmt"
"io"
"net"
"sync"
)
var (
requests = make(map[int32]int32) // Map of Xid -> Opcode
requestsLock = &sync.Mutex{}
)
func trace(conn1, conn2 net.Conn, client bool) {
defer conn1.Close()
defer conn2.Close()
buf := make([]byte, 10*1024)
init := true
for {
_, err := io.ReadFull(conn1, buf[:4])
if err != nil {
fmt.Println("1>", client, err)
return
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
_, err = io.ReadFull(conn1, buf[4:4+blen])
if err != nil {
fmt.Println("2>", client, err)
return
}
var cr interface{}
opcode := int32(-1)
readHeader := true
if client {
if init {
cr = &connectRequest{}
readHeader = false
} else {
xid := int32(binary.BigEndian.Uint32(buf[4:8]))
opcode = int32(binary.BigEndian.Uint32(buf[8:12]))
requestsLock.Lock()
requests[xid] = opcode
requestsLock.Unlock()
cr = requestStructForOp(opcode)
if cr == nil {
fmt.Printf("Unknown opcode %d\n", opcode)
}
}
} else {
if init {
cr = &connectResponse{}
readHeader = false
} else {
xid := int32(binary.BigEndian.Uint32(buf[4:8]))
zxid := int64(binary.BigEndian.Uint64(buf[8:16]))
errnum := int32(binary.BigEndian.Uint32(buf[16:20]))
if xid != -1 || zxid != -1 {
requestsLock.Lock()
found := false
opcode, found = requests[xid]
if !found {
opcode = 0
}
delete(requests, xid)
requestsLock.Unlock()
} else {
opcode = opWatcherEvent
}
cr = responseStructForOp(opcode)
if cr == nil {
fmt.Printf("Unknown opcode %d\n", opcode)
}
if errnum != 0 {
cr = &struct{}{}
}
}
}
opname := "."
if opcode != -1 {
opname = opNames[opcode]
}
if cr == nil {
fmt.Printf("%+v %s %+v\n", client, opname, buf[4:4+blen])
} else {
n := 4
hdrStr := ""
if readHeader {
var hdr interface{}
if client {
hdr = &requestHeader{}
} else {
hdr = &responseHeader{}
}
if n2, err := decodePacket(buf[n:n+blen], hdr); err != nil {
fmt.Println(err)
} else {
n += n2
}
hdrStr = fmt.Sprintf(" %+v", hdr)
}
if _, err := decodePacket(buf[n:n+blen], cr); err != nil {
fmt.Println(err)
}
fmt.Printf("%+v %s%s %+v\n", client, opname, hdrStr, cr)
}
init = false
written, err := conn2.Write(buf[:4+blen])
if err != nil {
fmt.Println("3>", client, err)
return
} else if written != 4+blen {
fmt.Printf("Written != read: %d != %d\n", written, blen)
return
}
}
}
func handleConnection(addr string, conn net.Conn) {
zkConn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Println(err)
return
}
go trace(conn, zkConn, true)
trace(zkConn, conn, false)
}
func StartTracer(listenAddr, serverAddr string) {
ln, err := net.Listen("tcp", listenAddr)
if err != nil {
panic(err)
}
for {
conn, err := ln.Accept()
if err != nil {
fmt.Println(err)
continue
}
go handleConnection(serverAddr, conn)
}
}
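
Usage sketch: the tracer sits between a client and a real server, printing decoded packets in both directions (addresses are illustrative):

	package main

	import "github.com/samuel/go-zookeeper/zk"

	func main() {
		// Clients connect to :2182; traffic is decoded, printed, and
		// forwarded to the real ZooKeeper server on :2181.
		zk.StartTracer("127.0.0.1:2182", "127.0.0.1:2181")
	}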

View file

@ -0,0 +1,54 @@
package zk
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"math/rand"
"strconv"
"strings"
)
// AuthACL produces an ACL list containing a single ACL which uses the
// provided permissions, with the scheme "auth", and ID "", which is used
// by ZooKeeper to represent any authenticated user.
func AuthACL(perms int32) []ACL {
return []ACL{{perms, "auth", ""}}
}
// WorldACL produces an ACL list containing a single ACL which uses the
// provided permissions, with the scheme "world", and ID "anyone", which
// is used by ZooKeeper to represent any user at all.
func WorldACL(perms int32) []ACL {
return []ACL{{perms, "world", "anyone"}}
}
func DigestACL(perms int32, user, password string) []ACL {
userPass := []byte(fmt.Sprintf("%s:%s", user, password))
h := sha1.New()
if n, err := h.Write(userPass); err != nil || n != len(userPass) {
panic("SHA1 failed")
}
digest := base64.StdEncoding.EncodeToString(h.Sum(nil))
return []ACL{{perms, "digest", fmt.Sprintf("%s:%s", user, digest)}}
}
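
The ID produced here has the same form that ZooKeeper's digest scheme expects, i.e. the username joined to the base64-encoded SHA-1 of "user:password":

	acl := DigestACL(PermAll, "user", "password")
	// acl[0].Scheme == "digest"
	// acl[0].ID    == "user:" + base64(sha1("user:password"))
	// acl[0].Perms == PermAll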
// FormatServers takes a slice of addresses, and makes sure they are in a format
// that resembles <addr>:<port>. If the server has no port provided, the
// DefaultPort constant is added to the end.
func FormatServers(servers []string) []string {
for i := range servers {
if !strings.Contains(servers[i], ":") {
servers[i] = servers[i] + ":" + strconv.Itoa(DefaultPort)
}
}
return servers
}
// stringShuffle performs a Fisher-Yates shuffle on a slice of strings
func stringShuffle(s []string) {
for i := len(s) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
s[i], s[j] = s[j], s[i]
}
}

View file

@ -0,0 +1,17 @@
package zk
import "testing"
func TestFormatServers(t *testing.T) {
servers := []string{"127.0.0.1:2181", "127.0.0.42", "127.0.42.1:8811"}
r := []string{"127.0.0.1:2181", "127.0.0.42:2181", "127.0.42.1:8811"}
	s := FormatServers(servers)
for i := range s {
if s[i] != r[i] {
t.Errorf("%v should equal %v", s[i], r[i])
}
}
}

View file

@ -0,0 +1,518 @@
package zk
import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"
"camlistore.org/pkg/throttle"
)
func TestCreate(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}
func TestMulti(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
ops := []interface{}{
&CreateRequest{Path: path, Data: []byte{1, 2, 3, 4}, Acl: WorldACL(PermAll)},
&SetDataRequest{Path: path, Data: []byte{1, 2, 3, 4}, Version: -1},
}
if res, err := zk.Multi(ops...); err != nil {
t.Fatalf("Multi returned error: %+v", err)
} else if len(res) != 2 {
t.Fatalf("Expected 2 responses got %d", len(res))
} else {
t.Logf("%+v", res)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error: %+v", err)
} else if stat == nil {
t.Fatal("Get returned nil stat")
} else if len(data) < 4 {
t.Fatal("Get returned wrong size data")
}
}
func TestGetSetACL(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.AddAuth("digest", []byte("blah")); err != nil {
t.Fatalf("AddAuth returned error %+v", err)
}
path := "/gozk-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
if path, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
expected := WorldACL(PermAll)
if acl, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(acl) != 1 || expected[0] != acl[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl)
}
expected = []ACL{{PermAll, "ip", "127.0.0.1"}}
if stat, err := zk.SetACL(path, expected, -1); err != nil {
t.Fatalf("SetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("SetACL returned nil Stat")
}
if acl, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(acl) != 1 || expected[0] != acl[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl)
}
}
func TestAuth(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
path := "/gozk-digest-test"
if err := zk.Delete(path, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
acl := DigestACL(PermAll, "user", "password")
if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, acl); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if p != path {
t.Fatalf("Create returned different path '%s' != '%s'", p, path)
}
if a, stat, err := zk.GetACL(path); err != nil {
t.Fatalf("GetACL returned error %+v", err)
} else if stat == nil {
t.Fatalf("GetACL returned nil Stat")
} else if len(a) != 1 || acl[0] != a[0] {
t.Fatalf("GetACL mismatch expected %+v instead of %+v", acl, a)
}
if _, _, err := zk.Get(path); err != ErrNoAuth {
t.Fatalf("Get returned error %+v instead of ErrNoAuth", err)
}
if err := zk.AddAuth("digest", []byte("user:password")); err != nil {
t.Fatalf("AddAuth returned error %+v", err)
}
if data, stat, err := zk.Get(path); err != nil {
t.Fatalf("Get returned error %+v", err)
} else if stat == nil {
t.Fatalf("Get returned nil Stat")
} else if len(data) != 4 {
t.Fatalf("Get returned wrong data length")
}
}
func TestChildWatch(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
if path, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
	case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
// Delete of the watched node should trigger the watch
children, stat, childCh, err = zk.ChildrenW("/gozk-test")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) != 0 {
t.Fatal("Children should return 0 children")
}
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/gozk-test" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
	case <-time.After(time.Second * 2):
t.Fatal("Child watcher timed out")
}
}
func TestSetWatchers(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
zk.reconnectDelay = time.Second
zk2, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk2.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
testPath, err := zk.Create("/gozk-test-2", []byte{}, 0, WorldACL(PermAll))
if err != nil {
t.Fatalf("Create returned: %+v", err)
}
_, _, testEvCh, err := zk.GetW(testPath)
if err != nil {
t.Fatalf("GetW returned: %+v", err)
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
zk.conn.Close()
if err := zk2.Delete(testPath, -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
time.Sleep(time.Millisecond * 100)
if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create returned error: %+v", err)
} else if path != "/gozk-test" {
t.Fatalf("Create returned different path '%s' != '/gozk-test'", path)
}
select {
case ev := <-testEvCh:
if ev.Err != nil {
t.Fatalf("GetW watcher error %+v", ev.Err)
}
if ev.Path != testPath {
t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, testPath)
}
case <-time.After(2 * time.Second):
t.Fatal("GetW watcher timed out")
}
select {
case ev := <-childCh:
if ev.Err != nil {
t.Fatalf("Child watcher error %+v", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(2 * time.Second):
t.Fatal("Child watcher timed out")
}
}
func TestExpiringWatch(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode {
t.Fatalf("Delete returned error: %+v", err)
}
children, stat, childCh, err := zk.ChildrenW("/")
if err != nil {
t.Fatalf("Children returned error: %+v", err)
} else if stat == nil {
t.Fatal("Children returned nil stat")
} else if len(children) < 1 {
t.Fatal("Children should return at least 1 child")
}
zk.sessionID = 99999
zk.conn.Close()
select {
case ev := <-childCh:
if ev.Err != ErrSessionExpired {
t.Fatalf("Child watcher error %+v instead of expected ErrSessionExpired", ev.Err)
}
if ev.Path != "/" {
t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/")
}
case <-time.After(2 * time.Second):
t.Fatal("Child watcher timed out")
}
}
func TestRequestFail(t *testing.T) {
// If connecting fails to all servers in the list then pending requests
// should be errored out so they don't hang forever.
zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15)
if err != nil {
t.Fatal(err)
}
defer zk.Close()
ch := make(chan error)
go func() {
_, _, err := zk.Get("/blah")
ch <- err
}()
select {
case err := <-ch:
if err == nil {
t.Fatal("Expected non-nil error on failed request due to connection failure")
}
case <-time.After(time.Second * 2):
t.Fatal("Get hung when connection could not be made")
}
}
func TestSlowServer(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port)
proxyAddr, stopCh, err := startSlowProxy(t,
throttle.Rate{}, throttle.Rate{},
realAddr, func(ln *throttle.Listener) {
if ln.Up.Latency == 0 {
ln.Up.Latency = time.Millisecond * 2000
ln.Down.Latency = time.Millisecond * 2000
} else {
ln.Up.Latency = 0
ln.Down.Latency = 0
}
})
if err != nil {
t.Fatal(err)
}
defer close(stopCh)
zk, _, err := Connect([]string{proxyAddr}, time.Millisecond*500)
if err != nil {
t.Fatal(err)
}
defer zk.Close()
_, _, wch, err := zk.ChildrenW("/")
if err != nil {
t.Fatal(err)
}
// Force a reconnect to get a throttled connection
zk.conn.Close()
time.Sleep(time.Millisecond * 100)
if err := zk.Delete("/gozk-test", -1); err == nil {
t.Fatal("Delete should have failed")
}
	// The previous request should have timed out, causing the connection to be dropped and re-established
if _, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatal(err)
}
// Make sure event is still returned because the session should not have been affected
select {
case ev := <-wch:
t.Logf("Received event: %+v", ev)
case <-time.After(time.Second):
t.Fatal("Expected to receive a watch event")
}
}
func startSlowProxy(t *testing.T, up, down throttle.Rate, upstream string, adj func(ln *throttle.Listener)) (string, chan bool, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return "", nil, err
}
tln := &throttle.Listener{
Listener: ln,
Up: up,
Down: down,
}
stopCh := make(chan bool)
go func() {
<-stopCh
tln.Close()
}()
go func() {
for {
cn, err := tln.Accept()
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
t.Fatalf("Accept failed: %s", err.Error())
}
return
}
if adj != nil {
adj(tln)
}
go func(cn net.Conn) {
defer cn.Close()
upcn, err := net.Dial("tcp", upstream)
if err != nil {
t.Log(err)
return
}
			// This will leave hanging goroutines until stopCh is closed
// but it doesn't matter in the context of running tests.
go func() {
<-stopCh
upcn.Close()
}()
go func() {
if _, err := io.Copy(upcn, cn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
// log.Printf("Upstream write failed: %s", err.Error())
}
}
}()
if _, err := io.Copy(cn, upcn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
// log.Printf("Upstream read failed: %s", err.Error())
}
}
}(cn)
}
}()
return ln.Addr().String(), stopCh, nil
}

5
NOTICE
View file

@ -66,3 +66,8 @@ gosnappy - a fork of code.google.com/p/snappy-go
https://github.com/syndtr/gosnappy
Copyright 2011 The Snappy-Go Authors
See https://github.com/syndtr/gosnappy/blob/master/LICENSE for license details.
go-zookeeper - Native ZooKeeper client for Go
https://github.com/samuel/go-zookeeper
Copyright (c) 2013, Samuel Stauffer <samuel@descolada.com>
See https://github.com/samuel/go-zookeeper/blob/master/LICENSE for license details.

View file

@ -85,6 +85,11 @@ var (
TagSeparator: ",",
Scheme: "http",
}
// The default Serverset SD configuration.
DefaultServersetSDConfig = ServersetSDConfig{
Timeout: Duration(10 * time.Second),
}
)
// Config is the top-level configuration for Prometheus's config files.
@ -204,6 +209,9 @@ type ScrapeConfig struct {
FileSDConfigs []*FileSDConfig `yaml:"file_sd_configs,omitempty"`
// List of Consul service discovery configurations.
ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"`
// List of Serverset service discovery configurations.
ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"`
// List of target relabel configurations.
RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"`
// List of metric relabel configurations.
@ -406,6 +414,35 @@ func (c *ConsulSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return checkOverflow(c.XXX, "consul_sd_config")
}
// ServersetSDConfig is the configuration for Twitter serversets in Zookeeper-based discovery.
type ServersetSDConfig struct {
Servers []string `yaml:"servers"`
Paths []string `yaml:"paths"`
Timeout Duration `yaml:"timeout,omitempty"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultServersetSDConfig
type plain ServersetSDConfig
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if len(c.Servers) == 0 {
return fmt.Errorf("serverset SD config must contain at least one Zookeeper server")
}
if len(c.Paths) == 0 {
return fmt.Errorf("serverset SD config must contain at least one path")
}
for _, path := range c.Paths {
if !strings.HasPrefix(path, "/") {
return fmt.Errorf("serverset SD config paths must begin with '/': %s", path)
}
}
return nil
}
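
A hedged sketch of how such a section unmarshals (assuming yaml is gopkg.in/yaml.v2, which provides the unmarshal callback used above; values are illustrative):

	var c ServersetSDConfig
	raw := "servers: ['zk1:2181', 'zk2:2181']\npaths: ['/twitter/service/myservice']\n"
	if err := yaml.Unmarshal([]byte(raw), &c); err != nil {
		fmt.Println(err) // not reached for this input
	}
	// c.Timeout is the 10-second default; an empty servers or paths list,
	// or a path without a leading '/', would return an error instead.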
// RelabelAction is the action to be performed on relabeling.
type RelabelAction string

View file

@ -0,0 +1,372 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"strings"
"sync"
"time"
"github.com/prometheus/log"
"github.com/samuel/go-zookeeper/zk"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
const (
serversetSourcePrefix = "serverset"
serversetNodePrefix = "member_"
serversetLabelPrefix = clientmodel.MetaLabelPrefix + "serverset_"
serversetStatusLabel = serversetLabelPrefix + "status"
serversetPathLabel = serversetLabelPrefix + "path"
serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint"
)
var (
invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
)
type serversetMember struct {
ServiceEndpoint serversetEndpoint
AdditionalEndpoints map[string]serversetEndpoint
Status string `json:"status"`
}
type serversetEndpoint struct {
Host string
Port int
}
type ZookeeperLogger struct{}
// Printf implements the zk.Logger interface.
func (zl ZookeeperLogger) Printf(s string, i ...interface{}) {
log.Infof(s, i...)
}
// ServersetDiscovery retrieves target information from Zookeeper serversets
// and updates the targets via watches.
type ServersetDiscovery struct {
conf *config.ServersetSDConfig
conn *zk.Conn
mu sync.RWMutex
sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup
updates chan zookeeperTreeCacheEvent
runDone chan struct{}
treeCache *zookeeperTreeCache
}
// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
	conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout))
	if err != nil {
		return nil
	}
	conn.SetLogger(ZookeeperLogger{})
updates := make(chan zookeeperTreeCacheEvent)
sd := &ServersetDiscovery{
conf: conf,
conn: conn,
updates: updates,
sources: map[string]*config.TargetGroup{},
runDone: make(chan struct{}),
}
go sd.processUpdates()
sd.treeCache = NewZookeeperTreeCache(conn, conf.Paths[0], updates)
return sd
}
// Sources implements the TargetProvider interface.
func (sd *ServersetDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close()
for event := range sd.updates {
tg := &config.TargetGroup{
Source: serversetSourcePrefix + event.Path,
}
sd.mu.Lock()
if event.Data != nil {
labelSet, err := parseServersetMember(*event.Data, event.Path)
if err == nil {
tg.Targets = []clientmodel.LabelSet{*labelSet}
sd.sources[event.Path] = tg
} else {
delete(sd.sources, event.Path)
}
} else {
delete(sd.sources, event.Path)
}
sd.mu.Unlock()
if sd.sdUpdates != nil {
*sd.sdUpdates <- tg
}
}
if sd.sdUpdates != nil {
close(*sd.sdUpdates)
}
}
// Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup) {
// Send on everything we have seen so far.
sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- targetGroup
}
// Tell processUpdates to send future updates.
sd.sdUpdates = &ch
sd.mu.Unlock()
<-sd.runDone
sd.treeCache.Stop()
}
// Stop implements the TargetProvider interface.
func (sd *ServersetDiscovery) Stop() {
log.Debugf("Stopping serverset service discovery for %s %s", sd.conf.Servers, sd.conf.Paths)
// Terminate Run.
sd.runDone <- struct{}{}
log.Debugf("Serverset service discovery for %s %s stopped", sd.conf.Servers, sd.conf.Paths)
}
func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, error) {
member := serversetMember{}
err := json.Unmarshal(data, &member)
if err != nil {
return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err)
}
labels := clientmodel.LabelSet{}
labels[serversetPathLabel] = clientmodel.LabelValue(path)
labels[clientmodel.AddressLabel] = clientmodel.LabelValue(
fmt.Sprintf("%s:%d", member.ServiceEndpoint.Host, member.ServiceEndpoint.Port))
labels[serversetEndpointLabelPrefix+"_host"] = clientmodel.LabelValue(member.ServiceEndpoint.Host)
labels[serversetEndpointLabelPrefix+"_port"] = clientmodel.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port))
for name, endpoint := range member.AdditionalEndpoints {
cleanName := clientmodel.LabelName(invalidLabelCharRE.ReplaceAllString(name, "_"))
labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = clientmodel.LabelValue(
endpoint.Host)
labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = clientmodel.LabelValue(
fmt.Sprintf("%d", endpoint.Port))
}
labels[serversetStatusLabel] = clientmodel.LabelValue(member.Status)
return &labels, nil
}
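
For illustration, a hedged sketch of the label set produced for one member znode (the JSON shape follows Twitter's serverset format; hosts and ports are made up):

	data := []byte(`{"serviceEndpoint": {"host": "10.0.0.1", "port": 8080},
		"additionalEndpoints": {"http": {"host": "10.0.0.1", "port": 8081}},
		"status": "ALIVE"}`)
	labels, err := parseServersetMember(data, "/twitter/service/myservice/member_0000000123")
	if err != nil {
		log.Errorf("parse failed: %s", err)
	}
	// *labels now includes:
	//   __address__                         = "10.0.0.1:8080"
	//   __meta_serverset_path               = "/twitter/service/myservice/member_0000000123"
	//   __meta_serverset_endpoint_host      = "10.0.0.1"
	//   __meta_serverset_endpoint_port      = "8080"
	//   __meta_serverset_endpoint_host_http = "10.0.0.1"
	//   __meta_serverset_endpoint_port_http = "8081"
	//   __meta_serverset_status             = "ALIVE"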
type zookeeperTreeCache struct {
conn *zk.Conn
prefix string
events chan zookeeperTreeCacheEvent
zkEvents chan zk.Event
stop chan struct{}
head *zookeeperTreeCacheNode
}
type zookeeperTreeCacheEvent struct {
Path string
Data *[]byte
}
type zookeeperTreeCacheNode struct {
data *[]byte
events chan zk.Event
done chan struct{}
stopped bool
children map[string]*zookeeperTreeCacheNode
}
func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan zookeeperTreeCacheEvent) *zookeeperTreeCache {
tc := &zookeeperTreeCache{
conn: conn,
prefix: path,
events: events,
stop: make(chan struct{}),
}
tc.head = &zookeeperTreeCacheNode{
events: make(chan zk.Event),
children: map[string]*zookeeperTreeCacheNode{},
}
err := tc.recursiveNodeUpdate(path, tc.head)
if err != nil {
log.Errorf("Error during initial read of Zookeeper: %s", err)
}
go tc.loop(err != nil)
return tc
}
func (tc *zookeeperTreeCache) Stop() {
tc.stop <- struct{}{}
}
func (tc *zookeeperTreeCache) loop(failureMode bool) {
retryChan := make(chan struct{})
failure := func() {
failureMode = true
time.AfterFunc(time.Second*10, func() {
retryChan <- struct{}{}
})
}
if failureMode {
failure()
}
for {
select {
case ev := <-tc.head.events:
log.Debugf("Received Zookeeper event: %s", ev)
if failureMode {
continue
}
if ev.Type == zk.EventNotWatching {
log.Infof("Lost connection to Zookeeper.")
failure()
} else {
path := strings.TrimPrefix(ev.Path, tc.prefix)
parts := strings.Split(path, "/")
node := tc.head
for _, part := range parts[1:] {
childNode := node.children[part]
if childNode == nil {
childNode = &zookeeperTreeCacheNode{
events: tc.head.events,
children: map[string]*zookeeperTreeCacheNode{},
done: make(chan struct{}, 1),
}
node.children[part] = childNode
}
node = childNode
}
err := tc.recursiveNodeUpdate(ev.Path, node)
if err != nil {
log.Errorf("Error during processing of Zookeeper event: %s", err)
failure()
}
}
case <-retryChan:
log.Infof("Attempting to resync state with Zookeeper")
err := tc.recursiveNodeUpdate(tc.prefix, tc.head)
if err == nil {
failureMode = false
} else {
log.Errorf("Error during Zookeeper resync: %s", err)
failure()
}
case <-tc.stop:
close(tc.events)
return
}
}
}
func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error {
data, _, dataWatcher, err := tc.conn.GetW(path)
if err == zk.ErrNoNode {
tc.recursiveDelete(path, node)
return nil
} else if err != nil {
return err
}
if node.data == nil || !bytes.Equal(*node.data, data) {
node.data = &data
tc.events <- zookeeperTreeCacheEvent{Path: path, Data: node.data}
}
children, _, childWatcher, err := tc.conn.ChildrenW(path)
if err == zk.ErrNoNode {
tc.recursiveDelete(path, node)
return nil
} else if err != nil {
return err
}
currentChildren := map[string]struct{}{}
for _, child := range children {
currentChildren[child] = struct{}{}
childNode := node.children[child]
		// Does not already exist; create it.
if childNode == nil {
node.children[child] = &zookeeperTreeCacheNode{
events: node.events,
children: map[string]*zookeeperTreeCacheNode{},
done: make(chan struct{}, 1),
}
}
err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child])
if err != nil {
return err
}
}
// Remove nodes that no longer exist
for name, childNode := range node.children {
if _, ok := currentChildren[name]; !ok || node.data == nil {
tc.recursiveDelete(path+"/"+name, childNode)
delete(node.children, name)
}
}
go func() {
		// Pass up Zookeeper events until the node is deleted.
select {
case event := <-dataWatcher:
node.events <- event
case event := <-childWatcher:
node.events <- event
case <-node.done:
}
}()
return nil
}
func (tc *zookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
if !node.stopped {
node.done <- struct{}{}
node.stopped = true
}
if node.data != nil {
tc.events <- zookeeperTreeCacheEvent{Path: path, Data: nil}
node.data = nil
}
for name, childNode := range node.children {
tc.recursiveDelete(path+"/"+name, childNode)
}
}
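
A hedged usage sketch of the tree cache on its own, within this package (connection details are illustrative): each znode under the prefix yields an event with Data set when created or changed, and with Data nil when deleted:

	conn, _, err := zk.Connect([]string{"localhost:2181"}, 10*time.Second)
	if err != nil {
		log.Errorf("connect failed: %s", err)
		return
	}
	events := make(chan zookeeperTreeCacheEvent)
	tc := NewZookeeperTreeCache(conn, "/twitter/service/myservice", events)
	go func() {
		for ev := range events {
			if ev.Data != nil {
				log.Infof("node %s set to %q", ev.Path, *ev.Data)
			} else {
				log.Infof("node %s deleted", ev.Path)
			}
		}
	}()
	// ... later, tear down; Stop closes the events channel.
	tc.Stop()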

View file

@ -389,6 +389,9 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
for _, c := range cfg.ConsulSDConfigs {
providers = append(providers, discovery.NewConsulDiscovery(c))
}
for _, c := range cfg.ServersetSDConfigs {
providers = append(providers, discovery.NewServersetDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
}