in flink-python/pyflink/table/types.py [0:0]
def _to_java_data_type(data_type: DataType):
    """
    Converts the specified Python DataType to Java DataType.

    The Java type is built through ``org.apache.flink.table.api.DataTypes`` on the JVM
    reachable via ``get_gateway()``. Element, key, value and field types of array, multiset,
    map, row, list view and map view types are converted recursively.

    For all types except user-defined types, list views and map views, the nullability of
    ``data_type`` is applied to the result and, when ``data_type._conversion_cls`` is set, the
    result is bridged to that class. User-defined types, list views and map views return
    early, so neither step is applied to them here.

    :param data_type: The Python data type to convert.
    :return: The Java object that represents ``data_type``.
    :raises TypeError: If ``data_type`` is not an instance of any supported type.
    """
    gateway = get_gateway()
    JDataTypes = gateway.jvm.org.apache.flink.table.api.DataTypes

    # NOTE: the order of the isinstance checks is kept exactly as-is, since it would matter
    # if any of these classes derived from another one checked further down.
    if isinstance(data_type, BooleanType):
        j_data_type = JDataTypes.BOOLEAN()
    elif isinstance(data_type, TinyIntType):
        j_data_type = JDataTypes.TINYINT()
    elif isinstance(data_type, SmallIntType):
        j_data_type = JDataTypes.SMALLINT()
    elif isinstance(data_type, IntType):
        j_data_type = JDataTypes.INT()
    elif isinstance(data_type, BigIntType):
        j_data_type = JDataTypes.BIGINT()
    elif isinstance(data_type, FloatType):
        j_data_type = JDataTypes.FLOAT()
    elif isinstance(data_type, DoubleType):
        j_data_type = JDataTypes.DOUBLE()
    elif isinstance(data_type, VarCharType):
        j_data_type = JDataTypes.VARCHAR(data_type.length)
    elif isinstance(data_type, CharType):
        j_data_type = JDataTypes.CHAR(data_type.length)
    elif isinstance(data_type, VarBinaryType):
        j_data_type = JDataTypes.VARBINARY(data_type.length)
    elif isinstance(data_type, BinaryType):
        j_data_type = JDataTypes.BINARY(data_type.length)
    elif isinstance(data_type, DecimalType):
        j_data_type = JDataTypes.DECIMAL(data_type.precision, data_type.scale)
    elif isinstance(data_type, DateType):
        j_data_type = JDataTypes.DATE()
    elif isinstance(data_type, TimeType):
        j_data_type = JDataTypes.TIME(data_type.precision)
    elif isinstance(data_type, TimestampType):
        j_data_type = JDataTypes.TIMESTAMP(data_type.precision)
    elif isinstance(data_type, LocalZonedTimestampType):
        j_data_type = JDataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(data_type.precision)
    elif isinstance(data_type, ZonedTimestampType):
        j_data_type = JDataTypes.TIMESTAMP_WITH_TIME_ZONE(data_type.precision)
    elif isinstance(data_type, ArrayType):
        j_data_type = JDataTypes.ARRAY(_to_java_data_type(data_type.element_type))
    elif isinstance(data_type, MapType):
        j_data_type = JDataTypes.MAP(
            _to_java_data_type(data_type.key_type),
            _to_java_data_type(data_type.value_type))
    elif isinstance(data_type, RowType):
        fields = [JDataTypes.FIELD(f.name, _to_java_data_type(f.data_type))
                  for f in data_type.fields]
        j_data_type = JDataTypes.ROW(to_jarray(JDataTypes.Field, fields))
    elif isinstance(data_type, UserDefinedType):
        # Both UDT paths return directly: the nullability / bridging steps at the end of
        # this function are not applied to the user-defined type itself.
        if data_type.java_udt():
            # Load the class named by java_udt() with the current thread's context class
            # loader on the JVM side and return an instance of it.
            return gateway.jvm.org.apache.flink.util.InstantiationUtil.instantiate(
                gateway.jvm.Class.forName(
                    data_type.java_udt(),
                    True,
                    gateway.jvm.Thread.currentThread().getContextClassLoader()))
        else:
            # No Java class is given: convert the type returned by sql_type() instead.
            return _to_java_data_type(data_type.sql_type())
    elif isinstance(data_type, MultisetType):
        j_data_type = JDataTypes.MULTISET(_to_java_data_type(data_type.element_type))
    elif isinstance(data_type, NullType):
        j_data_type = JDataTypes.NULL()
    elif isinstance(data_type, YearMonthIntervalType):
        j_data_type = _to_java_year_month_interval_type(JDataTypes, data_type)
    elif isinstance(data_type, DayTimeIntervalType):
        j_data_type = _to_java_day_time_interval_type(JDataTypes, data_type)
    elif isinstance(data_type, ListViewType):
        # Returned directly, without the nullability / bridging steps below.
        return gateway.jvm.org.apache.flink.table.api.dataview.ListView.newListViewDataType(
            _to_java_data_type(data_type._element_type))
    elif isinstance(data_type, MapViewType):
        # Returned directly, without the nullability / bridging steps below.
        return gateway.jvm.org.apache.flink.table.api.dataview.MapView.newMapViewDataType(
            _to_java_data_type(data_type._key_type), _to_java_data_type(data_type._value_type))
    else:
        # Wrap the operand in a 1-tuple so that a tuple passed by mistake is rendered in the
        # message instead of being unpacked as the arguments of the format string.
        raise TypeError("Unsupported data type: %s" % (data_type,))

    # Carry the nullability of the Python type over to the Java type.
    if data_type._nullable:
        j_data_type = j_data_type.nullable()
    else:
        j_data_type = j_data_type.notNull()

    # If a conversion class name is set, resolve it on the JVM side and bridge to it.
    if data_type._conversion_cls:
        j_data_type = j_data_type.bridgedTo(
            gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil
            .classForName(data_type._conversion_cls)
        )

    return j_data_type


