encoder/sql.go (242 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package encoder
import (
"bytes"
"encoding/hex"
"fmt"
"strconv"
"time"
"github.com/uber/storagetapper/types"
"github.com/uber/storagetapper/util"
)
func init() {
registerPlugin("ansisql", initAnsiSQLEncoder)
registerPlugin("mysql", initMySQLEncoder)
registerPlugin("ansisql_idempotent", initAnsiSQLIdempotentEncoder)
registerPlugin("mysql_idempotent", initMySQLIdempotentEncoder)
}
type sqlEncoder struct {
typ string
c jsonEncoder
insertPrefix string
deletePrefix string
identQuote string
idempotentInsert bool
}
func initEncoder(tp, service, db, table, input string, output string, version int, quote string, ii bool) (Encoder, error) {
return &sqlEncoder{typ: tp, c: jsonEncoder{Service: service, DB: db, Table: table, Input: input, Output: output, Version: version}, identQuote: quote, idempotentInsert: ii}, nil
}
func initMySQLEncoder(service, db, table, input string, output string, version int) (Encoder, error) {
return initEncoder("mysql", service, db, table, input, output, version, "`", false)
}
func initAnsiSQLEncoder(service, db, table, input string, output string, version int) (Encoder, error) {
return initEncoder("ansisql", service, db, table, input, output, version, "\"", false)
}
func initMySQLIdempotentEncoder(service, db, table, input string, output string, version int) (Encoder, error) {
return initEncoder("mysql_idempotent", service, db, table, input, output, version, "`", true)
}
func initAnsiSQLIdempotentEncoder(service, db, table, input string, output string, version int) (Encoder, error) {
return initEncoder("ansisql_idempotent", service, db, table, input, output, version, "\"", true)
}
//Schema returns table schema
func (e *sqlEncoder) Schema() *types.TableSchema {
return e.c.inSchema
}
//EncodeSchema encodes current output schema
func (e *sqlEncoder) EncodeSchema(seqno uint64) ([]byte, error) {
return []byte(e.appendSchema(e.c.inSchema, e.c.filter)), nil
}
func (e *sqlEncoder) quotedIdent(id string) string {
return e.identQuote + util.EscapeQuotes(id, e.identQuote[0]) + e.identQuote
}
/*UpdateCodec refreshes the schema from state DB */
func (e *sqlEncoder) UpdateCodec() error {
if err := e.c.UpdateCodec(); err != nil {
return err
}
t := e.quotedIdent(e.c.Table)
e.insertPrefix = "INSERT INTO " + t + " (" + e.appendFieldNames(e.c.inSchema, e.c.filter, false) + ") VALUES ("
e.deletePrefix = "DELETE FROM " + t + " WHERE"
return nil
}
//CommonFormat encodes common format event into byte array
func (e *sqlEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) {
if cf.Type == "schema" {
if err := e.c.UpdateCodec(); err != nil {
return nil, err
}
}
//FIXME: Assumes that cf is in input schema format, so we need to filter it
//to conform to output schema
return e.CommonFormatEncode(cf)
}
//Type returns this encoder type
func (e *sqlEncoder) Type() string {
return e.typ
}
func (e *sqlEncoder) encodeSQLValue(d interface{}) string {
switch v := d.(type) {
case nil:
return "NULL"
case string:
return "'" + util.MySQLEscape(false, v) + "'"
case []byte:
return "0x" + hex.EncodeToString(v)
default:
return fmt.Sprintf("%v", d)
}
}
func bufWrite(b *bytes.Buffer, s string) {
_, _ = b.WriteString(s)
}
func (e *sqlEncoder) appendSetFields(b *bytes.Buffer, seqno uint64) {
for i := 0; i < len(e.c.inSchema.Columns); i++ {
if e.c.inSchema.Columns[i].Key == "PRI" {
continue
}
cn := e.quotedIdent(e.c.inSchema.Columns[i].Name)
bufWrite(b, cn)
bufWrite(b, "= IF(seqno < VALUES(seqno), VALUES(")
bufWrite(b, cn)
bufWrite(b, "),")
bufWrite(b, cn)
bufWrite(b, "),")
}
bufWrite(b, " seqno = IF(seqno < VALUES(seqno), VALUES(seqno), seqno)")
bufWrite(b, ";")
}
func (e *sqlEncoder) appendPKWhere(b *bytes.Buffer, row []interface{}, cf *types.CommonFormatEvent, seqno uint64, seqnop string) error {
bufWrite(b, " ")
bufWrite(b, e.quotedIdent("seqno"))
bufWrite(b, seqnop)
bufWrite(b, strconv.FormatUint(seqno, 10))
var k int
for i := 0; i < len(e.c.inSchema.Columns); i++ {
if e.c.inSchema.Columns[i].Key != "PRI" {
continue
}
if len(e.deletePrefix) != b.Len() {
bufWrite(b, " AND ")
}
bufWrite(b, e.quotedIdent(e.c.inSchema.Columns[i].Name))
_ = b.WriteByte('=')
if row != nil {
bufWrite(b, e.encodeSQLValue(row[i]))
} else if k < len(cf.Key) {
bufWrite(b, e.encodeSQLValue(cf.Key[k]))
k++
} else {
return fmt.Errorf("broken event. key in the schema has more fields. event key len: %v", len(cf.Key))
}
}
return nil
}
func (e *sqlEncoder) appendFieldNames(schema *types.TableSchema, filter []int, pk bool) string {
var fieldNames string
if !pk {
fieldNames += e.quotedIdent("seqno")
}
for i, j := 0, 0; i < len(schema.Columns); i++ {
if pk && schema.Columns[i].Key != "PRI" {
continue
}
if filteredField(filter, i, &j) {
continue
}
if len(fieldNames) != 0 {
fieldNames += ","
}
fieldNames += e.quotedIdent(schema.Columns[i].Name)
}
return fieldNames
}
func (e *sqlEncoder) appendSchemaFields(schema *types.TableSchema, filter []int) string {
fieldNames := e.quotedIdent("seqno") + " BIGINT NOT NULL, "
for i, j := 0, 0; i < len(schema.Columns); i++ {
if filteredField(filter, i, &j) {
continue
}
fieldNames += e.quotedIdent(schema.Columns[i].Name) + " " + schema.Columns[i].Type
if schema.Columns[i].IsNullable != "YES" {
fieldNames += " NOT NULL"
}
fieldNames += ", "
}
return fieldNames
}
func (e *sqlEncoder) appendSchema(schema *types.TableSchema, filter []int) string {
return "CREATE TABLE " + e.quotedIdent(e.c.Table) + " (" + e.appendSchemaFields(e.c.inSchema, e.c.filter) + "UNIQUE KEY(" + e.quotedIdent("seqno") + "), PRIMARY KEY (" + e.appendFieldNames(e.c.inSchema, e.c.filter, true) + "));"
}
func (e *sqlEncoder) appendFields(b *bytes.Buffer, row []interface{}, cf *types.CommonFormatEvent, seqno uint64) {
bufWrite(b, strconv.FormatUint(seqno, 10))
for i, j := 0, 0; i < len(e.c.inSchema.Columns); i++ {
if filteredField(e.c.filter, i, &j) {
continue
}
_ = b.WriteByte(',')
if row == nil {
bufWrite(b, e.encodeSQLValue((*cf.Fields)[i].Value))
} else {
bufWrite(b, e.encodeSQLValue(row[i]))
}
}
}
func (e *sqlEncoder) appendIdempotentInsert(b *bytes.Buffer, row []interface{}, cf *types.CommonFormatEvent, seqno uint64, ts time.Time) error {
if err := e.appendStmt(b, types.Insert, row, cf, seqno, ts); err != nil {
return err
}
bufWrite(b, " ON DUPLICATE KEY UPDATE ")
e.appendSetFields(b, seqno)
return nil
}
func (e *sqlEncoder) appendStmt(b *bytes.Buffer, tp int, row []interface{}, cf *types.CommonFormatEvent, seqno uint64, _ time.Time) error {
switch tp {
case types.Insert:
bufWrite(b, e.insertPrefix)
e.appendFields(b, row, cf, seqno)
bufWrite(b, ")")
if !e.idempotentInsert {
bufWrite(b, ";")
}
case types.Delete:
bufWrite(b, e.deletePrefix)
if err := e.appendPKWhere(b, row, cf, seqno, "="); err != nil {
return err
}
_ = b.WriteByte(';')
case types.Schema:
bufWrite(b, e.appendSchema(e.c.inSchema, e.c.filter))
default:
return fmt.Errorf("unknown event type %v", tp)
}
return nil
}
//Row encodes row into CommonFormat
func (e *sqlEncoder) rowLow(tp int, row []interface{}, cf *types.CommonFormatEvent, seqno uint64, ts time.Time) ([]byte, error) {
var b bytes.Buffer
var err error
if tp == types.Insert && e.idempotentInsert {
err = e.appendIdempotentInsert(&b, row, cf, seqno, ts)
} else {
err = e.appendStmt(&b, tp, row, cf, seqno, ts)
}
return b.Bytes(), err
}
//Row encodes row into CommonFormat
func (e *sqlEncoder) Row(tp int, row *[]interface{}, seqno uint64, ts time.Time) ([]byte, error) {
var r []interface{}
if row != nil {
r = *row
}
return e.rowLow(tp, r, nil, seqno, ts)
}
// CommonFormatEncode encodes CommonFormatEvent into byte array based on the message pack
// encoding system
// By overriding these 2 methods we get full functionality of jsonEncoder
// that implements MessagePack
func (e *sqlEncoder) CommonFormatEncode(c *types.CommonFormatEvent) ([]byte, error) {
var tp int
switch c.Type {
case "insert":
tp = types.Insert
case "delete":
tp = types.Delete
case "schema":
//FIXME: Encode schema from c instead of sqlEncoder current schema
return []byte(e.appendSchema(e.c.inSchema, e.c.filter)), nil
default:
return nil, fmt.Errorf("unknown event type %v", c.Type)
}
return e.rowLow(tp, nil, c, c.SeqNo, time.Unix(c.Timestamp, 0))
}
// UnwrapEvent splits the event header and payload
// cfEvent is populated with the 'header' information aka the first decoding.
// Data after the header returned in the payload parameter
func (e *sqlEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) {
return nil, fmt.Errorf("SQL encoder doesn't support decoding")
}
//DecodeEvent decodes Sql encoded array into CommonFormatEvent struct
func (e *sqlEncoder) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) {
return nil, fmt.Errorf("SQL encoder doesn't support decoding")
}