in changelog/mysql.go [816:864]
// processBatch handles msg and then keeps pulling further results from msgCh
// without blocking, passing each event to handleEvent, until one of the
// following happens:
//   - msgCh has nothing ready to receive,
//   - b.batchSize*b.numTables events have been handled,
//   - a result carrying an error arrives.
//
// After the loop it refreshes b.heartbeatTime, records the number of handled
// events in the metrics and returns the result of commitBatch.
//
// It returns false without committing when handleEvent returns false or when
// a result carries an error whose text is not "context canceled". A
// "context canceled" error ends the loop but still commits what has been
// handled so far.
func (b *mysqlReader) processBatch(msg *result, msgCh chan *result) bool {
	// MySQL server error 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG).
	const masterFatalReadingBinlog = 1236

	var handled int
drain:
	for {
		if msg.err != nil {
			if merr, ok := msg.err.(*mysql.MyError); ok && merr.Code == masterFatalReadingBinlog {
				b.logMasterReadError(merr)
				return false
			}
			// NOTE(review): cancellation is detected by comparing the error
			// text, so a wrapped cancellation error would not match; consider
			// errors.Is(msg.err, context.Canceled) if the imports allow it.
			if msg.err.Error() != "context canceled" {
				b.log.Errorf("BinlogReadEvents: %v", msg.err.Error())
				return false
			}
			break drain //Shutting down, let it commit current batch
		}

		if !b.handleEvent(msg.ev) {
			return false
		}
		handled++

		//TODO: Unify with streamer logic. Extract MaxBatchSize from per table
		//pipe config
		if handled >= b.batchSize*b.numTables {
			break drain
		}

		// Non-blocking receive: take the next result only if one is ready.
		select {
		case msg = <-msgCh:
		default:
			break drain //No messages for now, break the loop and commit whatever we pushed to the batch already
		}
	}

	b.heartbeatTime = time.Now()
	b.metrics.EventsRead.Inc(int64(handled))
	// The event count is passed to Record as a number of milliseconds.
	// NOTE(review): presumably because BatchSize only accepts a
	// time.Duration - confirm against the metrics package.
	b.metrics.BatchSize.Record(time.Duration(handled) * time.Millisecond)
	return b.commitBatch()
}

// logMasterReadError logs merr together with the reader's own GTID set, the
// current and purged GTID sets fetched from the master, and the master
// connection details. Failures to fetch either GTID set are logged and
// otherwise ignored, so the final entry is always emitted.
func (b *mysqlReader) logMasterReadError(merr *mysql.MyError) {
	serverGtid, err := db.GetCurrentGTID(b.masterCI)
	if err != nil {
		b.log.Errorf("Error fetching gtid from server: %v", err)
	}
	purgedGtid, err := db.GetPurgedGTID(b.masterCI)
	if err != nil {
		b.log.Errorf("Error fetching purged gtid from server: %v", err)
	}

	b.log.WithFields(log.Fields{
		// One GTID per line keeps a long set readable in the log output.
		"my_gtid_set":     strings.Replace(util.SortedGTIDString(b.gtidSet), ",", ",\n", -1),
		"server_gtid_set": serverGtid,
		"purged_gtid":     purgedGtid,
		"host":            b.masterCI.Host,
		"port":            b.masterCI.Port,
		"user":            b.masterCI.User,
	}).Errorf("Error connecting to master: %v", merr)
}