def _to_java_data_type()

in flink-python/pyflink/table/types.py [0:0]


def _to_java_data_type(data_type: DataType):
    """
    Converts the specified Python DataType to Java DataType.
    """
    gateway = get_gateway()
    JDataTypes = gateway.jvm.org.apache.flink.table.api.DataTypes

    if isinstance(data_type, BooleanType):
        j_data_type = JDataTypes.BOOLEAN()
    elif isinstance(data_type, TinyIntType):
        j_data_type = JDataTypes.TINYINT()
    elif isinstance(data_type, SmallIntType):
        j_data_type = JDataTypes.SMALLINT()
    elif isinstance(data_type, IntType):
        j_data_type = JDataTypes.INT()
    elif isinstance(data_type, BigIntType):
        j_data_type = JDataTypes.BIGINT()
    elif isinstance(data_type, FloatType):
        j_data_type = JDataTypes.FLOAT()
    elif isinstance(data_type, DoubleType):
        j_data_type = JDataTypes.DOUBLE()
    elif isinstance(data_type, VarCharType):
        j_data_type = JDataTypes.VARCHAR(data_type.length)
    elif isinstance(data_type, CharType):
        j_data_type = JDataTypes.CHAR(data_type.length)
    elif isinstance(data_type, VarBinaryType):
        j_data_type = JDataTypes.VARBINARY(data_type.length)
    elif isinstance(data_type, BinaryType):
        j_data_type = JDataTypes.BINARY(data_type.length)
    elif isinstance(data_type, DecimalType):
        j_data_type = JDataTypes.DECIMAL(data_type.precision, data_type.scale)
    elif isinstance(data_type, DateType):
        j_data_type = JDataTypes.DATE()
    elif isinstance(data_type, TimeType):
        j_data_type = JDataTypes.TIME(data_type.precision)
    elif isinstance(data_type, TimestampType):
        j_data_type = JDataTypes.TIMESTAMP(data_type.precision)
    elif isinstance(data_type, LocalZonedTimestampType):
        j_data_type = JDataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(data_type.precision)
    elif isinstance(data_type, ZonedTimestampType):
        j_data_type = JDataTypes.TIMESTAMP_WITH_TIME_ZONE(data_type.precision)
    elif isinstance(data_type, ArrayType):
        j_data_type = JDataTypes.ARRAY(_to_java_data_type(data_type.element_type))
    elif isinstance(data_type, MapType):
        j_data_type = JDataTypes.MAP(
            _to_java_data_type(data_type.key_type),
            _to_java_data_type(data_type.value_type))
    elif isinstance(data_type, RowType):
        fields = [JDataTypes.FIELD(f.name, _to_java_data_type(f.data_type))
                  for f in data_type.fields]
        j_data_type = JDataTypes.ROW(to_jarray(JDataTypes.Field, fields))
    elif isinstance(data_type, UserDefinedType):
        if data_type.java_udt():
            return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
                gateway.jvm.Class.forName(
                    data_type.java_udt(),
                    True,
                    gateway.jvm.Thread.currentThread().getContextClassLoader()))
        else:
            return _to_java_data_type(data_type.sql_type())
    elif isinstance(data_type, MultisetType):
        j_data_type = JDataTypes.MULTISET(_to_java_data_type(data_type.element_type))
    elif isinstance(data_type, NullType):
        j_data_type = JDataTypes.NULL()
    elif isinstance(data_type, YearMonthIntervalType):
        if data_type.resolution == YearMonthIntervalType.YearMonthResolution.YEAR:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.YEAR(data_type.precision))
        elif data_type.resolution == YearMonthIntervalType.YearMonthResolution.MONTH:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.MONTH())
        else:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.YEAR(data_type.precision),
                                              JDataTypes.MONTH())
    elif isinstance(data_type, DayTimeIntervalType):
        if data_type.resolution == DayTimeIntervalType.DayTimeResolution.DAY:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.DAY(data_type.day_precision))
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.DAY(data_type.day_precision),
                                              JDataTypes.HOUR())
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.DAY(data_type.day_precision),
                                              JDataTypes.MINUTE())
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.DAY(data_type.day_precision),
                                              JDataTypes.SECOND(data_type.fractional_precision))
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.HOUR:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.HOUR())
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.HOUR(), JDataTypes.MINUTE())
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.HOUR(),
                                              JDataTypes.SECOND(data_type.fractional_precision))
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.MINUTE:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.MINUTE())
        elif data_type.resolution == DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.MINUTE(),
                                              JDataTypes.SECOND(data_type.fractional_precision))
        else:
            j_data_type = JDataTypes.INTERVAL(JDataTypes.SECOND(data_type.fractional_precision))
    elif isinstance(data_type, ListViewType):
        return gateway.jvm.org.apache.flink.table.api.dataview.ListView.newListViewDataType(
            _to_java_data_type(data_type._element_type))
    elif isinstance(data_type, MapViewType):
        return gateway.jvm.org.apache.flink.table.api.dataview.MapView.newMapViewDataType(
            _to_java_data_type(data_type._key_type), _to_java_data_type(data_type._value_type))
    else:
        raise TypeError("Unsupported data type: %s" % data_type)

    if data_type._nullable:
        j_data_type = j_data_type.nullable()
    else:
        j_data_type = j_data_type.notNull()

    if data_type._conversion_cls:
        j_data_type = j_data_type.bridgedTo(
            gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil
            .classForName(data_type._conversion_cls)
        )

    return j_data_type