func()

in changelog/mysql.go [911:968]


// readEvents streams binlog events from the MySQL server at c, starting from
// a clone of the reader's current GTID set, and hands them to processBatch
// until fetchFirstEvent reports that reading should stop or a batch fails.
//
// stateUpdateInterval is the period of the state-update ticker passed to
// fetchFirstEvent; the watchdog ticker period is taken from
// config.Get().ChangelogWatchdogInterval.
//
// The function returns nothing: failures are counted in b.metrics.Errors and
// simply end the read loop.
func (b *mysqlReader) readEvents(c *db.Addr, stateUpdateInterval time.Duration) {
	// Pick a random non-zero server ID for this binlog client.
	// NOTE(review): presumably 0 is not usable as a replica server ID — confirm.
	id := rand.Uint32()
	for id == 0 {
		id = rand.Uint32()
	}
	cfg := replication.BinlogSyncerConfig{
		ServerID:  id,
		Flavor:    "mysql",
		Host:      c.Host,
		Port:      c.Port,
		User:      c.User,
		Password:  c.Pwd,
		ParseTime: true,
	}

	syncer := replication.NewBinlogSyncer(cfg)
	// Register Close before starting the sync so the syncer is released even
	// when StartSyncGTID fails part-way through. As the first registered
	// defer it still runs last, after every other cleanup below.
	defer syncer.Close()

	streamer, err := syncer.StartSyncGTID(b.gtidSet.Clone())
	if log.EL(b.log, err) {
		b.metrics.Errors.Inc(1)
		return
	}

	stateUpdateTicker := time.NewTicker(stateUpdateInterval)
	defer stateUpdateTicker.Stop()
	watchdogTicker := time.NewTicker(config.Get().ChangelogWatchdogInterval)
	defer watchdogTicker.Stop()

	defer b.closeTableProducers()
	if !b.updateState(true) {
		b.metrics.Errors.Inc(1)
		return
	}

	b.log.WithFields(log.Fields{"gtid": util.SortedGTIDString(b.gtidSet), "SeqNo": b.seqNo}).Infof("Binlog start")

	// msgCh is buffered to one batch worth of results; exitCh is closed on
	// the way out of this function.
	msgCh, exitCh := make(chan *result, b.batchSize), make(chan bool)
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	// On exit: cancel the fetcher's context, close exitCh, then block until
	// the WaitGroup is released.
	// NOTE(review): assumes eventFetcher calls wg.Done when it returns — confirm.
	defer func() { cancel(); close(exitCh); wg.Wait() }()

	// Events are fetched in a separate goroutine and delivered over msgCh.
	// NOTE(review): presumably so that a blocking read on streamer does not
	// stop the loop below from servicing the tickers — confirm in eventFetcher.
	go b.eventFetcher(ctx, streamer, &wg, msgCh, exitCh)

	for {
		more, msg := b.fetchFirstEvent(stateUpdateTicker, watchdogTicker, msgCh)
		if !more {
			break
		}
		// A nil msg with more == true means there is nothing to process on
		// this iteration; just loop again.
		if msg != nil && !b.processBatch(msg, msgCh) {
			b.metrics.Errors.Inc(1)
			break
		}
	}

	b.log.Debugf("Finishing MySQL binlog reader")
}