public boolean doConvert()

in flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java [241:557]


    /**
     * Converts one cell of an Arrow column into its Java representation and appends it to the row
     * through {@code addValueToRow}.
     *
     * <p>{@code currentType} selects the conversion, and {@code minorType} must be one of the Arrow
     * representations that conversion accepts. A null cell is appended as {@code null}. For
     * {@code "NULL_TYPE"} nothing is appended at all.
     *
     * @param col index of the column in the schema; only used to build the error message for an
     *     unsupported type
     * @param rowIndex index of the cell inside {@code fieldVector}
     * @param minorType Arrow minor type describing {@code fieldVector}
     * @param currentType Doris type name of the column, e.g. {@code "INT"} or {@code "DATETIMEV2"}
     * @param fieldVector Arrow vector holding the column data
     * @return {@code true} if the cell was handled, {@code false} if {@code minorType} is not an
     *     accepted Arrow representation for {@code currentType}
     * @throws DorisException if {@code currentType} is not one of the supported type names
     */
    public boolean doConvert(
            int col, int rowIndex, MinorType minorType, String currentType, FieldVector fieldVector)
            throws DorisException {
        // Holder for the simple "null or converted value" conversions; declared once here instead
        // of leaking out of the first case label that happens to use it.
        Object fieldValue;
        switch (currentType) {
            case "NULL_TYPE":
                // No value is appended for this type.
                break;
            case "BOOLEAN":
                if (minorType != MinorType.BIT) {
                    return false;
                }
                BitVector bitVector = (BitVector) fieldVector;
                // The bit is read as an int; any non-zero value maps to true.
                fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
                addValueToRow(rowIndex, fieldValue);
                break;
            case "TINYINT":
                if (minorType != MinorType.TINYINT) {
                    return false;
                }
                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
                fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "SMALLINT":
                if (minorType != MinorType.SMALLINT) {
                    return false;
                }
                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
                fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "INT":
                if (minorType != MinorType.INT) {
                    return false;
                }
                IntVector intVector = (IntVector) fieldVector;
                fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "IPV4":
                // IPv4 may arrive as text or as a 32-bit integer (signed or unsigned vector).
                if (minorType != MinorType.UINT4
                        && minorType != MinorType.INT
                        && minorType != MinorType.VARCHAR) {
                    return false;
                }
                if (fieldVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                if (minorType == MinorType.VARCHAR) {
                    VarCharVector ipv4VarcharVector = (VarCharVector) fieldVector;
                    String ipv4Str =
                            new String(ipv4VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
                    addValueToRow(rowIndex, ipv4Str);
                } else {
                    // IntVector and UInt4Vector are both BaseIntVector implementations.
                    BaseIntVector ipv4Vector = (BaseIntVector) fieldVector;
                    // A signed IntVector sign-extends its value, so addresses >= 128.0.0.0 would
                    // come back negative. Masking keeps the value in the unsigned 32-bit range and
                    // is a no-op for UInt4Vector. The null case was already handled above.
                    long ipv4Bits = ipv4Vector.getValueAsLong(rowIndex) & 0xFFFFFFFFL;
                    fieldValue = convertLongToIPv4Address(ipv4Bits);
                    addValueToRow(rowIndex, fieldValue);
                }
                break;
            case "BIGINT":
                if (minorType != MinorType.BIGINT) {
                    return false;
                }
                BigIntVector bigIntVector = (BigIntVector) fieldVector;
                fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "FLOAT":
                if (minorType != MinorType.FLOAT4) {
                    return false;
                }
                Float4Vector float4Vector = (Float4Vector) fieldVector;
                fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "TIME":
            case "DOUBLE":
                // TIME shares the FLOAT8 representation and is surfaced as a double.
                if (minorType != MinorType.FLOAT8) {
                    return false;
                }
                Float8Vector float8Vector = (Float8Vector) fieldVector;
                fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "BINARY":
                if (minorType != MinorType.VARBINARY) {
                    return false;
                }
                VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
                fieldValue =
                        varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "DECIMAL":
            case "DECIMALV2":
            case "DECIMAL32":
            case "DECIMAL64":
            case "DECIMAL128I":
            case "DECIMAL128":
                if (minorType != MinorType.DECIMAL) {
                    return false;
                }
                DecimalVector decimalVector = (DecimalVector) fieldVector;
                if (decimalVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                BigDecimal decimalValue = decimalVector.getObject(rowIndex).stripTrailingZeros();
                addValueToRow(rowIndex, decimalValue);
                break;
            case "DATE":
            case "DATEV2":
                // Dates arrive either as formatted text or as days since the epoch.
                if (minorType != MinorType.DATEDAY && minorType != MinorType.VARCHAR) {
                    return false;
                }
                if (minorType == MinorType.VARCHAR) {
                    VarCharVector dateTextVector = (VarCharVector) fieldVector;
                    if (dateTextVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    String dateText =
                            new String(dateTextVector.get(rowIndex), StandardCharsets.UTF_8);
                    LocalDate localDate = FastDateUtil.fastParseDate(dateText, DATE_PATTERN);
                    addValueToRow(rowIndex, localDate);
                } else {
                    DateDayVector dateDayVector = (DateDayVector) fieldVector;
                    if (dateDayVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    LocalDate localDate = LocalDate.ofEpochDay(dateDayVector.get(rowIndex));
                    addValueToRow(rowIndex, localDate);
                }
                break;
            case "DATETIME":
                if (minorType == MinorType.VARCHAR) {
                    VarCharVector dateTimeTextVector = (VarCharVector) fieldVector;
                    if (dateTimeTextVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    String dateTimeText =
                            new String(dateTimeTextVector.get(rowIndex), StandardCharsets.UTF_8);
                    dateTimeText = completeMilliseconds(dateTimeText);
                    LocalDateTime parsed =
                            FastDateUtil.fastParseDateTime(dateTimeText, DATETIME_PATTERN);
                    addValueToRow(rowIndex, parsed);
                } else if (fieldVector instanceof TimeStampVector) {
                    LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
                    addValueToRow(rowIndex, dateTime);
                } else {
                    logger.error(
                            "Unsupported type for DATETIME, minorType {}, class is {}",
                            minorType.name(),
                            fieldVector == null ? null : fieldVector.getClass());
                    return false;
                }
                break;
            case "DATETIMEV2":
                if (minorType == MinorType.VARCHAR) {
                    VarCharVector dateTimeV2TextVector = (VarCharVector) fieldVector;
                    if (dateTimeV2TextVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    String dateTimeV2Text =
                            new String(dateTimeV2TextVector.get(rowIndex), StandardCharsets.UTF_8);
                    dateTimeV2Text = completeMilliseconds(dateTimeV2Text);
                    LocalDateTime parsed =
                            FastDateUtil.fastParseDateTimeV2(dateTimeV2Text, DATETIMEV2_PATTERN);
                    addValueToRow(rowIndex, parsed);
                } else if (fieldVector instanceof TimeStampVector) {
                    LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
                    addValueToRow(rowIndex, dateTime);
                } else {
                    logger.error(
                            "Unsupported type for DATETIMEV2, minorType {}, class is {}",
                            minorType.name(),
                            fieldVector == null ? null : fieldVector.getClass());
                    return false;
                }
                break;
            case "LARGEINT":
                if (minorType != MinorType.FIXEDSIZEBINARY && minorType != MinorType.VARCHAR) {
                    return false;
                }
                if (minorType == MinorType.FIXEDSIZEBINARY) {
                    FixedSizeBinaryVector largeIntBinaryVector =
                            (FixedSizeBinaryVector) fieldVector;
                    if (largeIntBinaryVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    // BigInteger(byte[]) reads big-endian two's complement, so the byte order of
                    // the stored value is reversed before constructing it.
                    byte[] bytes = largeIntBinaryVector.get(rowIndex);
                    for (int left = 0, right = bytes.length - 1; left < right; left++, right--) {
                        byte temp = bytes[left];
                        bytes[left] = bytes[right];
                        bytes[right] = temp;
                    }
                    addValueToRow(rowIndex, new BigInteger(bytes));
                } else {
                    VarCharVector largeIntTextVector = (VarCharVector) fieldVector;
                    if (largeIntTextVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    String largeIntText =
                            new String(largeIntTextVector.get(rowIndex), StandardCharsets.UTF_8);
                    addValueToRow(rowIndex, new BigInteger(largeIntText));
                }
                break;
            case "CHAR":
            case "VARCHAR":
            case "STRING":
            case "JSONB":
            case "JSON":
            case "VARIANT":
                if (minorType != MinorType.VARCHAR) {
                    return false;
                }
                VarCharVector varCharVector = (VarCharVector) fieldVector;
                if (varCharVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                String stringValue =
                        new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
                addValueToRow(rowIndex, stringValue);
                break;
            case "IPV6":
                if (minorType != MinorType.VARCHAR) {
                    return false;
                }
                if (fieldVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
                String ipv6Str =
                        new String(ipv6VarcharVector.get(rowIndex), StandardCharsets.UTF_8);
                if (ipv6Str.contains(":")) {
                    // Already in textual colon notation; pass through unchanged.
                    addValueToRow(rowIndex, ipv6Str);
                } else {
                    // Otherwise the text is parsed as a decimal integer and formatted by IPUtils.
                    String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str));
                    addValueToRow(rowIndex, ipv6Address);
                }
                break;
            case "ARRAY":
                if (minorType != MinorType.LIST) {
                    return false;
                }
                ListVector listVector = (ListVector) fieldVector;
                Object listValue =
                        listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
                // todo: when the subtype of array is date, conversion is required
                addValueToRow(rowIndex, listValue);
                break;
            case "MAP":
                if (minorType != MinorType.MAP) {
                    return false;
                }
                MapVector mapVector = (MapVector) fieldVector;
                if (mapVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                // The reader is only needed for non-null rows, so it is fetched after the check.
                UnionMapReader reader = mapVector.getReader();
                reader.setPosition(rowIndex);
                Map<String, Object> mapValue = new HashMap<>();
                while (reader.next()) {
                    FieldReader keyReader = reader.key();
                    FieldReader valueReader = reader.value();
                    Object mapKeyObj = handleMapFieldReader(keyReader);
                    Object mapValueObj = handleMapFieldReader(valueReader);
                    // Keys are stored by their string form.
                    mapValue.put(mapKeyObj.toString(), mapValueObj);
                }
                addValueToRow(rowIndex, mapValue);
                break;
            case "STRUCT":
                if (minorType != MinorType.STRUCT) {
                    return false;
                }
                StructVector structVector = (StructVector) fieldVector;
                if (structVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                Map<String, ?> structValue = structVector.getObject(rowIndex);
                addValueToRow(rowIndex, structValue);
                break;
            default:
                String errMsg = "Unsupported type " + schema.get(col).getType();
                logger.error(errMsg);
                throw new DorisException(errMsg);
        }
        return true;
    }