snapshot/mysql.go (258 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package snapshot import ( "database/sql" "fmt" "strings" "time" "github.com/uber/storagetapper/config" "github.com/uber/storagetapper/db" "github.com/uber/storagetapper/encoder" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/metrics" "github.com/uber/storagetapper/types" "github.com/uber/storagetapper/util" ) const cancelCheckInterval = 180 * time.Second // mysqlReader is a snapshot reader structure type mysqlReader struct { dbl *db.Loc connInfo *db.Addr connType db.ConnectionType conn *sql.DB trx *sql.Tx rows *sql.Rows log log.Logger nrecs uint64 ndone uint64 encoder encoder.Encoder inputType string outMsg []byte key string err error query string metrics *metrics.Snapshot sqlRow []interface{} row []interface{} schema *types.TableSchema ticker *time.Ticker partitionKey string } func init() { registerPlugin(types.InputMySQL, createMySQLReader) } func createMySQLReader(svc string, cluster string, dbs string, table string, params *config.TableParams, enc encoder.Encoder, m *metrics.Snapshot) (Reader, error) { query := "SELECT * FROM `<table_name>` <filter> FORCE INDEX (primary)" query = strings.Replace(query, "<filter>", FilterRow(types.InputMySQL, svc+"."+table, params), -1) r := &mysqlReader{query: query, inputType: types.InputMySQL, encoder: enc, connType: db.Slave, metrics: m} _, err := r.start(svc, cluster, dbs, table) return r, err } func (s *mysqlReader) startFromTx(svc string, _ string, dbs string, table string, tx *sql.Tx) (string, error) { var lastGtid string var err error s.log = log.WithFields(log.Fields{"service": svc, "db": dbs, "table": table}) s.trx = tx // Get GTID which is earlier in time then any row we will read during snapshot err = s.trx.QueryRow("SELECT @@global.gtid_executed").Scan(&lastGtid) if log.EL(s.log, err) { return lastGtid, err } // Use approximate row count, so as it's for reporting progress only err = s.trx.QueryRow("SELECT table_rows FROM information_schema.tables WHERE table_schema=? AND "+ "table_name=?", dbs, table).Scan(&s.nrecs) if log.EL(s.log, err) { return lastGtid, err } query := strings.Replace(s.query, "<table_name>", table, -1) s.log.Infof("Snapshot reader query: %s", query) s.rows, err = s.trx.Query(query) if log.EL(s.log, err) { return lastGtid, err } s.ndone = 0 s.log.Infof("Snapshot reader started, will stream %v records", s.nrecs) var c []string c, s.err = s.rows.Columns() if log.EL(s.log, s.err) { return "", s.err } s.schema = s.encoder.Schema() if len(c) != len(s.schema.Columns) { return "", fmt.Errorf("rows column count(%v) should be equal to schema's column count(%v)", len(c), len(s.schema.Columns)) } s.sqlRow = make([]interface{}, len(c)) for i := 0; i < len(c); i++ { s.sqlRow[i] = util.MySQLToDriverType(s.schema.Columns[i].DataType, s.schema.Columns[i].Type) } s.row = make([]interface{}, len(c)) s.ticker = time.NewTicker(cancelCheckInterval) return lastGtid, err } func (s *mysqlReader) startLow(svc string, cluster string, dbs string, table string) error { var err error s.dbl = &db.Loc{Cluster: cluster, Service: svc, Name: dbs} s.connInfo, err = db.GetConnInfo(s.dbl, s.connType, s.inputType) if log.E(err) { return err } s.conn, err = db.Open(s.connInfo) if err != nil { return err } // Do we need a transaction at all? We can use seqno to separate snapshot and binlog data. Binlog is always newer. // If we need it, we need to rely on MySQL instance transaction isolation level. // Uncomment later if we have go1.8 // BeginTx since go1.8 // s.trx, err = s.conn.BeginTx(shutdown.Context, sql.TxOptions{sql.LevelRepeatableRead, true}) s.trx, err = s.conn.Begin() if log.E(err) { log.E(s.conn.Close()) return err } return nil } // start connects to the db and starts snapshot for the table func (s *mysqlReader) start(svc string, cluster string, dbs string, table string) (string, error) { if err := s.startLow(svc, cluster, dbs, table); err != nil { return "", err } g, err := s.startFromTx(svc, cluster, dbs, table, s.trx) if err != nil { log.E(s.trx.Rollback()) log.E(s.conn.Close()) } return g, err } func (s *mysqlReader) endFromTx() { s.ticker.Stop() if s.rows != nil { log.EL(s.log, s.rows.Close()) } } // End uninitializes snapshot reader func (s *mysqlReader) End() { s.endFromTx() if s.trx != nil { log.EL(s.log, s.trx.Rollback()) } if s.conn != nil { log.EL(s.log, s.conn.Close()) } s.log.Infof("Snapshot reader finished") } // isValidConn checks the validity of the connection to the DB to make sure we are connected to the right DB func (s *mysqlReader) isValidConn() bool { select { case <-s.ticker.C: if !db.IsValidConn(s.dbl, s.connType, s.connInfo, s.inputType) { return false } default: } return true } func driverTypeToGoTypeLow(p interface{}, schema *types.ColumnSchema) (v interface{}, size int64) { switch f := p.(type) { case *sql.NullInt64: if f.Valid { if schema.DataType != "bigint" { v = int32(f.Int64) size = 4 } else { v = f.Int64 size = 8 } } case *sql.NullBool: if f.Valid { v = f.Bool size = int64(1) } case *sql.NullString: if f.Valid { v = f.String size = int64(len(f.String)) } case *sql.NullTime: if f.Valid { v = f.Time size = 20 } case *sql.NullFloat64: if f.Valid { if schema.DataType == "float" { v = float32(f.Float64) size = 4 } else { v = f.Float64 size = 8 } } case *sql.RawBytes: if f != nil { b := []byte(*f) v = b size = int64(len(b)) } } return } func driverTypeToGoType(v []interface{}, p []interface{}, schema *types.TableSchema) int64 { var size, s int64 for i := 0; i < len(p); i++ { v[i], s = driverTypeToGoTypeLow(p[i], &schema.Columns[i]) size += s } return size } //Pop pops record fetched by FetchNext func (s *mysqlReader) Pop() (string, string, []byte, error) { return s.key, s.partitionKey, s.outMsg, s.err } func (s *mysqlReader) fetchRow() []interface{} { if !s.isValidConn() { s.err = fmt.Errorf("cluster topology has changed") return nil } if !s.rows.Next() { if s.err = s.rows.Err(); log.EL(s.log, s.err) { return nil } if s.ndone == s.nrecs { s.log.Infof("Finished. Done %v(%v%%) of %v", s.ndone, 100, s.nrecs) } return nil } s.err = s.rows.Scan(s.sqlRow...) if log.EL(s.log, s.err) { return nil } size := driverTypeToGoType(s.row, s.sqlRow, s.schema) s.metrics.BytesRead.Inc(size) s.metrics.EventsRead.Inc(1) return s.row } func (s *mysqlReader) encodeRow(v []interface{}) { s.outMsg, s.err = s.encoder.Row(types.Insert, &v, ^uint64(0), time.Time{}) if log.EL(s.log, s.err) { return } s.key = encoder.GetRowKey(s.encoder.Schema(), &v) s.partitionKey = s.key } func (s *mysqlReader) reportStat() { //Statistics maybe inaccurate so we can have some rows even if we got 0 when //read rows count if s.nrecs == 0 { s.nrecs = 1 } pctdone := s.ndone * 100 / s.nrecs var o uint64 if s.nrecs%10 != 0 { o = 1 } if s.ndone%(s.nrecs/10+o) == 0 { s.log.Infof("Snapshot ... Done %v(%v%%) of %v", s.ndone, pctdone, s.nrecs) } s.ndone++ } //FetchNext fetches the record from MySQL and encodes using encoder provided when //reader created func (s *mysqlReader) FetchNext() bool { s.err = nil v := s.fetchRow() if s.err != nil { return true } if v == nil { return false } s.encodeRow(v) s.reportStat() return true }