pipe/sql.go (301 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package pipe

import (
	"database/sql"
	"encoding/hex"
	"fmt"
	//"context"
	"strings"
	"time"

	"github.com/uber/storagetapper/config"
	"github.com/uber/storagetapper/db"
	"github.com/uber/storagetapper/log"
	"github.com/uber/storagetapper/util"

	//_ "github.com/kshvakov/clickhouse"
	_ "github.com/lib/pq"
	_ "github.com/mailru/go-clickhouse"
)

// Supported values of PipeConfig.SQL.Type. The type string is also passed as
// the driver name to sql.Open when a DSN is configured.
const (
	mysql      = "mysql"
	postgres   = "postgres"
	clickhouse = "clickhouse"
)

// sqlPipe is a pipe backed by a SQL database. Producers execute every pushed
// message as a SQL statement; consumers read back the rows of the table named
// after the topic and turn each row into an INSERT statement.
type sqlPipe struct {
	cfg config.PipeConfig
}

// sqlProducer executes pushed messages on conn. tx holds the transaction
// opened by PushBatch until PushBatchCommit, Close or CloseOnFailure ends it.
type sqlProducer struct {
	*sqlPipe
	conn *sql.DB
	tx   *sql.Tx
}

// sqlConsumer scans the topic table row by row. row holds one scan
// destination per column and insert holds the "INSERT INTO t (cols) VALUES ("
// prefix; both are prepared lazily by initTable on the first fetch.
type sqlConsumer struct {
	baseConsumer
	*sqlPipe
	conn   *sql.DB
	rows   *sql.Rows
	msg    []byte
	err    error
	row    []interface{}
	insert string
	topic  string
	inited bool
}

// init registers one pipe plugin per supported SQL type.
func init() {
	registerPlugin(mysql, initMySQLPipe)
	registerPlugin(postgres, initPostgresPipe)
	registerPlugin(clickhouse, initClickHousePipe)
}

// initSQLPipe creates a SQL pipe from a copy of cfg with SQL.Type forced to tp.
// The db argument is not used.
func initSQLPipe(tp string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
	p := &sqlPipe{*cfg}
	p.cfg.SQL.Type = tp
	return p, nil
}

func initMySQLPipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
	return initSQLPipe(mysql, cfg, db)
}

func initPostgresPipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
	return initSQLPipe(postgres, cfg, db)
}

func initClickHousePipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
	return initSQLPipe(clickhouse, cfg, db)
}

// Type returns the configured SQL flavour (mysql, postgres or clickhouse).
func (p *sqlPipe) Type() string {
	return p.cfg.SQL.Type
}

// Config returns pipe configuration
func (p *sqlPipe) Config() *config.PipeConfig {
	return &p.cfg
}

// Close release resources associated with the pipe. The pipe itself holds no
// resources; connections belong to producers and consumers.
func (p *sqlPipe) Close() error {
	return nil
}

// NewProducer opens a connection for a new producer. When SQL.DSN is set it
// is opened directly with the configured type as the driver name (defaulting
// to mysql); otherwise the master address is resolved from the
// Service/Cluster/DB settings via db.GetConnInfo.
func (p *sqlPipe) NewProducer(topic string) (Producer, error) {
	t := p.cfg.SQL.Type
	if t == "" {
		t = mysql
	}

	var err error
	var conn *sql.DB
	if p.cfg.SQL.DSN != "" {
		conn, err = sql.Open(t, p.cfg.SQL.DSN)
		if err != nil {
			return nil, err
		}
	} else {
		ci, err := db.GetConnInfo(&db.Loc{Service: p.cfg.SQL.Service, Cluster: p.cfg.SQL.Cluster, Name: p.cfg.SQL.DB}, db.Master, p.cfg.SQL.Type)
		if err != nil {
			return nil, err
		}
		conn, err = db.OpenModeType(ci, p.cfg.SQL.Type, db.SQLMode)
		if err != nil {
			return nil, err
		}
	}

	return &sqlProducer{sqlPipe: p, conn: conn}, nil
}

