src/main/java/org/apache/flink/connector/rocketmq/legacy/common/serialization/RowKeyValueDeserializationSchema.java [177:220]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            fieldValue = line;
        } else {
            if (index < data.length) {
                fieldValue = data[index];
            }
        }
        return fieldValue;
    }

    private boolean isByteArrayType(String fieldName) {
        TypeInformation<?> typeInformation =
                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
        if (typeInformation != null) {
            ByteSerializer.ValueType valueType =
                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
            return valueType == ByteSerializer.ValueType.V_ByteArray;
        }
        return false;
    }

    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
        boolean skip = false;
        switch (formatErrorStrategy) {
            case SKIP:
                long now = System.currentTimeMillis();
                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
                    LOGGER.warn(
                            "Data format error, field type: "
                                    + fieldTypes[index]
                                    + "field data: "
                                    + data[index]
                                    + ", index: "
                                    + index
                                    + ", data: ["
                                    + StringUtils.join(data, ",")
                                    + "]",
                            e);
                    lastLogExceptionTime = now;
                }
                skip = true;
                break;
            case SKIP_SILENT:
                skip = true;
                break;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/org/apache/flink/connector/rocketmq/source/reader/deserializer/RowDeserializationSchema.java [258:303]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                fieldValue = line;
            } else {
                if (index < data.length) {
                    fieldValue = data[index];
                }
            }
        }

        return fieldValue;
    }

    private boolean isByteArrayType(String fieldName) {
        TypeInformation<?> typeInformation =
                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
        if (typeInformation != null) {
            ByteSerializer.ValueType valueType =
                    ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
            return valueType == ByteSerializer.ValueType.V_ByteArray;
        }
        return false;
    }

    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
        boolean skip = false;
        switch (formatErrorStrategy) {
            case SKIP:
                long now = System.currentTimeMillis();
                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
                    LOGGER.warn(
                            "Data format error, field type: "
                                    + fieldTypes[index]
                                    + "field data: "
                                    + data[index]
                                    + ", index: "
                                    + index
                                    + ", data: ["
                                    + StringUtils.join(data, ",")
                                    + "]",
                            e);
                    lastLogExceptionTime = now;
                }
                skip = true;
                break;
            case SKIP_SILENT:
                skip = true;
                break;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



