func()

in changelog/mysql.go [816:864]


func (b *mysqlReader) processBatch(msg *result, msgCh chan *result) bool {
	var i int
L:
	for {
		if msg.err != nil {
			merr, ok := msg.err.(*mysql.MyError)
			if ok && merr.Code == 1236 {
				serverGtid, err := db.GetCurrentGTID(b.masterCI)
				if err != nil {
					b.log.Errorf("Error fetchching gtid from server: %v", err)
				}
				purgedGtid, err := db.GetPurgedGTID(b.masterCI)
				if err != nil {
					b.log.Errorf("Error fetchching purged gtid from server: %v", err)
				}
				b.log.WithFields(log.Fields{"my_gtid_set": strings.Replace(util.SortedGTIDString(b.gtidSet), ",", ",\n", -1), "server_gtid_set": serverGtid, "purged_gtid": purgedGtid, "host": b.masterCI.Host, "port": b.masterCI.Port, "user": b.masterCI.User}).Errorf("Error connecting to master: %v", merr)
				return false
			} else if msg.err.Error() != "context canceled" {
				b.log.Errorf("BinlogReadEvents: %v", msg.err.Error())
				return false
			}
			break L //Shutting down, let it commit current  batch
		}

		if !b.handleEvent(msg.ev) {
			return false
		}

		i++
		//TODO: Unify with streamer logic. Extract MaxBatchSize from per table
		//pipe config
		if i >= b.batchSize*b.numTables {
			break
		}

		select {
		case msg = <-msgCh:
		default:
			break L //No messages for now, break the loop and commit whatever we pushed to the batch already
		}
	}

	b.heartbeatTime = time.Now()

	b.metrics.EventsRead.Inc(int64(i))
	b.metrics.BatchSize.Record(time.Duration(i) * time.Millisecond)

	return b.commitBatch()
}