func()

in snapshot/mysql.go [78:133]


func (s *mysqlReader) startFromTx(svc string, _ string, dbs string, table string,
	tx *sql.Tx) (string, error) {

	var lastGtid string
	var err error

	s.log = log.WithFields(log.Fields{"service": svc, "db": dbs, "table": table})

	s.trx = tx

	// Get GTID which is earlier in time then any row we will read during snapshot
	err = s.trx.QueryRow("SELECT @@global.gtid_executed").Scan(&lastGtid)
	if log.EL(s.log, err) {
		return lastGtid, err
	}

	// Use approximate row count, so as it's for reporting progress only
	err = s.trx.QueryRow("SELECT table_rows FROM information_schema.tables WHERE table_schema=? AND "+
		"table_name=?", dbs, table).Scan(&s.nrecs)
	if log.EL(s.log, err) {
		return lastGtid, err
	}

	query := strings.Replace(s.query, "<table_name>", table, -1)
	s.log.Infof("Snapshot reader query: %s", query)
	s.rows, err = s.trx.Query(query)

	if log.EL(s.log, err) {
		return lastGtid, err
	}

	s.ndone = 0
	s.log.Infof("Snapshot reader started, will stream %v records", s.nrecs)

	var c []string
	c, s.err = s.rows.Columns()
	if log.EL(s.log, s.err) {
		return "", s.err
	}

	s.schema = s.encoder.Schema()

	if len(c) != len(s.schema.Columns) {
		return "", fmt.Errorf("rows column count(%v) should be equal to schema's column count(%v)", len(c), len(s.schema.Columns))
	}

	s.sqlRow = make([]interface{}, len(c))
	for i := 0; i < len(c); i++ {
		s.sqlRow[i] = util.MySQLToDriverType(s.schema.Columns[i].DataType, s.schema.Columns[i].Type)
	}
	s.row = make([]interface{}, len(c))

	s.ticker = time.NewTicker(cancelCheckInterval)

	return lastGtid, err
}