def _from_java_data_type()

in flink-python/pyflink/table/types.py [0:0]


def _from_java_data_type(j_data_type):
    """
    Converts Java DataType to Python DataType.

    The conversion dispatches on the Java class of ``j_data_type``
    (``AtomicDataType``, ``CollectionDataType``, ``KeyValueDataType`` or
    ``FieldsDataType``) and then on the class of its logical type. Nested element,
    key, value and field types are converted recursively.

    :param j_data_type: The Java DataType object (accessed through the Py4J gateway)
                        to convert.
    :return: The corresponding Python DataType.
    :raises TypeError: If the Java data type, or its logical type, has no counterpart
                       handled by this function.
    """
    gateway = get_gateway()

    # Atomic Type with parameters.
    if is_instance_of(j_data_type, gateway.jvm.AtomicDataType):
        logical_type = j_data_type.getLogicalType()
        if is_instance_of(logical_type, gateway.jvm.CharType):
            data_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.VarCharType):
            data_type = DataTypes.VARCHAR(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BinaryType):
            data_type = DataTypes.BINARY(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.VarBinaryType):
            data_type = DataTypes.VARBINARY(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DecimalType):
            data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
                                          logical_type.getScale(),
                                          logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DateType):
            data_type = DataTypes.DATE(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TimeType):
            data_type = DataTypes.TIME(logical_type.getPrecision(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TimestampType):
            # NOTE(review): the precision is fixed at 3 instead of being read from
            # logical_type.getPrecision() -- presumably intentional; confirm.
            data_type = DataTypes.TIMESTAMP(precision=3, nullable=logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BooleanType):
            data_type = DataTypes.BOOLEAN(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TinyIntType):
            data_type = DataTypes.TINYINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.SmallIntType):
            data_type = DataTypes.SMALLINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.IntType):
            data_type = DataTypes.INT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BigIntType):
            data_type = DataTypes.BIGINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.FloatType):
            data_type = DataTypes.FLOAT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DoubleType):
            data_type = DataTypes.DOUBLE(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
            raise TypeError("Unsupported type: %s, ZonedTimestampType is not supported yet."
                            % j_data_type)
        elif is_instance_of(logical_type, gateway.jvm.LocalZonedTimestampType):
            data_type = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(nullable=logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType) or \
                is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
            data_type = _from_java_interval_type(logical_type)
        elif is_instance_of(logical_type, gateway.jvm.LegacyTypeInformationType):
            # Legacy types wrap a TypeInformation; only a fixed set of them is mapped.
            type_info = logical_type.getTypeInformation()
            BasicArrayTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.\
                BasicArrayTypeInfo
            BasicTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
            if type_info == BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO:
                data_type = DataTypes.ARRAY(DataTypes.STRING())
            elif type_info == BasicTypeInfo.BIG_DEC_TYPE_INFO:
                data_type = DataTypes.DECIMAL(38, 18)
            elif type_info.getClass() == \
                get_java_class(gateway.jvm.org.apache.flink.table.runtime.typeutils
                               .BigDecimalTypeInfo):
                data_type = DataTypes.DECIMAL(type_info.precision(), type_info.scale())
            elif type_info.getClass() == \
                    get_java_class(gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo):
                data_type = DataTypes.LIST_VIEW(_from_java_data_type(type_info.getElementType()))
            elif type_info.getClass() == \
                    get_java_class(gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo):
                data_type = DataTypes.MAP_VIEW(
                    _from_java_data_type(type_info.getKeyType()),
                    _from_java_data_type(type_info.getValueType()))
            else:
                raise TypeError("Unsupported type: %s, it is recognized as a legacy type."
                                % type_info)
        elif is_instance_of(logical_type, gateway.jvm.RawType):
            data_type = RawType()
        else:
            raise TypeError("Unsupported type: %s, it is not supported yet in current python type"
                            " system" % j_data_type)

        return data_type

    # Array Type, MultiSet Type.
    elif is_instance_of(j_data_type, gateway.jvm.CollectionDataType):
        logical_type = j_data_type.getLogicalType()
        element_type = j_data_type.getElementDataType()
        if is_instance_of(logical_type, gateway.jvm.ArrayType):
            data_type = DataTypes.ARRAY(_from_java_data_type(element_type),
                                        logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.MultisetType):
            data_type = DataTypes.MULTISET(_from_java_data_type(element_type),
                                           logical_type.isNullable())
        else:
            raise TypeError("Unsupported collection data type: %s" % j_data_type)

        return data_type

    # Map Type.
    elif is_instance_of(j_data_type, gateway.jvm.KeyValueDataType):
        logical_type = j_data_type.getLogicalType()
        key_type = j_data_type.getKeyDataType()
        value_type = j_data_type.getValueDataType()
        if is_instance_of(logical_type, gateway.jvm.MapType):
            data_type = DataTypes.MAP(
                _from_java_data_type(key_type),
                _from_java_data_type(value_type),
                logical_type.isNullable())
        else:
            raise TypeError("Unsupported map data type: %s" % j_data_type)

        return data_type

    # Row Type.
    elif is_instance_of(j_data_type, gateway.jvm.FieldsDataType):
        logical_type = j_data_type.getLogicalType()
        field_data_types = j_data_type.getChildren()
        if is_instance_of(logical_type, gateway.jvm.RowType):
            fields = [DataTypes.FIELD(name, _from_java_data_type(field_data_types[idx]))
                      for idx, name in enumerate(logical_type.getFieldNames())]
            data_type = DataTypes.ROW(fields, logical_type.isNullable())
        elif j_data_type.getConversionClass().isAssignableFrom(
                gateway.jvm.org.apache.flink.table.api.dataview.ListView._java_lang_class):
            # The first child is converted and its element type becomes the
            # element type of the list view.
            array_type = _from_java_data_type(field_data_types[0])
            data_type = DataTypes.LIST_VIEW(array_type.element_type)
        elif j_data_type.getConversionClass().isAssignableFrom(
                gateway.jvm.org.apache.flink.table.api.dataview.MapView._java_lang_class):
            # The first child is converted and its key/value types are reused
            # for the map view.
            map_type = _from_java_data_type(field_data_types[0])
            data_type = DataTypes.MAP_VIEW(map_type.key_type, map_type.value_type)
        else:
            raise TypeError("Unsupported row data type: %s" % j_data_type)

        return data_type

    # Unrecognized type.
    else:
        # The exception must be raised; merely constructing it would make the
        # function silently return None for unknown data types.
        raise TypeError("Unsupported data type: %s" % j_data_type)