in flink-python/pyflink/table/types.py [0:0]
def _from_java_data_type(j_data_type):
    """
    Converts Java DataType to Python DataType.

    The conversion dispatches on the concrete Java ``DataType`` wrapper (atomic, collection,
    key-value or fields data type) and then on its logical type. Nested types (array/multiset
    elements, map keys/values, row fields, view element types) are converted recursively.

    :param j_data_type: A py4j proxy of a Java ``DataType``.
    :return: The equivalent Python DataType.
    :raises TypeError: If the Java data type, or any type nested inside it, has no Python
                       counterpart in the current python type system.
    """
    gateway = get_gateway()

    # Atomic Type with parameters.
    if is_instance_of(j_data_type, gateway.jvm.AtomicDataType):
        logical_type = j_data_type.getLogicalType()
        if is_instance_of(logical_type, gateway.jvm.CharType):
            data_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.VarCharType):
            data_type = DataTypes.VARCHAR(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BinaryType):
            data_type = DataTypes.BINARY(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.VarBinaryType):
            data_type = DataTypes.VARBINARY(logical_type.getLength(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DecimalType):
            data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
                                          logical_type.getScale(),
                                          logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DateType):
            data_type = DataTypes.DATE(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TimeType):
            data_type = DataTypes.TIME(logical_type.getPrecision(), logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TimestampType):
            # NOTE(review): precision is pinned to 3 rather than read from
            # logical_type.getPrecision(); presumably only millisecond precision is
            # supported on the Python side — confirm before changing.
            data_type = DataTypes.TIMESTAMP(precision=3, nullable=logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BooleanType):
            data_type = DataTypes.BOOLEAN(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.TinyIntType):
            data_type = DataTypes.TINYINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.SmallIntType):
            data_type = DataTypes.SMALLINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.IntType):
            data_type = DataTypes.INT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.BigIntType):
            data_type = DataTypes.BIGINT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.FloatType):
            data_type = DataTypes.FLOAT(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DoubleType):
            data_type = DataTypes.DOUBLE(logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
            raise \
                TypeError("Unsupported type: %s, ZonedTimestampType is not supported yet."
                          % j_data_type)
        elif is_instance_of(logical_type, gateway.jvm.LocalZonedTimestampType):
            data_type = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(nullable=logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType) or \
                is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
            data_type = _from_java_interval_type(logical_type)
        elif is_instance_of(logical_type, gateway.jvm.LegacyTypeInformationType):
            # Legacy types wrap a TypeInformation; only a fixed set of them is recognized.
            # NOTE(review): these branches do not propagate logical_type.isNullable(), so the
            # result uses the Python-side default nullability — confirm this is intended.
            type_info = logical_type.getTypeInformation()
            BasicArrayTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.\
                BasicArrayTypeInfo
            BasicTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
            if type_info == BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO:
                data_type = DataTypes.ARRAY(DataTypes.STRING())
            elif type_info == BasicTypeInfo.BIG_DEC_TYPE_INFO:
                data_type = DataTypes.DECIMAL(38, 18)
            elif type_info.getClass() == \
                get_java_class(gateway.jvm.org.apache.flink.table.runtime.typeutils
                               .BigDecimalTypeInfo):
                data_type = DataTypes.DECIMAL(type_info.precision(), type_info.scale())
            elif type_info.getClass() == \
                    get_java_class(gateway.jvm.org.apache.flink.table.dataview.ListViewTypeInfo):
                data_type = DataTypes.LIST_VIEW(_from_java_data_type(type_info.getElementType()))
            elif type_info.getClass() == \
                    get_java_class(gateway.jvm.org.apache.flink.table.dataview.MapViewTypeInfo):
                data_type = DataTypes.MAP_VIEW(
                    _from_java_data_type(type_info.getKeyType()),
                    _from_java_data_type(type_info.getValueType()))
            else:
                raise TypeError("Unsupported type: %s, it is recognized as a legacy type."
                                % type_info)
        elif is_instance_of(logical_type, gateway.jvm.RawType):
            data_type = RawType()
        else:
            raise TypeError("Unsupported type: %s, it is not supported yet in current python type"
                            " system" % j_data_type)
        return data_type

    # Array Type, MultiSet Type.
    elif is_instance_of(j_data_type, gateway.jvm.CollectionDataType):
        logical_type = j_data_type.getLogicalType()
        element_type = j_data_type.getElementDataType()
        if is_instance_of(logical_type, gateway.jvm.ArrayType):
            data_type = DataTypes.ARRAY(_from_java_data_type(element_type),
                                        logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.MultisetType):
            data_type = DataTypes.MULTISET(_from_java_data_type(element_type),
                                           logical_type.isNullable())
        else:
            raise TypeError("Unsupported collection data type: %s" % j_data_type)
        return data_type

    # Map Type.
    elif is_instance_of(j_data_type, gateway.jvm.KeyValueDataType):
        logical_type = j_data_type.getLogicalType()
        key_type = j_data_type.getKeyDataType()
        value_type = j_data_type.getValueDataType()
        if is_instance_of(logical_type, gateway.jvm.MapType):
            data_type = DataTypes.MAP(
                _from_java_data_type(key_type),
                _from_java_data_type(value_type),
                logical_type.isNullable())
        else:
            raise TypeError("Unsupported map data type: %s" % j_data_type)
        return data_type

    # Row Type.
    elif is_instance_of(j_data_type, gateway.jvm.FieldsDataType):
        logical_type = j_data_type.getLogicalType()
        field_data_types = j_data_type.getChildren()
        if is_instance_of(logical_type, gateway.jvm.RowType):
            # Field names come from the logical row type; field data types are the
            # positionally matching children of the fields data type.
            fields = [DataTypes.FIELD(name, _from_java_data_type(field_data_types[idx]))
                      for idx, name in enumerate(logical_type.getFieldNames())]
            data_type = DataTypes.ROW(fields, logical_type.isNullable())
        elif j_data_type.getConversionClass().isAssignableFrom(
                gateway.jvm.org.apache.flink.table.api.dataview.ListView._java_lang_class):
            # A ListView is described by a single child whose converted type exposes
            # `element_type`.
            array_type = _from_java_data_type(field_data_types[0])
            data_type = DataTypes.LIST_VIEW(array_type.element_type)
        elif j_data_type.getConversionClass().isAssignableFrom(
                gateway.jvm.org.apache.flink.table.api.dataview.MapView._java_lang_class):
            # A MapView is described by a single child whose converted type exposes
            # `key_type` and `value_type`.
            map_type = _from_java_data_type(field_data_types[0])
            data_type = DataTypes.MAP_VIEW(map_type.key_type, map_type.value_type)
        else:
            raise TypeError("Unsupported row data type: %s" % j_data_type)
        return data_type

    # Unrecognized type.
    else:
        # The exception must be raised, not merely constructed; otherwise the function
        # silently returns None for unsupported data types.
        raise TypeError("Unsupported data type: %s" % j_data_type)