in parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java [609:889]
private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
int columnCnt = event.getTable().getColumnCnt();
ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
// mysql8.0针对set @@global.binlog_row_metadata='FULL' 可以记录部分的metadata信息
boolean existOptionalMetaData = event.getTable().isExistOptionalMetaData();
boolean tableError = false;
// check table fileds count,只能处理加字段
boolean existRDSNoPrimaryKey = false;
// 获取字段过滤条件
List<String> fieldList = null;
List<String> blackFieldList = null;
if (tableMeta != null) {
fieldList = fieldFilterMap.get(tableMeta.getFullName().toUpperCase());
blackFieldList = fieldBlackFilterMap.get(tableMeta.getFullName().toUpperCase());
}
if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) {
// 特殊处理下RDS的场景
List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
if (primaryKeys == null || primaryKeys.isEmpty()) {
if (columnInfo.length == tableMeta.getFields().size() + 1
&& columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
existRDSNoPrimaryKey = true;
}
}
}
EntryPosition position = createPosition(event.getHeader());
if (!existRDSNoPrimaryKey) {
// online ddl增加字段操作步骤:
// 1. 新增一张临时表,将需要做ddl表的数据全量导入
// 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
// 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
// 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
// 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false, position);// 强制重新获取一次
if (tableMeta == null) {
tableError = true;
if (!filterTableError) {
throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
+ event.getTable().getTableName() + "] in db , pls check!");
}
}
// 在做一次判断
if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
tableError = true;
if (!filterTableError) {
throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
+ "," + columnInfo.length + " vs " + tableMeta.getFields().size());
}
}
// } else {
// logger.warn("[" + event.getTable().getDbName() + "." +
// event.getTable().getTableName()
// + "] is no primary key , skip alibaba_rds_row_id column");
}
}
for (int i = 0; i < columnCnt; i++) {
ColumnInfo info = columnInfo[i];
// mysql 5.6开始支持nolob/mininal类型,并不一定记录所有的列,需要进行判断
if (!cols.get(i)) {
continue;
}
if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
// 不解析最后一列
String rdsRowIdColumnName = "__#alibaba_rds_row_id#__";
if (tableMetaCache.isOnPolarX()) {
rdsRowIdColumnName = "_drds_implicit_id_";
}
buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
Column.Builder columnBuilder = Column.newBuilder();
columnBuilder.setName(rdsRowIdColumnName);
columnBuilder.setIsKey(true);
columnBuilder.setMysqlType("bigint");
columnBuilder.setIndex(i);
columnBuilder.setIsNull(false);
Serializable value = buffer.getValue();
columnBuilder.setValue(value.toString());
columnBuilder.setSqlType(Types.BIGINT);
columnBuilder.setUpdated(false);
if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
if (isAfter) {
rowDataBuilder.addAfterColumns(columnBuilder.build());
} else {
rowDataBuilder.addBeforeColumns(columnBuilder.build());
}
}
continue;
}
FieldMeta fieldMeta = null;
if (tableMeta != null && !tableError) {
// 处理file meta
fieldMeta = tableMeta.getFields().get(i);
}
if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
// check column info
boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
check &= (fieldMeta.isUnsigned() == info.unsigned);
check &= (fieldMeta.isNullable() == info.nullable);
if (!check) {
throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , table : "
+ tableMeta.getFullName() + ", db fieldMeta : "
+ fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
+ " , on : " + event.getHeader().getLogFileName() + ":"
+ (event.getHeader().getLogPos() - event.getHeader().getEventLen()));
}
}
Column.Builder columnBuilder = Column.newBuilder();
if (fieldMeta != null) {
columnBuilder.setName(fieldMeta.getColumnName());
columnBuilder.setIsKey(fieldMeta.isKey());
// 增加mysql type类型,issue 73
columnBuilder.setMysqlType(fieldMeta.getColumnType());
} else if (existOptionalMetaData) {
columnBuilder.setName(info.name);
columnBuilder.setIsKey(info.pk);
// mysql8.0里没有mysql type类型
// columnBuilder.setMysqlType(fieldMeta.getColumnType());
}
columnBuilder.setIndex(i);
columnBuilder.setIsNull(false);
// fixed issue
// https://github.com/alibaba/canal/issues/66,特殊处理binary/varbinary,不能做编码处理
boolean isBinary = false;
if (fieldMeta != null) {
if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "VARBINARY")) {
isBinary = true;
} else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "BINARY")) {
isBinary = true;
}
}
buffer.nextValue(columnBuilder.getName(), i, info.type, info.meta, isBinary);
int javaType = buffer.getJavaType();
if (buffer.isNull()) {
columnBuilder.setIsNull(true);
// 处理各种类型
switch (javaType) {
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
// https://github.com/alibaba/canal/issues/4652
// mysql binlog中blob/text都处理为blob类型,需要反查table
// meta,按编码解析text
if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
javaType = Types.CLOB;
} else {
javaType = Types.BLOB;
}
break;
}
} else {
final Serializable value = buffer.getValue();
// 处理各种类型
switch (javaType) {
case Types.INTEGER:
case Types.TINYINT:
case Types.SMALLINT:
case Types.BIGINT:
// 处理unsigned类型
Number number = (Number) value;
boolean isUnsigned = (fieldMeta != null ? fieldMeta.isUnsigned() : (existOptionalMetaData ? info.unsigned : false));
if (isUnsigned && number.longValue() < 0) {
switch (buffer.getLength()) {
case 1: /* MYSQL_TYPE_TINY */
columnBuilder.setValue(String.valueOf(Integer.valueOf(TINYINT_MAX_VALUE
+ number.intValue())));
javaType = Types.SMALLINT; // 往上加一个量级
break;
case 2: /* MYSQL_TYPE_SHORT */
columnBuilder.setValue(String.valueOf(Integer.valueOf(SMALLINT_MAX_VALUE
+ number.intValue())));
javaType = Types.INTEGER; // 往上加一个量级
break;
case 3: /* MYSQL_TYPE_INT24 */
columnBuilder.setValue(String.valueOf(Integer.valueOf(MEDIUMINT_MAX_VALUE
+ number.intValue())));
javaType = Types.INTEGER; // 往上加一个量级
break;
case 4: /* MYSQL_TYPE_LONG */
columnBuilder.setValue(String.valueOf(Long.valueOf(INTEGER_MAX_VALUE
+ number.longValue())));
javaType = Types.BIGINT; // 往上加一个量级
break;
case 8: /* MYSQL_TYPE_LONGLONG */
columnBuilder.setValue(BIGINT_MAX_VALUE.add(BigInteger.valueOf(number.longValue()))
.toString());
javaType = Types.DECIMAL; // 往上加一个量级,避免执行出错
break;
}
} else {
// 对象为number类型,直接valueof即可
columnBuilder.setValue(String.valueOf(value));
}
break;
case Types.REAL: // float
case Types.DOUBLE: // double
// 对象为number类型,直接valueof即可
columnBuilder.setValue(String.valueOf(value));
break;
case Types.BIT:// bit
// 对象为number类型
columnBuilder.setValue(String.valueOf(value));
break;
case Types.DECIMAL:
columnBuilder.setValue(((BigDecimal) value).toPlainString());
break;
case Types.TIMESTAMP:
// 修复时间边界值
// String v = value.toString();
// v = v.substring(0, v.length() - 2);
// columnBuilder.setValue(v);
// break;
case Types.TIME:
case Types.DATE:
// 需要处理year
columnBuilder.setValue(value.toString());
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
// fixed text encoding
// https://github.com/AlibabaTech/canal/issues/18
// mysql binlog中blob/text都处理为blob类型,需要反查table
// meta,按编码解析text
if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
columnBuilder.setValue(new String((byte[]) value, charset));
javaType = Types.CLOB;
} else {
// byte数组,直接使用iso-8859-1保留对应编码,浪费内存
columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
// columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
// value));
javaType = Types.BLOB;
}
break;
case Types.CHAR:
case Types.VARCHAR:
columnBuilder.setValue(value.toString());
break;
default:
columnBuilder.setValue(value.toString());
}
}
columnBuilder.setSqlType(javaType);
// 设置是否update的标记位
columnBuilder.setUpdated(isAfter
&& isUpdate(rowDataBuilder.getBeforeColumnsList(),
columnBuilder.getIsNull() ? null : columnBuilder.getValue(),
i));
if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
if (isAfter) {
rowDataBuilder.addAfterColumns(columnBuilder.build());
} else {
rowDataBuilder.addBeforeColumns(columnBuilder.build());
}
}
}
return tableError;
}