private boolean doConvert()

in flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java [173:338]


    private boolean doConvert(int col,
                              int rowIndex,
                              Types.MinorType minorType,
                              String currentType,
                              FieldVector fieldVector) throws DorisException {
        switch (currentType) {
            case "NULL_TYPE":
                break;
            case "BOOLEAN":
                if (!minorType.equals(Types.MinorType.BIT)) return false;
                BitVector bitVector = (BitVector) fieldVector;
                Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
                addValueToRow(rowIndex, fieldValue);
                break;
            case "TINYINT":
                if (!minorType.equals(Types.MinorType.TINYINT)) return false;
                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
                fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "SMALLINT":
                if (!minorType.equals(Types.MinorType.SMALLINT)) return false;
                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
                fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "INT":
                if (!minorType.equals(Types.MinorType.INT)) return false;
                IntVector intVector = (IntVector) fieldVector;
                fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "BIGINT":
                if (!minorType.equals(Types.MinorType.BIGINT)) return false;
                BigIntVector bigIntVector = (BigIntVector) fieldVector;
                fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "FLOAT":
                if (!minorType.equals(Types.MinorType.FLOAT4)) return false;
                Float4Vector float4Vector = (Float4Vector) fieldVector;
                fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "TIME":
            case "DOUBLE":
                if (!minorType.equals(Types.MinorType.FLOAT8)) return false;
                Float8Vector float8Vector = (Float8Vector) fieldVector;
                fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "BINARY":
                if (!minorType.equals(Types.MinorType.VARBINARY)) return false;

                VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
                fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
                addValueToRow(rowIndex, fieldValue);
                break;
            case "DECIMAL":
            case "DECIMALV2":
            case "DECIMAL32":
            case "DECIMAL64":
            case "DECIMAL128I":
                if (!minorType.equals(Types.MinorType.DECIMAL)) return false;
                DecimalVector decimalVector = (DecimalVector) fieldVector;
                if (decimalVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
                addValueToRow(rowIndex, value);
                break;
            case "DATE":
            case "DATEV2":
                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
                VarCharVector date = (VarCharVector) fieldVector;
                if (date.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                String stringValue = new String(date.get(rowIndex));
                LocalDate localDate = LocalDate.parse(stringValue, dateFormatter);
                addValueToRow(rowIndex, localDate);
                break;
            case "DATETIME":
                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
                VarCharVector timeStampSecVector = (VarCharVector) fieldVector;
                if (timeStampSecVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                stringValue = new String(timeStampSecVector.get(rowIndex));
                LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter);
                addValueToRow(rowIndex, parse);
                break;
            case "DATETIMEV2":
                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
                VarCharVector timeStampV2SecVector = (VarCharVector) fieldVector;
                if (timeStampV2SecVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                stringValue = new String(timeStampV2SecVector.get(rowIndex));
                stringValue = completeMilliseconds(stringValue);
                parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
                addValueToRow(rowIndex, parse);
                break;
            case "LARGEINT":
                if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY) &&
                        !minorType.equals(Types.MinorType.VARCHAR)) return false;
                if (minorType.equals(Types.MinorType.FIXEDSIZEBINARY)) {
                    FixedSizeBinaryVector largeIntVector = (FixedSizeBinaryVector) fieldVector;
                    if (largeIntVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    byte[] bytes = largeIntVector.get(rowIndex);
                    int left = 0, right = bytes.length - 1;
                    while (left < right) {
                        byte temp = bytes[left];
                        bytes[left] = bytes[right];
                        bytes[right] = temp;
                        left++;
                        right--;
                    }
                    BigInteger largeInt = new BigInteger(bytes);
                    addValueToRow(rowIndex, largeInt);
                    break;
                } else {
                    VarCharVector largeIntVector = (VarCharVector) fieldVector;
                    if (largeIntVector.isNull(rowIndex)) {
                        addValueToRow(rowIndex, null);
                        break;
                    }
                    stringValue = new String(largeIntVector.get(rowIndex));
                    BigInteger largeInt = new BigInteger(stringValue);
                    addValueToRow(rowIndex, largeInt);
                    break;
                }
            case "CHAR":
            case "VARCHAR":
            case "STRING":
            case "JSONB":
                if (!minorType.equals(Types.MinorType.VARCHAR)) return false;
                VarCharVector varCharVector = (VarCharVector) fieldVector;
                if (varCharVector.isNull(rowIndex)) {
                    addValueToRow(rowIndex, null);
                    break;
                }
                stringValue = new String(varCharVector.get(rowIndex));
                addValueToRow(rowIndex, stringValue);
                break;
            case "ARRAY":
                if (!minorType.equals(Types.MinorType.LIST)) return false;
                ListVector listVector = (ListVector) fieldVector;
                Object listValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
                //todo: when the subtype of array is date, conversion is required
                addValueToRow(rowIndex, listValue);
                break;
            default:
                String errMsg = "Unsupported type " + schema.get(col).getType();
                logger.error(errMsg);
                throw new DorisException(errMsg);
        }
        return true;
    }