// NewConsumer opens a connection for a consumer of topic. Only SQL.DSN is
// used here; the Service/Cluster lookup done by NewProducer is not.
func (p *sqlPipe) NewConsumer(topic string) (Consumer, error) {
	t := p.cfg.SQL.Type
	if t == "" {
		t = mysql
	}

	conn, err := sql.Open(t, p.cfg.SQL.DSN)
	if err != nil {
		return nil, err
	}

	c := &sqlConsumer{sqlPipe: p, conn: conn, topic: topic}
	c.initBaseConsumer(c.fetchNext)

	return c, nil
}

// sqlMsgBytes returns the SQL statement carried by a message. The SQL pipe
// only accepts []byte messages.
func sqlMsgBytes(in interface{}) ([]byte, error) {
	b, ok := in.([]byte)
	if !ok {
		return nil, fmt.Errorf("SQL pipe can handle binary arrays only")
	}
	return b, nil
}

// push executes the message as a single statement outside any transaction.
func (p *sqlProducer) push(in interface{}) error {
	stmt, err := sqlMsgBytes(in)
	if err != nil {
		return err
	}
	_, err = p.conn.Exec(string(stmt))
	return err
}

// PushK sends a keyed message. The key is ignored.
func (p *sqlProducer) PushK(key string, in interface{}) error {
	return p.push(in)
}

// Push produces a message
func (p *sqlProducer) Push(in interface{}) error {
	return p.push(in)
}

// PushBatch executes a message inside the producer's batch transaction,
// beginning the transaction on first use. The statements become durable when
// PushBatchCommit (or a graceful Close) commits the transaction.
func (p *sqlProducer) PushBatch(key string, in interface{}) (err error) {
	// Validate before opening a transaction so a bad message does not leave
	// an otherwise unused transaction behind.
	stmt, err := sqlMsgBytes(in)
	if err != nil {
		return err
	}

	if p.tx == nil {
		p.tx, err = p.conn.Begin()
		if log.E(err) {
			return err
		}
	}

	_, err = p.tx.Exec(string(stmt))
	return err
}

// PushBatchCommit commits the statements queued by PushBatch, if any.
func (p *sqlProducer) PushBatchCommit() error {
	if p.tx == nil {
		return nil
	}
	err := p.tx.Commit()
	p.tx = nil
	return err
}

// PushSchema executes the schema statement(s) in data via util.ExecSQL.
func (p *sqlProducer) PushSchema(_ string, data []byte) (err error) {
	return util.ExecSQL(p.conn, string(data))
}

// pclose ends the optional transaction tx (commit when graceful, rollback
// otherwise) and then closes conn. conn is closed even if ending tx fails;
// the transaction error takes precedence over the close error.
func pclose(conn *sql.DB, tx *sql.Tx, graceful bool) error {
	var err error
	if tx != nil {
		if graceful {
			err = tx.Commit()
		} else {
			err = tx.Rollback()
		}
	}

	if cerr := conn.Close(); err == nil {
		err = cerr
	}
	return err
}

// Close commits any pending batch transaction and closes the connection.
func (p *sqlProducer) Close() error {
	// Detach the transaction first so a repeated Close does not try to
	// commit an already finished transaction.
	tx := p.tx
	p.tx = nil
	return pclose(p.conn, tx, true)
}

// CloseOnFailure rolls back any pending batch transaction and closes the
// connection.
func (p *sqlProducer) CloseOnFailure() error {
	tx := p.tx
	p.tx = nil
	return pclose(p.conn, tx, false)
}

// PartitionKey transforms input row key into partition key: "snapshot" for
// snapshot rows and "log" for everything else.
func (p *sqlProducer) PartitionKey(source string, key string) string {
	if source == "snapshot" {
		return "snapshot"
	}
	return "log"
}

// SetFormat is a no-op for the SQL producer.
func (p *sqlProducer) SetFormat(_ string) {
}

// isMissingTableErr reports whether err means the topic table does not exist
// (yet). The MySQL-style message contains "doesn't exist"; Postgres reports a
// missing table as `relation "..." does not exist`.
func isMissingTableErr(err error) bool {
	msg := err.Error()
	return strings.Contains(msg, "doesn't exist") ||
		(strings.Contains(msg, "relation") && strings.Contains(msg, "does not exist"))
}

