in changelog/mysql.go [970:1050]
// start asks state.GetClusterTask for a MySQL cluster task for this worker
// and, if one is returned, reads the cluster's binlog events starting from the
// GTID set saved in state, until readEvents returns.
//
// It returns false when state.GetClusterTask fails or yields an empty service
// name (metrics and the worker logger are not set up yet at that point), and
// true in every other case, including failures after a task was obtained.
// NOTE(review): presumably the result tells the caller whether a task was
// picked up — confirm against the caller.
//
// While a task is held, NumWorkers is incremented for the duration of the
// call, and Errors is incremented on exit when err is non-nil at return time.
func (b *mysqlReader) start(cfg *config.AppConfig) bool {
	var err error

	// The hostname only contributes to the worker ID, so a lookup failure is
	// not fatal; it is reported below once the worker logger exists.
	h, hostErr := os.Hostname()
	w := log.GenWorkerID()
	b.workerID = h + "." + w

	b.dbl.Service, b.dbl.Cluster, b.dbl.Name, err = state.GetClusterTask(types.InputMySQL, b.workerID, cfg.LockExpireTimeout)
	if err != nil || b.dbl.Service == "" {
		return false
	}

	b.metrics = metrics.NewChangelogReaderMetrics(getTags(b.dbl.Cluster, b.inputType))
	b.metrics.NumWorkers.Inc()
	defer func() {
		// err is shared with the function body, so this sees whatever the
		// last assignment to err left behind.
		if err != nil {
			b.metrics.Errors.Inc(1)
		}
		b.metrics.NumWorkers.Dec()
	}()

	thisInstanceCluster = b.dbl.Cluster

	b.log = log.WithFields(log.Fields{"worker_id": w, "cluster": b.dbl.Cluster})
	b.log.Infof("Starting MySQL binlog reader")
	if hostErr != nil {
		b.log.Errorf("Error getting hostname. worker id %v. Error: %v", b.workerID, hostErr.Error())
	}

	// Get slave's connection info. Connecting to slave guarantees that we will connect to DC local slave
	b.masterCI, err = db.GetConnInfo(&b.dbl, db.Slave, b.inputType)
	if err != nil {
		return true
	}

	// Only ROW format is supported. An empty format is skipped without a
	// message — presumably binlogFormat already reported why; verify.
	rf := b.binlogFormat()
	if rf != "ROW" {
		if rf != "" {
			b.log.Errorf("binlog format is %s. row binlog format required. skipping", rf)
		}
		return true
	}

	b.tables = make(map[string]map[string]map[string][]*table)

	// (Re)compile the query handler patterns; MustCompile panics on an
	// invalid pattern.
	for i := range queryHandlers {
		queryHandlers[i].compiled = regexp.MustCompile(queryHandlers[i].regexp)
	}

	// Start reading binlogs from the gtid set saved in the state
	var gtid string
	gtid, _, err = state.GetGTID(b.dbl.Cluster)
	if log.EL(b.log, err) {
		return true
	}

	// If no gtid in the state get current gtid set from master
	if gtid == "" {
		gtid, err = db.GetCurrentGTID(b.masterCI)
		if err != nil {
			return true
		}
		// Saving the starting point is best-effort: a failure is logged and
		// reading proceeds. err is overwritten by the parse below, so this
		// failure is not reflected in the Errors metric.
		err = state.SetGTID(b.dbl.Cluster, gtid)
		if err != nil {
			b.log.Errorf("Error saving gtid. gtid %v. Error: %v", gtid, err.Error())
		}
	}

	var s mysql.GTIDSet
	s, err = mysql.ParseMysqlGTIDSet(gtid)
	if err != nil {
		b.log.Errorf("Invalid gtid: '%v' Error: %v", gtid, err.Error())
		return true
	}

	// Checked assertion: an unexpected GTIDSet implementation must not panic
	// the worker. err is nil here, so count the failure explicitly.
	gs, ok := s.(*mysql.MysqlGTIDSet)
	if !ok {
		b.log.Errorf("Unexpected gtid set type %T. gtid '%v'", s, gtid)
		b.metrics.Errors.Inc(1)
		return true
	}
	b.gtidSet = gs

	b.readEvents(b.masterCI, cfg.StateUpdateInterval)

	b.log.Infof("MySQL Binlog reader finished")

	return true
}