public static Object getPickledBytesFromJavaObject()

in flink-ml-python/src/main/java/org/apache/flink/ml/python/PythonBridgeUtils.java [59:160]


    /**
     * Converts a Java object into the pickled representation for the given type information.
     *
     * <p>Depending on {@code dataType} the value is handled as follows:
     *
     * <ul>
     *   <li>{@code null} values are pickled as {@code null}.
     *   <li>{@code SqlTimeTypeInfo} DATE is pickled as the epoch day, TIME as
     *       {@code toNanoOfDay() / 1000}; any other {@code SqlTimeTypeInfo} is written with the
     *       serializer created from {@code dataType}.
     *   <li>{@code RowTypeInfo} / {@code TupleTypeInfo} produce an <b>un-pickled</b> {@link List}
     *       with one converted entry per field; for rows the list starts with a single-byte
     *       array holding the row kind.
     *   <li>Array, map and list types convert their elements recursively and pickle the
     *       resulting collection (maps become a two-element list: keys, then values).
     *   <li>Float values are pickled through their string form; other basic types and
     *       {@code PickledByteArrayTypeInfo} values are pickled directly.
     *   <li>Everything else is written with the serializer created from {@code dataType} and
     *       the resulting bytes are pickled.
     * </ul>
     *
     * @param obj the value to convert, may be {@code null}
     * @param dataType the type information describing {@code obj}
     * @return the result of {@code Pickler#dumps}, or a {@link List} of converted fields for
     *     row and tuple types
     * @throws IOException if the initialization call, pickling or serialization fails
     */
    public static Object getPickledBytesFromJavaObject(Object obj, TypeInformation<?> dataType)
            throws IOException {
        // triggers the initialization process
        // Called only for its side effect, and only once per top-level conversion instead of
        // once per nested element. NOTE(review): this assumes the upstream initialization does
        // not need to be re-triggered while converting nested values - confirm against upstream.
        org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject(
                null, null);

        return doGetPickledBytesFromJavaObject(obj, dataType);
    }

    /** Recursive conversion step; expects the bridge initialization to have been triggered. */
    private static Object doGetPickledBytesFromJavaObject(Object obj, TypeInformation<?> dataType)
            throws IOException {
        Pickler pickler = new Pickler();

        if (obj == null) {
            return pickler.dumps(null);
        }

        if (dataType instanceof SqlTimeTypeInfo) {
            SqlTimeTypeInfo<?> sqlTimeTypeInfo =
                    SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
            if (sqlTimeTypeInfo == DATE) {
                return pickler.dumps(((Date) obj).toLocalDate().toEpochDay());
            } else if (sqlTimeTypeInfo == TIME) {
                // nanoseconds of day / 1000 -> microseconds of day
                return pickler.dumps(((Time) obj).toLocalTime().toNanoOfDay() / 1000);
            }
            // Any other SqlTimeTypeInfo intentionally falls through to the serializer-based
            // handling at the end of this method.
        } else if (dataType instanceof RowTypeInfo || dataType instanceof TupleTypeInfo) {
            TypeInformation<?>[] fieldTypes = ((TupleTypeInfoBase<?>) dataType).getFieldTypes();
            boolean isRow = dataType instanceof RowTypeInfo;
            int arity = isRow ? ((Row) obj).getArity() : ((Tuple) obj).getArity();

            // One slot per field plus the leading row-kind entry for rows.
            List<Object> fieldBytes = new ArrayList<>(arity + 1);
            if (isRow) {
                fieldBytes.add(new byte[] {((Row) obj).getKind().toByteValue()});
            }
            for (int i = 0; i < arity; i++) {
                Object field = isRow ? ((Row) obj).getField(i) : ((Tuple) obj).getField(i);
                fieldBytes.add(doGetPickledBytesFromJavaObject(field, fieldTypes[i]));
            }
            // Rows and tuples are returned as a plain list; the list itself is not pickled.
            return fieldBytes;
        } else if (dataType instanceof BasicArrayTypeInfo
                || dataType instanceof PrimitiveArrayTypeInfo
                || dataType instanceof ObjectArrayTypeInfo) {
            Object[] objects;
            TypeInformation<?> elementType;
            if (dataType instanceof BasicArrayTypeInfo) {
                objects = (Object[]) obj;
                elementType = ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo();
            } else if (dataType instanceof PrimitiveArrayTypeInfo) {
                // Primitive arrays cannot be cast to Object[]; convert them first.
                objects = primitiveArrayConverter(obj, dataType);
                elementType = ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType();
            } else {
                objects = (Object[]) obj;
                elementType = ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
            }

            List<Object> serializedElements = new ArrayList<>(objects.length);
            for (Object object : objects) {
                serializedElements.add(doGetPickledBytesFromJavaObject(object, elementType));
            }
            return pickler.dumps(serializedElements);
        } else if (dataType instanceof MapTypeInfo) {
            MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
            TypeInformation<?> keyType = mapTypeInfo.getKeyTypeInfo();
            TypeInformation<?> valueType = mapTypeInfo.getValueTypeInfo();

            Map<?, ?> mapObj = (Map<?, ?>) obj;
            List<Object> keyBytesList = new ArrayList<>(mapObj.size());
            List<Object> valueBytesList = new ArrayList<>(mapObj.size());
            for (Map.Entry<?, ?> entry : mapObj.entrySet()) {
                keyBytesList.add(doGetPickledBytesFromJavaObject(entry.getKey(), keyType));
                valueBytesList.add(doGetPickledBytesFromJavaObject(entry.getValue(), valueType));
            }

            // Maps are encoded as [keys, values] with matching positions.
            List<List<Object>> serializedMapKV = new ArrayList<>(2);
            serializedMapKV.add(keyBytesList);
            serializedMapKV.add(valueBytesList);
            return pickler.dumps(serializedMapKV);
        } else if (dataType instanceof ListTypeInfo) {
            List<?> objects = (List<?>) obj;
            TypeInformation<?> elementType = ((ListTypeInfo<?>) dataType).getElementTypeInfo();
            List<Object> serializedElements = new ArrayList<>(objects.size());
            for (Object object : objects) {
                serializedElements.add(doGetPickledBytesFromJavaObject(object, elementType));
            }
            return pickler.dumps(serializedElements);
        }

        if (dataType instanceof BasicTypeInfo
                && BasicTypeInfo.getInfoFor(dataType.getTypeClass()) == FLOAT_TYPE_INFO) {
            // Serialization of float type with pickler loses precision.
            return pickler.dumps(String.valueOf(obj));
        } else if (dataType instanceof PickledByteArrayTypeInfo
                || dataType instanceof BasicTypeInfo) {
            return pickler.dumps(obj);
        } else {
            // other typeinfos will use the corresponding serializer to serialize data.
            // The serializer's element type is only known at runtime, hence the unchecked cast.
            // NOTE(review): a null config is passed to createSerializer - confirm that every
            // type information reaching this branch tolerates it.
            @SuppressWarnings("unchecked")
            TypeSerializer<Object> serializer =
                    (TypeSerializer<Object>) dataType.createSerializer(null);
            ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
            DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
            serializer.serialize(obj, baosWrapper);
            return pickler.dumps(baos.toByteArray());
        }
    }