// initTable opens a full scan of the topic table, waiting (polling every
// 200ms) until the table exists. It then prepares one driver-specific scan
// destination per column and the "INSERT INTO t (cols) VALUES (" prefix.
func (p *sqlConsumer) initTable() error {
	q := "`"
	if p.cfg.SQL.Type == postgres {
		q = `"`
	}
	tp := q + util.EscapeQuotes(p.topic, q[0]) + q

	var err error
	for {
		p.rows, err = util.QuerySQL(p.conn, "SELECT * from "+tp)
		if err == nil {
			break
		}
		if !isMissingTableErr(err) {
			return err
		}
		// The producer has not created the table yet; retry.
		time.Sleep(200 * time.Millisecond)
	}

	cols, err := p.rows.Columns()
	if err != nil {
		return err
	}

	types, err := p.rows.ColumnTypes()
	if err != nil {
		return err
	}

	p.row = make([]interface{}, len(cols))
	for i := range cols {
		dt := strings.ToLower(types[i].DatabaseTypeName())
		switch p.cfg.SQL.Type {
		case postgres:
			p.row[i] = util.PostgresToDriverType(dt)
		case clickhouse:
			p.row[i] = util.ClickHouseToDriverType(dt)
		default: // mysql
			p.row[i] = util.MySQLToDriverType(dt, "")
		}
	}

	p.insert = "INSERT INTO " + tp + " (" + strings.Join(cols, ",") + ") VALUES ("
	p.inited = true

	return nil
}

// Pop returns the last message and error stored on the consumer.
func (p *sqlConsumer) Pop() (interface{}, error) {
	return p.msg, p.err
}

// closeRowsAndConn closes the result set, if one was opened, and then the
// connection. The connection is closed even if closing the rows fails; the
// rows error takes precedence.
func (p *sqlConsumer) closeRowsAndConn(graceful bool) error {
	var err error
	// rows is nil until the first fetch has run initTable successfully.
	if p.rows != nil {
		err = p.rows.Close()
	}
	if cerr := pclose(p.conn, nil, graceful); err == nil {
		err = cerr
	}
	return err
}

// Close releases the result set and the connection.
func (p *sqlConsumer) Close() error {
	return p.closeRowsAndConn(true)
}

// CloseOnFailure releases the result set and the connection.
func (p *sqlConsumer) CloseOnFailure() error {
	return p.closeRowsAndConn(false)
}

// encodeSQLValue renders a scanned column value as a SQL literal for the
// given database type. Invalid (NULL) Null* values and NULL raw bytes become
// NULL; raw bytes are hex encoded as 0x...; unknown types are formatted with %v.
func encodeSQLValue(tp string, d interface{}) string {
	switch f := d.(type) {
	case *sql.NullInt64:
		if f.Valid {
			return fmt.Sprintf("%v", f.Int64)
		}
	case *sql.NullString:
		if f.Valid {
			p := "'"
			if tp == clickhouse {
				return p + f.String + p //scan returns already escaped string, probably driver issue
			} else if tp == postgres {
				p = "E" + p
			}
			return p + util.MySQLEscape(true, f.String) + "'"
		}
	case *sql.NullFloat64:
		if f.Valid {
			return fmt.Sprintf("%v", f.Float64)
		}
	case *sql.RawBytes:
		// database/sql stores a nil slice in RawBytes for a NULL column, so
		// the scanned value, not just the pointer, must be checked.
		if f != nil && *f != nil {
			return "0x" + hex.EncodeToString([]byte(*f))
		}
	default:
		return fmt.Sprintf("%v", d)
	}
	return "NULL"
}

// fetchNext returns the next table row as an INSERT statement, or nil, nil
// once the result set is exhausted without error.
func (p *sqlConsumer) fetchNext() (interface{}, error) {
	if !p.inited {
		if err := p.initTable(); err != nil {
			return nil, err
		}
	}

	if !p.rows.Next() {
		return nil, p.rows.Err()
	}

	if err := p.rows.Scan(p.row...); err != nil {
		return nil, err
	}

	vals := make([]string, len(p.row))
	for i, v := range p.row {
		vals[i] = encodeSQLValue(p.cfg.SQL.Type, v)
	}

	return []byte(p.insert + strings.Join(vals, ",") + ")"), nil
}

// SaveOffset is not implemented for the SQL consumer yet.
func (p *sqlConsumer) SaveOffset() error {
	//TODO: implement
	return nil
}

// SetFormat is a no-op for the SQL consumer.
func (p *sqlConsumer) SetFormat(_ string) {
}