flw/flw.go (183 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
*/
package flw
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"net"
"regexp"
"strconv"
"strings"
"time"
)
// Srvr is a FourLetterWord helper function. In particular, this function pulls the "srvr" output
// from the zookeeper instance and parses the output. A *ServerStats struct is returned
// as well as an error value to indicate whether this function processed successfully.
func (c *Client) Srvr(server string) (*ServerStats, error) {
// different parts of the regular expression that are required to parse the srvr output
const (
zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
zrLat = `^Latency min/avg/max: (\d+)/([0-9.]+)/(\d+)`
zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
)
// build the regex from the pieces above
re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))
if err != nil {
return nil, fmt.Errorf("error compiling srvr response regex: %w", err)
}
response, err := fourLetterWord(server, "srvr", c.Timeout)
if err != nil {
return nil, fmt.Errorf("invalid srvr response: %w", err)
}
matches := re.FindAllStringSubmatch(string(response), -1)
if matches == nil {
return nil, fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
}
match := matches[0][1:]
// determine current server
var srvrMode Mode
switch match[10] {
case "leader":
srvrMode = ModeLeader
case "follower":
srvrMode = ModeFollower
case "standalone":
srvrMode = ModeStandalone
default:
srvrMode = ModeUnknown
}
buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])
if err != nil {
return nil, fmt.Errorf("error parsing srvr response: %w", err)
}
parsedInt, err := strconv.ParseInt(match[9], 0, 64)
if err != nil {
return nil, fmt.Errorf("error parsing srvr response: %w", err)
}
// the ZxID value is an int64 with two int32s packed inside
// the high int32 is the epoch (i.e., number of leader elections)
// the low int32 is the counter
epoch := int32(parsedInt >> 32)
counter := int32(parsedInt & 0xFFFFFFFF)
// within the regex above, these values must be numerical
// so we can avoid useless checking of the error return value
minLatency, _ := strconv.ParseInt(match[2], 0, 64)
avgLatency, _ := strconv.ParseFloat(match[3], 64)
maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
recv, _ := strconv.ParseInt(match[5], 0, 64)
sent, _ := strconv.ParseInt(match[6], 0, 64)
cons, _ := strconv.ParseInt(match[7], 0, 64)
outs, _ := strconv.ParseInt(match[8], 0, 64)
ncnt, _ := strconv.ParseInt(match[11], 0, 64)
return &ServerStats{
Sent: sent,
Received: recv,
NodeCount: ncnt,
MinLatency: minLatency,
AvgLatency: avgLatency,
MaxLatency: maxLatency,
Connections: cons,
Outstanding: outs,
Epoch: epoch,
Counter: counter,
BuildTime: buildTime,
Mode: srvrMode,
Version: match[0],
}, nil
}
// Ruok is a FourLetterWord helper function. In particular, this function
// pulls the "ruok" output of a server.
func (c *Client) Ruok(server string) error {
response, err := fourLetterWord(server, "ruok", c.Timeout)
if err != nil {
return fmt.Errorf("error calling ruok FLW: %w", err)
}
if string(response[:4]) != "imok" {
return fmt.Errorf("invalid ruok response from server: %s", response[:4])
}
return nil
}
// Cons is a FourLetterWord helper function. In particular, this function
// pulls the "cons" output from a server.
// As with Srvr, the error value indicates whether the request had an issue.
func (c *Client) Cons(server string) ([]*ServerClient, error) {
const (
zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
)
re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))
if err != nil {
return nil, fmt.Errorf("error compiling cons response regex: %w", err)
}
response, err := fourLetterWord(server, "cons", c.Timeout)
if err != nil {
return nil, fmt.Errorf("error parsing cons response: %w", err)
}
scan := bufio.NewScanner(bytes.NewReader(response))
var clients []*ServerClient
for scan.Scan() {
line := scan.Bytes()
if len(line) == 0 {
continue
}
m := re.FindAllStringSubmatch(string(line), -1)
if m == nil {
return nil, fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
}
match := m[0][1:]
queued, _ := strconv.ParseInt(match[1], 0, 64)
recvd, _ := strconv.ParseInt(match[2], 0, 64)
sent, _ := strconv.ParseInt(match[3], 0, 64)
sid, _ := strconv.ParseInt(match[4], 0, 64)
est, _ := strconv.ParseInt(match[6], 0, 64)
timeout, _ := strconv.ParseInt(match[7], 0, 32)
lcxid, _ := parseInt64(match[8])
lzxid, _ := parseInt64(match[9])
lresp, _ := strconv.ParseInt(match[10], 0, 64)
llat, _ := strconv.ParseInt(match[11], 0, 32)
minlat, _ := strconv.ParseInt(match[12], 0, 32)
avglat, _ := strconv.ParseInt(match[13], 0, 32)
maxlat, _ := strconv.ParseInt(match[14], 0, 32)
clients = append(clients, &ServerClient{
Queued: queued,
Received: recvd,
Sent: sent,
SessionID: sid,
Lcxid: int64(lcxid),
Lzxid: int64(lzxid),
Timeout: int32(timeout),
LastLatency: int32(llat),
MinLatency: int32(minlat),
AvgLatency: int32(avglat),
MaxLatency: int32(maxlat),
Established: time.Unix(est, 0),
LastResponse: time.Unix(lresp, 0),
Addr: match[0],
LastOperation: match[5],
})
}
return clients, nil
}
// Srvr executes the srvr FLW protocol function using the default client.
func Srvr(server string) (*ServerStats, error) {
defaultClient := &Client{Timeout: defaultTimeout}
return defaultClient.Srvr(server)
}
// Ruok executes the ruok FLW protocol function using the default client.
func Ruok(server string) error {
defaultClient := &Client{Timeout: defaultTimeout}
return defaultClient.Ruok(server)
}
// Cons executes the cons FLW protocol function using the default client.
func Cons(server string) ([]*ServerClient, error) {
defaultClient := &Client{Timeout: defaultTimeout}
return defaultClient.Cons(server)
}
// parseInt64 is similar to strconv.ParseInt, but it also handles hex values that represent negative numbers
func parseInt64(s string) (int64, error) {
if strings.HasPrefix(s, "0x") {
i, err := strconv.ParseUint(s, 0, 64)
return int64(i), err
}
return strconv.ParseInt(s, 0, 64)
}
func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
conn, err := net.DialTimeout("tcp", server, timeout)
if err != nil {
return nil, err
}
// the zookeeper server should automatically close this socket
// once the command has been processed, but better safe than sorry
defer conn.Close()
if err = conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}
if err = conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
return ioutil.ReadAll(conn)
}