in snapshot/mysql.go [78:133]
// startFromTx prepares the reader to stream the rows of table using the
// supplied transaction.
//
// It stores tx on the reader, reads @@global.gtid_executed, fetches the
// approximate row count of dbs.table from information_schema.tables, issues
// the reader's query (with every "<table_name>" placeholder replaced by table)
// and allocates the per-row buffers according to the encoder's schema. The
// second parameter is unused.
//
// On success it returns the GTID set read before the query was issued and a
// nil error. On failure it returns a non-nil error and the string result must
// not be relied upon. When the failure happens after the query has been
// issued, the opened result set is closed before returning so that it is not
// left open.
func (s *mysqlReader) startFromTx(svc string, _ string, dbs string, table string,
	tx *sql.Tx) (string, error) {
	var lastGtid string
	var err error

	s.log = log.WithFields(log.Fields{"service": svc, "db": dbs, "table": table})
	s.trx = tx

	// Get GTID which is earlier in time than any row we will read during snapshot
	err = s.trx.QueryRow("SELECT @@global.gtid_executed").Scan(&lastGtid)
	if log.EL(s.log, err) {
		return lastGtid, err
	}

	// Use approximate row count, as it's for reporting progress only
	err = s.trx.QueryRow("SELECT table_rows FROM information_schema.tables WHERE table_schema=? AND "+
		"table_name=?", dbs, table).Scan(&s.nrecs)
	if log.EL(s.log, err) {
		return lastGtid, err
	}

	query := strings.Replace(s.query, "<table_name>", table, -1)
	s.log.Infof("Snapshot reader query: %s", query)

	s.rows, err = s.trx.Query(query)
	if log.EL(s.log, err) {
		return lastGtid, err
	}

	s.ndone = 0
	s.log.Infof("Snapshot reader started, will stream %v records", s.nrecs)

	// From here on the result set is open: every error path below has to
	// close it, otherwise it stays open until somebody else cleans it up.
	var c []string
	c, s.err = s.rows.Columns()
	if log.EL(s.log, s.err) {
		log.EL(s.log, s.rows.Close())
		return "", s.err
	}

	s.schema = s.encoder.Schema()
	if len(c) != len(s.schema.Columns) {
		log.EL(s.log, s.rows.Close())
		return "", fmt.Errorf("rows column count(%v) should be equal to schema's column count(%v)", len(c), len(s.schema.Columns))
	}

	// One scan destination per column, typed according to the schema.
	s.sqlRow = make([]interface{}, len(c))
	for i := range s.sqlRow {
		s.sqlRow[i] = util.MySQLToDriverType(s.schema.Columns[i].DataType, s.schema.Columns[i].Type)
	}
	s.row = make([]interface{}, len(c))

	// The ticker is created last so it only exists when start succeeded.
	s.ticker = time.NewTicker(cancelCheckInterval)

	return lastGtid, nil
}