in changelog/mysql.go [508:544]
// handleRowsEventLow turns one binlog rows event into row-level changes for
// table t by calling b.produceRow once per row image, stamping each with the
// event header timestamp.
//
//   - WRITE_ROWS events produce a types.Insert for every row.
//   - DELETE_ROWS events produce a types.Delete for every row.
//   - UPDATE_ROWS events are consumed as consecutive pairs (Rows[i], Rows[i+1]);
//     the first of each pair is produced as types.Delete and the second as
//     types.Insert. The Delete half is skipped when t.noDeletes is set or when
//     t.outputFormat ends in "_idempotent".
//   - Any other event type is reported as an error.
//
// Processing stops at the first error. The return value is !log.E(err), i.e.
// presumably true on success and false when an error occurred and was logged
// (log.E is defined elsewhere -- confirm its contract).
//
// NOTE(review): ev.Event is type-asserted without a check, so the caller must
// only pass events carrying a *replication.RowsEvent.
func (b *mysqlReader) handleRowsEventLow(ev *replication.BinlogEvent, t *table) bool {
	var err error
	re := ev.Event.(*replication.RowsEvent)
	ts := time.Unix(int64(ev.Header.Timestamp), 0)
	/*
		bb := new(bytes.Buffer)
		ev.Dump(bb)
		fmt.Fprintf(os.Stderr, "Handle rows event %+v", bb.String())
	*/
	switch ev.Header.EventType {
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		//TODO: Produce as a batch
		for i := 0; i < len(re.Rows) && err == nil; i++ {
			err = b.produceRow(types.Insert, t, ts, &re.Rows[i])
		}
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		for i := 0; i < len(re.Rows) && err == nil; i++ {
			err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
		}
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// Rows are read two at a time below; an odd count would index past the
		// end of the slice and panic, so reject such an event up front instead
		// of emitting a partial result.
		if len(re.Rows)%2 != 0 {
			err = fmt.Errorf("malformed update rows event: odd number of row images (%d)", len(re.Rows))
			break
		}
		// Whether the first row of each pair is emitted as a Delete does not
		// change between rows, so decide it once.
		emitDelete := !t.noDeletes && !strings.HasSuffix(t.outputFormat, "_idempotent")
		for i := 0; i < len(re.Rows) && err == nil; i += 2 {
			if emitDelete {
				err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
			}
			if err == nil {
				err = b.produceRow(types.Insert, t, ts, &re.Rows[i+1])
			}
		}
	default:
		err = fmt.Errorf("not supported event type %v", ev.Header.EventType)
	}
	return !log.E(err)
}