public static InternalRow reconvert()

in seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java [357:497]


    /**
     * Converts a SeaTunnel row into a Paimon {@code BinaryRow}, writing each field at the same
     * position it occupies in the source row.
     *
     * <p>The number of fields declared by {@code seaTunnelRowType} must equal the number of
     * fields in {@code sinkTableSchema}; otherwise the exception built by {@code
     * CommonError.writeRowErrorWithFiledsCountNotMatch} is thrown. The SeaTunnel row kind is
     * translated to a Paimon row kind; when the translation yields {@code null} the exception
     * built by {@code CommonError.unsupportedRowKind} is thrown. A field whose SQL type has no
     * matching branch results in the exception built by {@code CommonError.unsupportedDataType}.
     *
     * @param seaTunnelRow the source row whose fields and row kind are converted
     * @param seaTunnelRowType the type description (names and data types) of the source row
     * @param sinkTableSchema the schema of the Paimon sink table; used for the field-count
     *     check and to look up decimal / timestamp precision by field name
     * @return the populated Paimon binary row
     */
    public static InternalRow reconvert(
            SeaTunnelRow seaTunnelRow,
            SeaTunnelRowType seaTunnelRowType,
            TableSchema sinkTableSchema) {
        List<DataField> sinkFields = sinkTableSchema.fields();
        int sourceFieldCount = seaTunnelRowType.getTotalFields();
        int sinkFieldCount = sinkFields.size();
        if (sourceFieldCount != sinkFieldCount) {
            throw CommonError.writeRowErrorWithFiledsCountNotMatch(
                    "Paimon", sourceFieldCount, sinkFieldCount);
        }

        BinaryRow result = new BinaryRow(sourceFieldCount);
        BinaryWriter writer = new BinaryRowWriter(result);

        // Translate the SeaTunnel row kind; a null result means there is no Paimon equivalent.
        org.apache.paimon.types.RowKind paimonRowKind =
                RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
        if (rowKindIsMissing(paimonRowKind)) {
            throw CommonError.unsupportedRowKind(
                    PaimonBaseOptions.CONNECTOR_IDENTITY,
                    seaTunnelRow.getRowKind().shortString(),
                    seaTunnelRow.getTableId());
        }
        result.setRowKind(paimonRowKind);

        SeaTunnelDataType<?>[] columnTypes = seaTunnelRowType.getFieldTypes();
        for (int pos = 0; pos < columnTypes.length; pos++) {
            Object value = seaTunnelRow.getField(pos);

            // Null values are marked on the writer directly and skip the schema check below.
            if (value == null) {
                writer.setNullAt(pos);
                continue;
            }

            checkCanWriteWithSchema(pos, seaTunnelRowType, sinkFields);
            String columnName = seaTunnelRowType.getFieldName(pos);

            switch (columnTypes[pos].getSqlType()) {
                case TINYINT:
                    writer.writeByte(pos, (Byte) value);
                    break;
                case SMALLINT:
                    writer.writeShort(pos, (Short) value);
                    break;
                case INT:
                    writer.writeInt(pos, (Integer) value);
                    break;
                case BIGINT:
                    writer.writeLong(pos, (Long) value);
                    break;
                case FLOAT:
                    writer.writeFloat(pos, (Float) value);
                    break;
                case DOUBLE:
                    writer.writeDouble(pos, (Double) value);
                    break;
                case DECIMAL:
                    {
                        // Precision and scale come from the sink column, not from the source type.
                        DataField sinkDecimalField =
                                SchemaUtil.getDataField(sinkFields, columnName);
                        org.apache.paimon.types.DecimalType sinkDecimalType =
                                (org.apache.paimon.types.DecimalType) sinkDecimalField.type();
                        BigDecimal sourceDecimal = (BigDecimal) value;
                        Decimal paimonDecimal =
                                Decimal.fromBigDecimal(
                                        sourceDecimal,
                                        sinkDecimalType.getPrecision(),
                                        sinkDecimalType.getScale());
                        writer.writeDecimal(pos, paimonDecimal, sinkDecimalType.getPrecision());
                        break;
                    }
                case STRING:
                    writer.writeString(pos, BinaryString.fromString((String) value));
                    break;
                case BYTES:
                    writer.writeBinary(pos, (byte[]) value);
                    break;
                case BOOLEAN:
                    writer.writeBoolean(pos, (Boolean) value);
                    break;
                case DATE:
                    {
                        LocalDate localDate = (LocalDate) value;
                        BinaryWriter.createValueSetter(DataTypes.DATE())
                                .setValue(writer, pos, DateTimeUtils.toInternal(localDate));
                        break;
                    }
                case TIMESTAMP:
                    {
                        // Timestamp precision is taken from the sink column definition.
                        DataField sinkTimestampField =
                                SchemaUtil.getDataField(sinkFields, columnName);
                        int timestampPrecision =
                                ((TimestampType) sinkTimestampField.type()).getPrecision();
                        LocalDateTime localDateTime = (LocalDateTime) value;
                        writer.writeTimestamp(
                                pos,
                                Timestamp.fromLocalDateTime(localDateTime),
                                timestampPrecision);
                        break;
                    }
                case TIME:
                    {
                        LocalTime localTime = (LocalTime) value;
                        BinaryWriter.createValueSetter(DataTypes.TIME())
                                .setValue(writer, pos, DateTimeUtils.toInternal(localTime));
                        break;
                    }
                case MAP:
                    {
                        MapType<?, ?> sourceMapType =
                                (MapType<?, ?>) seaTunnelRowType.getFieldType(pos);
                        SeaTunnelDataType<?> sourceKeyType = sourceMapType.getKeyType();
                        SeaTunnelDataType<?> sourceValueType = sourceMapType.getValueType();
                        DataType sinkKeyType = RowTypeConverter.reconvert(columnName, sourceKeyType);
                        DataType sinkValueType =
                                RowTypeConverter.reconvert(columnName, sourceValueType);
                        Map<?, ?> sourceMap = (Map<?, ?>) value;
                        // Keys and values are flattened into two arrays and converted separately.
                        Object[] mapKeys = sourceMap.keySet().toArray(new Object[0]);
                        Object[] mapValues = sourceMap.values().toArray(new Object[0]);
                        writer.writeMap(
                                pos,
                                BinaryMap.valueOf(
                                        reconvert(columnName, mapKeys, sourceKeyType),
                                        reconvert(columnName, mapValues, sourceValueType)),
                                new InternalMapSerializer(sinkKeyType, sinkValueType));
                        break;
                    }
                case ARRAY:
                    {
                        ArrayType<?, ?> sourceArrayType =
                                (ArrayType<?, ?>) seaTunnelRowType.getFieldType(pos);
                        BinaryArray convertedArray =
                                reconvert(columnName, value, sourceArrayType.getElementType());
                        writer.writeArray(
                                pos,
                                convertedArray,
                                new InternalArraySerializer(
                                        RowTypeConverter.reconvert(
                                                columnName, sourceArrayType.getElementType())));
                        break;
                    }
                case ROW:
                    {
                        // NOTE(review): the nested row is converted against the same top-level
                        // sinkTableSchema, so it goes through the same field-count check as the
                        // outer row — confirm this is intended for nested row types.
                        SeaTunnelDataType<?> nestedType = seaTunnelRowType.getFieldType(pos);
                        SeaTunnelRow nestedRow = (SeaTunnelRow) value;
                        SeaTunnelRowType nestedRowType = (SeaTunnelRowType) nestedType;
                        InternalRow convertedRow =
                                reconvert(nestedRow, nestedRowType, sinkTableSchema);
                        RowType sinkRowType =
                                RowTypeConverter.reconvert(nestedRowType, sinkTableSchema);
                        writer.writeRow(pos, convertedRow, new InternalRowSerializer(sinkRowType));
                        break;
                    }
                default:
                    throw CommonError.unsupportedDataType(
                            PaimonBaseOptions.CONNECTOR_IDENTITY,
                            seaTunnelRowType.getFieldType(pos).getSqlType().toString(),
                            columnName);
            }
        }
        return result;
    }

    /** Returns {@code true} when the row-kind translation produced no Paimon row kind. */
    private static boolean rowKindIsMissing(org.apache.paimon.types.RowKind candidate) {
        return candidate == null;
    }