func()

in changelog/mysql.go [508:544]


// handleRowsEventLow turns one binlog rows event for table t into a sequence
// of produceRow calls. Every produced row is stamped with the timestamp taken
// from the event header.
//
// Write and delete events are produced row by row as types.Insert and
// types.Delete respectively. Update events are consumed as consecutive row
// pairs: the first row of each pair is produced as a types.Delete (skipped
// when t.noDeletes is set or t.outputFormat ends in "_idempotent") and the
// second as a types.Insert. Processing stops at the first produceRow error.
//
// A payload that is not a *replication.RowsEvent, an update event with an
// unpaired trailing row, or an unhandled event type is reported as an error
// instead of panicking.
//
// The result is !log.E(err); presumably log.E logs a non-nil error and
// reports whether one was present, so true means success (verify against the
// log package).
func (b *mysqlReader) handleRowsEventLow(ev *replication.BinlogEvent, t *table) bool {
	var err error

	// Checked assertion: an unexpected payload becomes a reported error rather
	// than a runtime panic in the reader.
	re, ok := ev.Event.(*replication.RowsEvent)
	if !ok {
		return !log.E(fmt.Errorf("unexpected event payload %T for event type %v", ev.Event, ev.Header.EventType))
	}
	ts := time.Unix(int64(ev.Header.Timestamp), 0)

	/*
		bb := new(bytes.Buffer)
		ev.Dump(bb)
		fmt.Fprintf(os.Stderr, "Handle rows event %+v", bb.String())
	*/

	switch ev.Header.EventType {
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		//TODO: Produce as a batch
		for i := 0; i < len(re.Rows) && err == nil; i++ {
			err = b.produceRow(types.Insert, t, ts, &re.Rows[i])
		}
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		for i := 0; i < len(re.Rows) && err == nil; i++ {
			err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
		}
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// The loop below reads Rows[i] and Rows[i+1]; an odd row count would
		// index past the end of the slice, so reject it before producing
		// anything from a malformed event.
		if len(re.Rows)%2 != 0 {
			err = fmt.Errorf("update rows event has unpaired row: got %v rows", len(re.Rows))
		} else {
			// Loop-invariant: whether the first row of each pair is emitted
			// as a delete depends only on the table configuration.
			emitDelete := !t.noDeletes && !strings.HasSuffix(t.outputFormat, "_idempotent")
			for i := 0; i < len(re.Rows) && err == nil; i += 2 {
				if emitDelete {
					err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
				}
				if err == nil {
					err = b.produceRow(types.Insert, t, ts, &re.Rows[i+1])
				}
			}
		}
	default:
		err = fmt.Errorf("not supported event type %v", ev.Header.EventType)
	}

	return !log.E(err)
}