server/table_handler.go (230 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package server
import (
"encoding/json"
"errors"
"fmt"
"mime"
"net/http"
"strconv"
"strings"
"time"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/pipe"
"github.com/uber/storagetapper/state"
"github.com/uber/storagetapper/types"
)
type tableCmdReq struct {
Cmd string
Cluster string
Service string
DB string
Table string
Input string
Output string
OutputFormat string
Offset int64
Limit int64
Filter string
Type string //if set to state list shards, registrations otherwise
PublishSchema string
Params string
Version int
CreateTopic bool
AutoVersion bool
}
type tableListResponse struct {
Cluster string `json:"cluster"`
Service string `json:"service"`
DB string `json:"db"`
Table string `json:"table"`
Input string `json:"input"`
Output string `json:"output"`
Version int `json:"version"`
OutputFormat string `json:"outputFormat"`
SnapshottedAt time.Time `json:"snapshottedAt"`
NeedSnapshot bool `json:"needSnapshot"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
Params string `json:"params,omitempty"`
}
func checkSQLFormat(output, format string) bool {
return output != "postgres" && output != "mysql" && output != "clickhouse" || ((output == "postgres" || output == "mysql") && strings.HasPrefix(format, "ansisql")) || ((output == "mysql" || output == "clickhouse") && strings.HasPrefix(format, "mysql"))
}
func fillSQLFormat(t *tableCmdReq) {
if t.OutputFormat == "" {
if t.Output == "mysql" || t.Output == "clickhouse" {
t.OutputFormat = "mysql"
} else if t.Output == "postgres" {
t.OutputFormat = "ansisql"
}
}
}
func handleAddCmd(_ http.ResponseWriter, t *tableCmdReq) error {
fillSQLFormat(t)
if !checkSQLFormat(t.Output, t.OutputFormat) {
return fmt.Errorf("incompatible output format. MySQL, Postgres, ClickHouse outputs only support SQL output format")
}
if t.Params != "" {
v := &config.TableParams{}
err := json.Unmarshal([]byte(t.Params), v)
if err != nil {
return fmt.Errorf("invalid table params value: %v", err)
}
}
if t.AutoVersion {
var err error
t.Version, err = state.TableMaxVersion(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output)
if err != nil {
return err
}
t.Version++
}
if t.PublishSchema != "" && t.Input == types.InputMySQL {
err := SchemaRegister(t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.PublishSchema, t.CreateTopic && t.Output == "kafka")
if err != nil {
return err
}
}
tn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, time.Now())
if err != nil {
return err
}
//TODO: Implement generic interface for handing offsets in pipe
if err := pipe.DeleteKafkaOffsets(tn, state.GetDB()); err != nil {
return err
}
if !state.RegisterTable(t.Cluster, t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, t.Params) {
return errors.New("error registering table")
}
return nil
}
func handleDelCmd(_ http.ResponseWriter, t *tableCmdReq) error {
if !state.DeregisterTable(t.Cluster, t.Service, t.DB, t.Table, t.Input, t.Output, t.Version) {
return errors.New("error deregistering table")
}
return nil
}
func handleListCmd(w http.ResponseWriter, t *tableCmdReq) error {
var err error
var cond string
var args = make([]interface{}, 0)
cond, args = state.AddSQLCond(cond, args, "AND", "cluster", "=", t.Cluster)
cond, args = state.AddSQLCond(cond, args, "AND", "service", "=", t.Service)
cond, args = state.AddSQLCond(cond, args, "AND", "db", "=", t.DB)
cond, args = state.AddSQLCond(cond, args, "AND", "table_name", "=", t.Table)
cond, args = state.AddSQLCond(cond, args, "AND", "input", "=", t.Input)
cond, args = state.AddSQLCond(cond, args, "AND", "output", "=", t.Output)
if t.Version != 0 {
cond, args = state.AddSQLCond(cond, args, "AND", "version", "=", fmt.Sprintf("%d", t.Version))
}
cond, args = addFilter(cond, args, []string{"cluster", "service", "db", "table_name", "input", "output", "output_format"}, t.Filter)
if t.Offset != 0 || t.Limit != 0 {
if t.Limit == 0 && t.Offset != 0 {
t.Limit = int64((^uint64(0)) >> 1) //MaxInt
}
cond += fmt.Sprintf(" LIMIT %v,%v", t.Offset, t.Limit)
}
var rows state.Type
if t.Type == "state" {
rows, err = state.GetCond(cond, args...)
} else {
rows, err = state.GetRegCond(cond, args...)
}
if err == nil {
var resp []byte
for _, v := range rows {
if v.Cluster == "" {
v.Cluster = "*"
}
if v.DB == "" {
v.DB = "*"
}
var b []byte
if b, err = json.Marshal(&tableListResponse{Cluster: v.Cluster, Service: v.Service, DB: v.DB, Table: v.Table, Input: v.Input, Output: v.Output, Version: v.Version, OutputFormat: v.OutputFormat, SnapshottedAt: v.SnapshottedAt, NeedSnapshot: v.NeedSnapshot, Params: v.ParamsRaw, CreatedAt: v.CreatedAt, UpdatedAt: v.UpdatedAt}); err != nil {
break
}
resp = append(resp, b...)
resp = append(resp, '\n')
}
if err == nil {
_, err = w.Write(resp)
}
}
return err
}
func parseTableForm(w http.ResponseWriter, r *http.Request) *tableCmdReq {
var err error
t := tableCmdReq{}
ct, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
switch {
case ct == "application/x-www-form-urlencoded", ct == "multipart/form-data", ct == "":
t.Cmd = r.FormValue("cmd")
t.Cluster = r.FormValue("cluster")
t.Service = r.FormValue("service")
t.DB = r.FormValue("db")
t.Table = r.FormValue("table")
t.Input = strings.ToLower(r.FormValue("input"))
t.Output = strings.ToLower(r.FormValue("output"))
t.OutputFormat = strings.ToLower(r.FormValue("outputFormat"))
t.Filter = r.FormValue("filter")
t.Type = r.FormValue("type")
t.PublishSchema = strings.ToLower(r.FormValue("publishSchema"))
s := strings.ToLower(r.FormValue("createTopic"))
t.CreateTopic = s == "true" || s == "1"
if r.FormValue("version") != "" {
i, err := strconv.ParseInt(r.FormValue("version"), 10, 32)
if log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return nil
}
t.Version = int(i)
}
if t.Offset, t.Limit, err = parsePagination(r); log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return nil
}
t.Params = r.FormValue("params")
case ct == "application/json":
if err := json.NewDecoder(r.Body).Decode(&t); log.E(err) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return nil
}
default:
code := http.StatusUnsupportedMediaType
http.Error(w, http.StatusText(code), code)
return nil
}
return &t
}
func outputSQL(s string) bool {
return s != "mysql" && s != "postgres" && s != "clickhouse"
}
func tableCmd(w http.ResponseWriter, r *http.Request) {
var err error
t := parseTableForm(w, r)
if t == nil {
return
}
if t.Cmd == "list" {
err = handleListCmd(w, t)
} else if len(t.Service) == 0 || len(t.Cluster) == 0 || len(t.DB) == 0 || len(t.Table) == 0 {
err = errors.New("invalid command, parameters(service,cluster,db,table) must not be empty")
} else if t.Cmd == "add" && len(t.OutputFormat) == 0 && !outputSQL(t.Output) {
err = errors.New("invalid add command. outputFormat must not be empty")
} else if (t.Cmd == "del" || t.Cmd == "add") && (len(t.Input) == 0 || len(t.Output) == 0) {
err = fmt.Errorf("parameters(input,output) must not be empty for '%v' command", t.Cmd)
} else if t.Cmd == "del" {
err = handleDelCmd(w, t)
} else if t.Cmd == "add" {
err = handleAddCmd(w, t)
} else {
err = errors.New("unknown command (possible commands add/del/list)")
}
if err != nil {
log.Errorf("Table http: cmd=%v, service=%v, cluster=%v, db=%v, table=%v, input=%v, output=%v, version=%v, outputFormat=%v, error=%v", t.Cmd, t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, t.OutputFormat, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}