in flink-ml-python/pyflink/ml/wrapper.py [0:0]
def convert_to_python_obj_wrapper(data, type_info):
if type_info == Types.PICKLED_BYTE_ARRAY():
return pickle.loads(data)
elif isinstance(type_info, ExternalTypeInfo):
return convert_to_python_obj_wrapper(data, type_info._type_info)
else:
gateway = get_gateway()
pickle_bytes = gateway.jvm.org.apache.flink.ml.python.PythonBridgeUtils. \
getPickledBytesFromJavaObject(data, type_info.get_java_type_info())
if isinstance(type_info, RowTypeInfo) or isinstance(type_info, TupleTypeInfo):
field_data = zip(list(pickle_bytes[1:]), type_info.get_field_types())
fields = []
for data, field_type in field_data:
if len(data) == 0:
fields.append(None)
else:
fields.append(pickled_bytes_to_python_obj(data, field_type))
if isinstance(type_info, RowTypeInfo):
return Row.of_kind(RowKind(int.from_bytes(pickle_bytes[0], 'little')), *fields)
else:
return tuple(fields)
else:
return pickled_bytes_to_python_obj(pickle_bytes, type_info)