server/cluster_handler.go (154 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package server
import (
"database/sql"
"encoding/json"
"fmt"
"mime"
"net/http"
"strconv"
"errors"
"github.com/uber/storagetapper/db"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/state"
)
//clusterInfoReq body of register/deregister cluster request
//Contains all the information required to db to the cluster
type clusterInfoReq struct {
Cmd string
Name string
Host string
Port uint16
User string
Pw string
Offset int64
Limit int64
Filter string
}
func handleClusterPosition(w http.ResponseWriter, name string) error {
gtid, seqno, err := state.GetGTID(name)
if err != nil {
return err
}
var b []byte
if b, err = json.Marshal(struct {
SeqNo int64
GTIDSet string
}{SeqNo: seqno, GTIDSet: gtid}); err != nil {
return err
}
_, err = w.Write(b)
return err
}
func handleClusterListCmd(w http.ResponseWriter, t *clusterInfoReq) error {
var err error
var cond string
var args = make([]interface{}, 0)
cond, args = state.AddSQLCond(cond, args, "AND", "name", "=", t.Name)
cond, args = state.AddSQLCond(cond, args, "AND", "host", "=", t.Host)
cond, args = state.AddSQLCond(cond, args, "AND", "user", "=", t.User)
if t.Port != 0 {
cond, args = state.AddSQLCond(cond, args, "AND", "port", "=", fmt.Sprintf("%+v", t.Port))
}
cond, args = addFilter(cond, args, []string{"name", "host", "user"}, t.Filter)
if cond != "" {
cond = " WHERE " + cond
}
if t.Offset != 0 || t.Limit != 0 {
if t.Limit == 0 && t.Offset != 0 {
t.Limit = int64((^uint64(0)) >> 1) //MaxInt
}
cond += fmt.Sprintf(" LIMIT %v,%v", t.Offset, t.Limit)
}
var rows []db.Addr
if rows, err = state.GetClusterInfo(cond, args...); err == nil {
var resp []byte
for _, v := range rows {
var b []byte
if b, err = json.Marshal(&db.Addr{Name: v.Name, Host: v.Host, Port: v.Port, User: v.User}); err != nil {
break
}
resp = append(resp, b...)
resp = append(resp, '\n')
}
if err == nil {
_, err = w.Write(resp)
}
}
return err
}
func parsePagination(r *http.Request) (offset int64, limit int64, err error) {
if r.FormValue("offset") != "" {
offset, err = strconv.ParseInt(r.FormValue("offset"), 10, 64)
if err != nil {
return
}
}
if r.FormValue("limit") != "" {
limit, err = strconv.ParseInt(r.FormValue("limit"), 10, 64)
if err != nil {
return
}
}
return
}
func clusterInfoCmd(w http.ResponseWriter, r *http.Request) {
var err error
s := clusterInfoReq{}
ct, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
switch {
case ct == "application/x-www-form-urlencoded", ct == "multipart/form-data", ct == "":
s.Cmd = r.FormValue("cmd")
s.Name = r.FormValue("name")
s.Host = r.FormValue("host")
s.User = r.FormValue("user")
s.Filter = r.FormValue("filter")
s.Pw = r.FormValue("pw")
if r.FormValue("port") != "" {
i, err := strconv.ParseInt(r.FormValue("port"), 10, 16)
if log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
s.Port = uint16(i)
}
if s.Offset, s.Limit, err = parsePagination(r); log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
case ct == "application/json":
if err := json.NewDecoder(r.Body).Decode(&s); log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
default:
code := http.StatusUnsupportedMediaType
http.Error(w, http.StatusText(code), code)
return
}
if s.Cmd == "list" {
err = handleClusterListCmd(w, &s)
} else if len(s.Name) == 0 {
err = errors.New("invalid command. Name cannot be empty")
} else if s.Cmd == "add" {
if len(s.Host) == 0 || len(s.User) == 0 {
err = errors.New("invalid 'add' command. Host and User cannot be empty")
} else {
err = state.InsertClusterInfo(&db.Addr{Name: s.Name, Host: s.Host, Port: s.Port, User: s.User, Pwd: s.Pw})
}
} else if s.Cmd == "del" {
err = state.DeleteClusterInfo(s.Name)
} else if s.Cmd == "pos" {
err = handleClusterPosition(w, s.Name)
} else {
err = errors.New("unknown command (possible commands: add/del)")
}
if err != nil {
if err == sql.ErrNoRows {
http.Error(w, err.Error(), http.StatusNotFound)
} else {
log.Errorf("Cluster http: cmd=%v, name=%v, error=%v", s.Cmd, s.Name, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}