in changelog/mysql.go [911:968]
// readEvents connects to the MySQL server described by c as a replication
// client, starts a GTID-based binlog sync from a clone of b.gtidSet, and
// processes the incoming events in batches until fetchFirstEvent reports
// that there is nothing more to do or processBatch fails.
//
// stateUpdateInterval is the period of the ticker passed to fetchFirstEvent
// together with the watchdog ticker (whose period comes from
// config.Get().ChangelogWatchdogInterval).
//
// Nothing is returned: a failure to start the sync, to update the state, or
// to process a batch increments b.metrics.Errors and ends the function.
func (b *mysqlReader) readEvents(c *db.Addr, stateUpdateInterval time.Duration) {
	// Pick a random, non-zero server ID for this replication client.
	id := rand.Uint32()
	for id == 0 {
		id = rand.Uint32()
	}

	cfg := replication.BinlogSyncerConfig{
		ServerID:  id,
		Flavor:    "mysql",
		Host:      c.Host,
		Port:      c.Port,
		User:      c.User,
		Password:  c.Pwd,
		ParseTime: true,
	}

	syncer := replication.NewBinlogSyncer(cfg)
	// Register Close before starting the sync so that whatever the syncer
	// acquired during a failed StartSyncGTID is released as well. Being the
	// first defer, it still runs last on the normal exit path.
	defer syncer.Close()

	streamer, err := syncer.StartSyncGTID(b.gtidSet.Clone())
	if log.EL(b.log, err) {
		b.metrics.Errors.Inc(1)
		return
	}

	stateUpdateTicker := time.NewTicker(stateUpdateInterval)
	defer stateUpdateTicker.Stop()

	watchdogTicker := time.NewTicker(config.Get().ChangelogWatchdogInterval)
	defer watchdogTicker.Stop()

	defer b.closeTableProducers()

	if !b.updateState(true) {
		b.metrics.Errors.Inc(1)
		return
	}

	b.log.WithFields(log.Fields{"gtid": util.SortedGTIDString(b.gtidSet), "SeqNo": b.seqNo}).Infof("Binlog start")

	msgCh := make(chan *result, b.batchSize)
	exitCh := make(chan bool)
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	// Runs first among the deferred calls: signal the fetcher goroutine to
	// stop (context cancellation and closed exitCh) and wait for it before
	// the producers, tickers and syncer are torn down.
	defer func() {
		cancel()
		close(exitCh)
		wg.Wait()
	}()

	// The fetcher goroutine lets the loop below multiplex the blocking
	// streamer.GetEvent with the tickers by delivering results over msgCh.
	go b.eventFetcher(ctx, streamer, &wg, msgCh, exitCh)

	for {
		more, msg := b.fetchFirstEvent(stateUpdateTicker, watchdogTicker, msgCh)
		if !more {
			break
		}
		// A nil msg with more == true means there is nothing to process on
		// this iteration; just go around again.
		if msg != nil && !b.processBatch(msg, msgCh) {
			b.metrics.Errors.Inc(1)
			break
		}
	}

	b.log.Debugf("Finishing MySQL binlog reader")
}