def _to_java_year_month_interval_type(j_data_types, data_type):
    """
    Builds the Java interval type for the resolution of a YearMonthIntervalType.

    :param j_data_types: The Java ``DataTypes`` class used to build the interval.
    :param data_type: The YearMonthIntervalType to convert.
    :return: The Java interval type, before nullability is applied.
    """
    resolution = data_type.resolution
    resolutions = YearMonthIntervalType.YearMonthResolution
    if resolution == resolutions.YEAR:
        return j_data_types.INTERVAL(j_data_types.YEAR(data_type.precision))
    if resolution == resolutions.MONTH:
        return j_data_types.INTERVAL(j_data_types.MONTH())
    # Every remaining resolution is mapped to a YEAR-to-MONTH interval.
    return j_data_types.INTERVAL(
        j_data_types.YEAR(data_type.precision), j_data_types.MONTH())


def _to_java_day_time_interval_type(j_data_types, data_type):
    """
    Builds the Java interval type for the resolution of a DayTimeIntervalType.

    :param j_data_types: The Java ``DataTypes`` class used to build the interval.
    :param data_type: The DayTimeIntervalType to convert.
    :return: The Java interval type, before nullability is applied.
    """
    resolution = data_type.resolution
    resolutions = DayTimeIntervalType.DayTimeResolution
    if resolution == resolutions.DAY:
        return j_data_types.INTERVAL(j_data_types.DAY(data_type.day_precision))
    if resolution == resolutions.DAY_TO_HOUR:
        return j_data_types.INTERVAL(
            j_data_types.DAY(data_type.day_precision), j_data_types.HOUR())
    if resolution == resolutions.DAY_TO_MINUTE:
        return j_data_types.INTERVAL(
            j_data_types.DAY(data_type.day_precision), j_data_types.MINUTE())
    if resolution == resolutions.DAY_TO_SECOND:
        return j_data_types.INTERVAL(
            j_data_types.DAY(data_type.day_precision),
            j_data_types.SECOND(data_type.fractional_precision))
    if resolution == resolutions.HOUR:
        return j_data_types.INTERVAL(j_data_types.HOUR())
    if resolution == resolutions.HOUR_TO_MINUTE:
        return j_data_types.INTERVAL(j_data_types.HOUR(), j_data_types.MINUTE())
    if resolution == resolutions.HOUR_TO_SECOND:
        return j_data_types.INTERVAL(
            j_data_types.HOUR(), j_data_types.SECOND(data_type.fractional_precision))
    if resolution == resolutions.MINUTE:
        return j_data_types.INTERVAL(j_data_types.MINUTE())
    if resolution == resolutions.MINUTE_TO_SECOND:
        return j_data_types.INTERVAL(
            j_data_types.MINUTE(), j_data_types.SECOND(data_type.fractional_precision))
    # Every remaining resolution is mapped to a SECOND interval.
    return j_data_types.INTERVAL(j_data_types.SECOND(data_type.fractional_precision))