in flink-ml-python/src/main/java/org/apache/flink/ml/python/PythonBridgeUtils.java [59:160]
/**
 * Converts a Java object into the pickled representation selected by its type information.
 *
 * <p>The conversion depends on {@code dataType}:
 *
 * <ul>
 *   <li>a {@code null} object is pickled as {@code null}, regardless of the type;
 *   <li>SQL {@code DATE} values are pickled as their epoch day, SQL {@code TIME} values as
 *       microseconds of the day;
 *   <li>rows and tuples are returned as an (unpickled) {@link List} holding the converted
 *       fields; for rows the list starts with a one-element byte array holding the row kind;
 *   <li>arrays and lists are pickled as a list of their converted elements;
 *   <li>maps are pickled as a two-element list: the converted keys followed by the converted
 *       values, both in the iteration order of the map's entry set;
 *   <li>floats are pickled as their string representation;
 *   <li>other basic types and pickled byte arrays are handed to the pickler directly;
 *   <li>every other type is serialized with the serializer created from {@code dataType} and
 *       the resulting bytes are pickled.
 * </ul>
 *
 * @param obj the object to convert, may be {@code null}
 * @param dataType the type information describing {@code obj}
 * @return the pickled representation, or a list of converted fields for rows and tuples
 * @throws IOException if pickling or serializing the object fails
 */
public static Object getPickledBytesFromJavaObject(Object obj, TypeInformation<?> dataType)
        throws IOException {
    Pickler pickler = new Pickler();
    // triggers the initialization process
    org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject(
            null, null);
    if (obj == null) {
        return pickler.dumps(null);
    }

    if (dataType instanceof SqlTimeTypeInfo) {
        SqlTimeTypeInfo<?> sqlTimeTypeInfo =
                SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
        if (sqlTimeTypeInfo == DATE) {
            return pickler.dumps(((Date) obj).toLocalDate().toEpochDay());
        } else if (sqlTimeTypeInfo == TIME) {
            // nanoseconds -> microseconds of the day
            return pickler.dumps(((Time) obj).toLocalTime().toNanoOfDay() / 1000);
        }
        // Any other SQL time type falls through to the generic handling at the bottom.
    } else if (dataType instanceof RowTypeInfo || dataType instanceof TupleTypeInfo) {
        TypeInformation<?>[] fieldTypes = ((TupleTypeInfoBase<?>) dataType).getFieldTypes();
        // Decide once whether we are dealing with a Row or a Tuple.
        boolean isRow = dataType instanceof RowTypeInfo;
        int arity = isRow ? ((Row) obj).getArity() : ((Tuple) obj).getArity();
        // One extra slot for the row kind that is prepended for rows.
        List<Object> fieldBytes = new ArrayList<>(arity + 1);
        if (isRow) {
            fieldBytes.add(new byte[] {((Row) obj).getKind().toByteValue()});
        }
        for (int i = 0; i < arity; i++) {
            Object field = isRow ? ((Row) obj).getField(i) : ((Tuple) obj).getField(i);
            fieldBytes.add(getPickledBytesFromJavaObject(field, fieldTypes[i]));
        }
        return fieldBytes;
    } else if (dataType instanceof BasicArrayTypeInfo
            || dataType instanceof PrimitiveArrayTypeInfo
            || dataType instanceof ObjectArrayTypeInfo) {
        Object[] objects;
        TypeInformation<?> elementType;
        if (dataType instanceof BasicArrayTypeInfo) {
            objects = (Object[]) obj;
            elementType = ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo();
        } else if (dataType instanceof PrimitiveArrayTypeInfo) {
            // Primitive arrays cannot be cast to Object[], so they are converted first.
            objects = primitiveArrayConverter(obj, dataType);
            elementType = ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType();
        } else {
            objects = (Object[]) obj;
            elementType = ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
        }
        List<Object> serializedElements = new ArrayList<>(objects.length);
        for (Object object : objects) {
            serializedElements.add(getPickledBytesFromJavaObject(object, elementType));
        }
        return pickler.dumps(serializedElements);
    } else if (dataType instanceof MapTypeInfo) {
        Map<?, ?> mapObj = (Map<?, ?>) obj;
        // The key/value types are the same for every entry, so look them up only once.
        MapTypeInfo<?, ?> mapType = (MapTypeInfo<?, ?>) dataType;
        TypeInformation<?> keyType = mapType.getKeyTypeInfo();
        TypeInformation<?> valueType = mapType.getValueTypeInfo();
        List<Object> keyBytesList = new ArrayList<>(mapObj.size());
        List<Object> valueBytesList = new ArrayList<>(mapObj.size());
        for (Map.Entry<?, ?> entry : mapObj.entrySet()) {
            keyBytesList.add(getPickledBytesFromJavaObject(entry.getKey(), keyType));
            valueBytesList.add(getPickledBytesFromJavaObject(entry.getValue(), valueType));
        }
        // Keys and values are shipped as two parallel lists: [keys, values].
        List<List<Object>> serializedMapKV = new ArrayList<>(2);
        serializedMapKV.add(keyBytesList);
        serializedMapKV.add(valueBytesList);
        return pickler.dumps(serializedMapKV);
    } else if (dataType instanceof ListTypeInfo) {
        List<?> objects = (List<?>) obj;
        List<Object> serializedElements = new ArrayList<>(objects.size());
        TypeInformation<?> elementType = ((ListTypeInfo<?>) dataType).getElementTypeInfo();
        for (Object object : objects) {
            serializedElements.add(getPickledBytesFromJavaObject(object, elementType));
        }
        return pickler.dumps(serializedElements);
    }

    if (dataType instanceof BasicTypeInfo
            && BasicTypeInfo.getInfoFor(dataType.getTypeClass()) == FLOAT_TYPE_INFO) {
        // Serialization of float type with pickler loses precision.
        return pickler.dumps(String.valueOf(obj));
    } else if (dataType instanceof PickledByteArrayTypeInfo
            || dataType instanceof BasicTypeInfo) {
        return pickler.dumps(obj);
    } else {
        // other typeinfos will use the corresponding serializer to serialize data.
        // The static element type is unknown here (dataType is a wildcard), so the serializer
        // is viewed as a TypeSerializer<Object>; obj is expected to match dataType.
        @SuppressWarnings("unchecked")
        TypeSerializer<Object> serializer =
                (TypeSerializer<Object>) dataType.createSerializer(null);
        ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
        serializer.serialize(obj, baosWrapper);
        return pickler.dumps(baos.toByteArray());
    }
}