private boolean parseOneRow()

in parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java [609:889]


    private boolean parseOneRow(RowData.Builder rowDataBuilder, RowsLogEvent event, RowsLogBuffer buffer, BitSet cols,
                                boolean isAfter, TableMeta tableMeta) throws UnsupportedEncodingException {
        int columnCnt = event.getTable().getColumnCnt();
        ColumnInfo[] columnInfo = event.getTable().getColumnInfo();
        // mysql8.0针对set @@global.binlog_row_metadata='FULL' 可以记录部分的metadata信息
        boolean existOptionalMetaData = event.getTable().isExistOptionalMetaData();
        boolean tableError = false;
        // check table fileds count,只能处理加字段
        boolean existRDSNoPrimaryKey = false;
        // 获取字段过滤条件
        List<String> fieldList = null;
        List<String> blackFieldList = null;

        if (tableMeta != null) {
            fieldList = fieldFilterMap.get(tableMeta.getFullName().toUpperCase());
            blackFieldList = fieldBlackFilterMap.get(tableMeta.getFullName().toUpperCase());
        }

        if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
            if (tableMetaCache.isOnRDS() || tableMetaCache.isOnPolarX()) {
                // 特殊处理下RDS的场景
                List<FieldMeta> primaryKeys = tableMeta.getPrimaryFields();
                if (primaryKeys == null || primaryKeys.isEmpty()) {
                    if (columnInfo.length == tableMeta.getFields().size() + 1
                        && columnInfo[columnInfo.length - 1].type == LogEvent.MYSQL_TYPE_LONGLONG) {
                        existRDSNoPrimaryKey = true;
                    }
                }
            }

            EntryPosition position = createPosition(event.getHeader());
            if (!existRDSNoPrimaryKey) {
                // online ddl增加字段操作步骤:
                // 1. 新增一张临时表,将需要做ddl表的数据全量导入
                // 2. 在老表上建立I/U/D的trigger,增量的将数据插入到临时表
                // 3. 锁住应用请求,将临时表rename为老表的名字,完成增加字段的操作
                // 尝试做一次reload,可能因为ddl没有正确解析,或者使用了类似online ddl的操作
                // 因为online ddl没有对应表名的alter语法,所以不会有clear cache的操作
                tableMeta = getTableMeta(event.getTable().getDbName(), event.getTable().getTableName(), false, position);// 强制重新获取一次
                if (tableMeta == null) {
                    tableError = true;
                    if (!filterTableError) {
                        throw new CanalParseException("not found [" + event.getTable().getDbName() + "."
                                                      + event.getTable().getTableName() + "] in db , pls check!");
                    }
                }

                // 在做一次判断
                if (tableMeta != null && columnInfo.length > tableMeta.getFields().size()) {
                    tableError = true;
                    if (!filterTableError) {
                        throw new CanalParseException("column size is not match for table:" + tableMeta.getFullName()
                                                      + "," + columnInfo.length + " vs " + tableMeta.getFields().size());
                    }
                }
                // } else {
                // logger.warn("[" + event.getTable().getDbName() + "." +
                // event.getTable().getTableName()
                // + "] is no primary key , skip alibaba_rds_row_id column");
            }
        }

        for (int i = 0; i < columnCnt; i++) {
            ColumnInfo info = columnInfo[i];
            // mysql 5.6开始支持nolob/mininal类型,并不一定记录所有的列,需要进行判断
            if (!cols.get(i)) {
                continue;
            }

            if (existRDSNoPrimaryKey && i == columnCnt - 1 && info.type == LogEvent.MYSQL_TYPE_LONGLONG) {
                // 不解析最后一列
                String rdsRowIdColumnName = "__#alibaba_rds_row_id#__";
                if (tableMetaCache.isOnPolarX()) {
                    rdsRowIdColumnName = "_drds_implicit_id_";
                }
                buffer.nextValue(rdsRowIdColumnName, i, info.type, info.meta, false);
                Column.Builder columnBuilder = Column.newBuilder();
                columnBuilder.setName(rdsRowIdColumnName);
                columnBuilder.setIsKey(true);
                columnBuilder.setMysqlType("bigint");
                columnBuilder.setIndex(i);
                columnBuilder.setIsNull(false);
                Serializable value = buffer.getValue();
                columnBuilder.setValue(value.toString());
                columnBuilder.setSqlType(Types.BIGINT);
                columnBuilder.setUpdated(false);

                if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
                    if (isAfter) {
                        rowDataBuilder.addAfterColumns(columnBuilder.build());
                    } else {
                        rowDataBuilder.addBeforeColumns(columnBuilder.build());
                    }
                }
                continue;
            }

            FieldMeta fieldMeta = null;
            if (tableMeta != null && !tableError) {
                // 处理file meta
                fieldMeta = tableMeta.getFields().get(i);
            }

            if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
                // check column info
                boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
                check &= (fieldMeta.isUnsigned() == info.unsigned);
                check &= (fieldMeta.isNullable() == info.nullable);

                if (!check) {
                    throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , table : "
                                                  + tableMeta.getFullName() + ", db fieldMeta : "
                                                  + fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
                                                  + " , on : " + event.getHeader().getLogFileName() + ":"
                                                  + (event.getHeader().getLogPos() - event.getHeader().getEventLen()));
                }
            }

            Column.Builder columnBuilder = Column.newBuilder();
            if (fieldMeta != null) {
                columnBuilder.setName(fieldMeta.getColumnName());
                columnBuilder.setIsKey(fieldMeta.isKey());
                // 增加mysql type类型,issue 73
                columnBuilder.setMysqlType(fieldMeta.getColumnType());
            } else if (existOptionalMetaData) {
                columnBuilder.setName(info.name);
                columnBuilder.setIsKey(info.pk);
                // mysql8.0里没有mysql type类型
                // columnBuilder.setMysqlType(fieldMeta.getColumnType());
            }
            columnBuilder.setIndex(i);
            columnBuilder.setIsNull(false);

            // fixed issue
            // https://github.com/alibaba/canal/issues/66,特殊处理binary/varbinary,不能做编码处理
            boolean isBinary = false;
            if (fieldMeta != null) {
                if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "VARBINARY")) {
                    isBinary = true;
                } else if (StringUtils.containsIgnoreCase(fieldMeta.getColumnType(), "BINARY")) {
                    isBinary = true;
                }
            }

            buffer.nextValue(columnBuilder.getName(), i, info.type, info.meta, isBinary);
            int javaType = buffer.getJavaType();
            if (buffer.isNull()) {
                columnBuilder.setIsNull(true);
                
                // 处理各种类型
                switch (javaType) {
                    case Types.BINARY:
                    case Types.VARBINARY:
                    case Types.LONGVARBINARY:

                        // https://github.com/alibaba/canal/issues/4652
                        // mysql binlog中blob/text都处理为blob类型,需要反查table
                        // meta,按编码解析text
                        if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
                            javaType = Types.CLOB;
                        } else {
                            javaType = Types.BLOB;
                        }
                        break;
                }
            } else {
                final Serializable value = buffer.getValue();
                // 处理各种类型
                switch (javaType) {
                    case Types.INTEGER:
                    case Types.TINYINT:
                    case Types.SMALLINT:
                    case Types.BIGINT:
                        // 处理unsigned类型
                        Number number = (Number) value;
                        boolean isUnsigned = (fieldMeta != null ? fieldMeta.isUnsigned() : (existOptionalMetaData ? info.unsigned : false));
                        if (isUnsigned && number.longValue() < 0) {
                            switch (buffer.getLength()) {
                                case 1: /* MYSQL_TYPE_TINY */
                                    columnBuilder.setValue(String.valueOf(Integer.valueOf(TINYINT_MAX_VALUE
                                                                                          + number.intValue())));
                                    javaType = Types.SMALLINT; // 往上加一个量级
                                    break;

                                case 2: /* MYSQL_TYPE_SHORT */
                                    columnBuilder.setValue(String.valueOf(Integer.valueOf(SMALLINT_MAX_VALUE
                                                                                          + number.intValue())));
                                    javaType = Types.INTEGER; // 往上加一个量级
                                    break;

                                case 3: /* MYSQL_TYPE_INT24 */
                                    columnBuilder.setValue(String.valueOf(Integer.valueOf(MEDIUMINT_MAX_VALUE
                                                                                          + number.intValue())));
                                    javaType = Types.INTEGER; // 往上加一个量级
                                    break;

                                case 4: /* MYSQL_TYPE_LONG */
                                    columnBuilder.setValue(String.valueOf(Long.valueOf(INTEGER_MAX_VALUE
                                                                                       + number.longValue())));
                                    javaType = Types.BIGINT; // 往上加一个量级
                                    break;

                                case 8: /* MYSQL_TYPE_LONGLONG */
                                    columnBuilder.setValue(BIGINT_MAX_VALUE.add(BigInteger.valueOf(number.longValue()))
                                        .toString());
                                    javaType = Types.DECIMAL; // 往上加一个量级,避免执行出错
                                    break;
                            }
                        } else {
                            // 对象为number类型,直接valueof即可
                            columnBuilder.setValue(String.valueOf(value));
                        }
                        break;
                    case Types.REAL: // float
                    case Types.DOUBLE: // double
                        // 对象为number类型,直接valueof即可
                        columnBuilder.setValue(String.valueOf(value));
                        break;
                    case Types.BIT:// bit
                        // 对象为number类型
                        columnBuilder.setValue(String.valueOf(value));
                        break;
                    case Types.DECIMAL:
                        columnBuilder.setValue(((BigDecimal) value).toPlainString());
                        break;
                    case Types.TIMESTAMP:
                        // 修复时间边界值
                        // String v = value.toString();
                        // v = v.substring(0, v.length() - 2);
                        // columnBuilder.setValue(v);
                        // break;
                    case Types.TIME:
                    case Types.DATE:
                        // 需要处理year
                        columnBuilder.setValue(value.toString());
                        break;
                    case Types.BINARY:
                    case Types.VARBINARY:
                    case Types.LONGVARBINARY:
                        // fixed text encoding
                        // https://github.com/AlibabaTech/canal/issues/18
                        // mysql binlog中blob/text都处理为blob类型,需要反查table
                        // meta,按编码解析text
                        if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
                            columnBuilder.setValue(new String((byte[]) value, charset));
                            javaType = Types.CLOB;
                        } else {
                            // byte数组,直接使用iso-8859-1保留对应编码,浪费内存
                            columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
                            // columnBuilder.setValueBytes(ByteString.copyFrom((byte[])
                            // value));
                            javaType = Types.BLOB;
                        }
                        break;
                    case Types.CHAR:
                    case Types.VARCHAR:
                        columnBuilder.setValue(value.toString());
                        break;
                    default:
                        columnBuilder.setValue(value.toString());
                }
            }

            columnBuilder.setSqlType(javaType);
            // 设置是否update的标记位
            columnBuilder.setUpdated(isAfter
                                     && isUpdate(rowDataBuilder.getBeforeColumnsList(),
                                         columnBuilder.getIsNull() ? null : columnBuilder.getValue(),
                                         i));
            if (needField(fieldList, blackFieldList, columnBuilder.getName())) {
                if (isAfter) {
                    rowDataBuilder.addAfterColumns(columnBuilder.build());
                } else {
                    rowDataBuilder.addBeforeColumns(columnBuilder.build());
                }
            }
        }

        return tableError;

    }