in plugins/input/canal/input_canal.go [346:458]
func (sc *ServiceCanal) OnRow(e *canal.RowsEvent) error {
logger.Debug(sc.context.GetRuntimeContext(), "[CANAL_DEBUG] host", sc.Host, "db", e.Table.Schema, "table", e.Table.Name, "action", e.Action, "GTID", sc.nextRowEventGTID)
sc.rowCounter.Add(1)
values := map[string]string{"_host_": sc.Host, "_db_": e.Table.Schema, "_table_": e.Table.Name, "_event_": "row_" + e.Action, "_id_": strconv.Itoa(sc.checkpoint.ID)}
nextOffsetString := strconv.FormatUint(uint64(e.Header.LogPos), 10)
if sc.EnableGTID {
values["_gtid_"] = sc.nextRowEventGTID
values["_filename_"] = sc.checkpoint.FileName
values["_offset_"] = nextOffsetString
}
if sc.EnableEventMeta && e.Header != nil {
values["_event_time_"] = strconv.Itoa(int(e.Header.Timestamp))
values["_event_log_postion_"] = strconv.Itoa(int(e.Header.LogPos)) //nolint:misspell
values["_event_size_"] = strconv.Itoa(int(e.Header.EventSize))
values["_event_server_id_"] = strconv.Itoa(int(e.Header.ServerID))
}
if e.Action == canal.UpdateAction {
if !sc.EnableUpdate {
return nil
}
if len(e.Rows)%2 != 0 {
logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid update value count", len(e.Rows))
return nil
}
for i := 0; i < len(e.Rows); i += 2 {
if len(e.Rows[i]) != len(e.Table.Columns) || len(e.Rows[i+1]) != len(e.Table.Columns) {
// clear cache, force update meta
// @bug fix : if table with no PK, len(e.Rows[i])-len(e.Table.Columns) always be 1, and we
// should not flush table schema, it will cause high load for mysql
if i == 0 && !(len(e.Rows[i])-len(e.Table.Columns) == 1 && len(e.Table.PKColumns) == 0) {
logger.Info(sc.context.GetRuntimeContext(), "table column size", len(e.Rows[i]), " != event column size", len(e.Table.Columns))
sc.canal.ClearTableCache([]byte(e.Table.Schema), []byte(e.Table.Name))
tableMeta, err := sc.canal.GetTable(e.Table.Schema, e.Table.Name)
if err != nil || tableMeta == nil {
logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid row values", e.Table.Name,
"old columns", len(e.Rows[i]),
"new columns", len(e.Rows[i+1]),
"table meta columns", len(e.Table.Columns),
"error", err)
} else {
e.Table = tableMeta
}
}
for index, rowVal := range e.Rows[i] {
if index < len(e.Table.Columns) {
values["_old_"+e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
} else {
values[fmt.Sprintf("_old_unknow_col_%d", index)] = fmt.Sprint(rowVal)
}
}
for index, rowVal := range e.Rows[i+1] {
if index < len(e.Table.Columns) {
values[e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
} else {
values[fmt.Sprintf("unknow_col_%d", index)] = fmt.Sprint(rowVal)
}
}
} else {
for index, rowVal := range e.Rows[i] {
values["_old_"+e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
}
for index, rowVal := range e.Rows[i+1] {
values[e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
}
}
sc.addData(values)
}
} else {
if e.Action == canal.DeleteAction {
if !sc.EnableDelete {
return nil
}
} else {
if !sc.EnableInsert {
return nil
}
}
for i, rowValues := range e.Rows {
if len(rowValues) != len(e.Table.Columns) {
if i == 0 && !(len(e.Rows[i])-len(e.Table.Columns) == 1 && len(e.Table.PKColumns) == 0) {
sc.canal.ClearTableCache([]byte(e.Table.Schema), []byte(e.Table.Name))
tableMeta, err := sc.canal.GetTable(e.Table.Schema, e.Table.Name)
if err != nil || tableMeta == nil {
logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid row values", e.Table.Name,
"columns", len(rowValues),
"table meta columns", len(e.Table.Columns),
"error", err)
} else {
e.Table = tableMeta
}
}
for index, rowVal := range rowValues {
if index < len(e.Table.Columns) {
values[e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
} else {
values[fmt.Sprintf("unknow_col_%d", index)] = fmt.Sprint(rowVal)
}
}
} else {
for index, rowVal := range rowValues {
values[e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
}
}
sc.addData(values)
}
}
// Update checkpoint.
sc.checkpoint.ID++
return nil
}