func()

in plugins/input/canal/input_canal.go [346:458]


// OnRow converts one rows event into flat key/value records and hands each
// record to sc.addData.
//
// Every record carries the fixed keys _host_, _db_, _table_, _event_ and _id_.
// When EnableGTID is set, _gtid_ and _filename_ are added, plus _offset_ if the
// event has a header. When EnableEventMeta is set and the event has a header,
// the header's timestamp, log position, size and server id are added as well.
//
// Update events are expected to carry rows in (before, after) pairs; the
// before-image columns are emitted with an "_old_" key prefix. Insert and
// delete events emit one record per row. Events whose action is disabled via
// EnableUpdate / EnableDelete / EnableInsert are dropped without emitting
// anything and without advancing the checkpoint ID.
//
// If a row's column count disagrees with the cached table metadata, the
// metadata is reloaded once per event (on the first row); values that still
// have no matching column are emitted under "unknow_col_<index>".
//
// OnRow always returns nil.
func (sc *ServiceCanal) OnRow(e *canal.RowsEvent) error {
	logger.Debug(sc.context.GetRuntimeContext(), "[CANAL_DEBUG] host", sc.Host, "db", e.Table.Schema, "table", e.Table.Name, "action", e.Action, "GTID", sc.nextRowEventGTID)
	sc.rowCounter.Add(1)

	// Drop disabled actions before building any output.
	switch e.Action {
	case canal.UpdateAction:
		if !sc.EnableUpdate {
			return nil
		}
	case canal.DeleteAction:
		if !sc.EnableDelete {
			return nil
		}
	default:
		if !sc.EnableInsert {
			return nil
		}
	}

	values := map[string]string{"_host_": sc.Host, "_db_": e.Table.Schema, "_table_": e.Table.Name, "_event_": "row_" + e.Action, "_id_": strconv.Itoa(sc.checkpoint.ID)}
	if sc.EnableGTID {
		values["_gtid_"] = sc.nextRowEventGTID
		values["_filename_"] = sc.checkpoint.FileName
		// The header is optional (see the nil check below), so only read the
		// log position when it is actually present instead of panicking.
		if e.Header != nil {
			values["_offset_"] = strconv.FormatUint(uint64(e.Header.LogPos), 10)
		}
	}
	if sc.EnableEventMeta && e.Header != nil {
		// Format the unsigned header fields directly as unsigned values so the
		// output cannot turn negative where int is 32 bits wide.
		values["_event_time_"] = strconv.FormatUint(uint64(e.Header.Timestamp), 10)
		values["_event_log_postion_"] = strconv.FormatUint(uint64(e.Header.LogPos), 10) //nolint:misspell
		values["_event_size_"] = strconv.FormatUint(uint64(e.Header.EventSize), 10)
		values["_event_server_id_"] = strconv.FormatUint(uint64(e.Header.ServerID), 10)
	}

	// fillRow writes one row image into values. Keys are prefix+<column name>
	// for values covered by the current table metadata and
	// prefix+"unknow_col_<index>" for values beyond it. It reads e.Table at
	// call time, so it picks up metadata refreshed earlier in this event.
	fillRow := func(prefix string, row []interface{}) {
		for index, rowVal := range row {
			if index < len(e.Table.Columns) {
				values[prefix+e.Table.Columns[index].Name] = sc.columnValueToString(&e.Table.Columns[index], rowVal)
			} else {
				values[prefix+"unknow_col_"+strconv.Itoa(index)] = fmt.Sprint(rowVal)
			}
		}
	}

	// worthRefreshing reports whether a column-count mismatch should trigger a
	// metadata reload. For a table with no PK the row always has exactly one
	// more value than the table has columns; reloading the schema for that
	// case would put needless load on MySQL, so it is skipped.
	worthRefreshing := func(row []interface{}) bool {
		return !(len(row)-len(e.Table.Columns) == 1 && len(e.Table.PKColumns) == 0)
	}

	if e.Action == canal.UpdateAction {
		if len(e.Rows)%2 != 0 {
			logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid update value count", len(e.Rows))
			return nil
		}
		for i := 0; i < len(e.Rows); i += 2 {
			before, after := e.Rows[i], e.Rows[i+1]
			mismatch := len(before) != len(e.Table.Columns) || len(after) != len(e.Table.Columns)
			// Clear the cache and force a metadata update, at most once per event.
			if mismatch && i == 0 && worthRefreshing(before) {
				logger.Info(sc.context.GetRuntimeContext(), "table column size", len(before), " != event column size", len(e.Table.Columns))
				sc.canal.ClearTableCache([]byte(e.Table.Schema), []byte(e.Table.Name))
				tableMeta, err := sc.canal.GetTable(e.Table.Schema, e.Table.Name)
				if err != nil || tableMeta == nil {
					logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid row values", e.Table.Name,
						"old columns", len(before),
						"new columns", len(after),
						"table meta columns", len(e.Table.Columns),
						"error", err)
				} else {
					e.Table = tableMeta
				}
			}
			fillRow("_old_", before)
			fillRow("", after)
			sc.addData(values)
		}
	} else {
		for i, rowValues := range e.Rows {
			// Clear the cache and force a metadata update, at most once per event.
			if len(rowValues) != len(e.Table.Columns) && i == 0 && worthRefreshing(rowValues) {
				sc.canal.ClearTableCache([]byte(e.Table.Schema), []byte(e.Table.Name))
				tableMeta, err := sc.canal.GetTable(e.Table.Schema, e.Table.Name)
				if err != nil || tableMeta == nil {
					logger.Error(sc.context.GetRuntimeContext(), "CANAL_INVALID_ALARM", "invalid row values", e.Table.Name,
						"columns", len(rowValues),
						"table meta columns", len(e.Table.Columns),
						"error", err)
				} else {
					e.Table = tableMeta
				}
			}
			fillRow("", rowValues)
			sc.addData(values)
		}
	}

	// Update checkpoint.
	sc.checkpoint.ID++
	return nil
}