in changelog/mysql.go [462:506]
// produceRow pushes a single row change event of table t to the table's
// producer.
//
// tp is the event type and ts the event timestamp; both are forwarded as-is to
// the encoder (or to the types.RowMessage in the local buffered path). row
// holds the row values and is also used to derive the row key.
//
// Depending on configuration the row takes one of two paths:
//   - changelog buffering is enabled and the buffer pipe type is "local": the
//     row is pushed unencoded, wrapped in a types.RowMessage;
//   - otherwise the row is encoded with the table's encoder and the resulting
//     bytes are pushed.
//
// An error is returned when the next sequence number is zero, when encoding or
// event wrapping fails, or when the push fails. A push failure is ignored (nil
// is returned and the row counter is not incremented) if shutdown has already
// been initiated.
func (b *mysqlReader) produceRow(tp int, t *table, ts time.Time, row *[]interface{}) error {
	var err error
	buffered := config.Get().ChangelogBuffer
	seqno := b.nextSeqNo()
	// A zero sequence number is treated as a generation failure.
	if seqno == 0 {
		return fmt.Errorf("failed to generate next seqno. Current seqno: %+v", b.seqNo)
	}
	key := encoder.GetRowKey(t.encoder.Schema(), row)
	if buffered && b.bufPipe.Type() == "local" {
		// Local buffer: hand over the raw row, no encoding in this method.
		err = t.producer.PushBatch(key, &types.RowMessage{Type: tp, Key: key, Data: row, SeqNo: seqno, Timestamp: ts})
	} else {
		var bd []byte
		bd, err = t.encoder.Row(tp, row, seqno, ts)
		if err != nil {
			var masterGTID string
			// For column count mismatches additionally look up the master's
			// current GTID so it can be logged next to the reader's own
			// position. A failure of that lookup is only passed to log.E and
			// leaves masterGTID empty.
			if strings.Contains(err.Error(), "column count mismatch") {
				var err1 error
				masterGTID, err1 = db.GetCurrentGTID(b.masterCI)
				log.E(err1)
			}
			// Use a constant format string: the error text may contain '%'
			// characters, which must not be interpreted as formatting verbs.
			b.log.WithFields(log.Fields{"state gtid": util.SortedGTIDString(b.gtidSet), "event gtid": gtidToString(&b.curGTID), "master gtid": masterGTID, "table": t.id}).Errorf("%s", err.Error())
			return err
		}
		if !buffered {
			// Unbuffered: the push key is replaced by the producer's
			// partition key for this row key.
			key = t.producer.PartitionKey("log", key)
		} else if t.encoder.Type() != encoder.Internal.Type() {
			// Buffered through a non-local pipe with a non-internal encoder:
			// wrap the encoded row together with its key and seqno.
			bd, err = encoder.WrapEvent(t.outputFormat, key, bd, seqno)
			if log.EL(b.log, err) {
				return err
			}
		}
		err = t.producer.PushBatch(key, bd)
		// NOTE(review): the byte counter is incremented even if PushBatch
		// returned an error - confirm this is intended.
		b.metrics.BytesWritten.Inc(int64(len(bd)))
	}
	//log.Debugf("Pushed to buffer. seqno=%v, table=%v", seqno, t.id)
	// Push errors are not reported once shutdown has been initiated.
	if shutdown.Initiated() {
		return nil
	}
	if err != nil {
		b.log.Errorf("Type: %v, Error: %v", tp, err.Error())
		return err
	}
	b.metrics.ChangelogRowEventsWritten.Inc(1)
	return nil
}