# pypaimon/py4j/util/java_utils.py
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
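"""Utility helpers for converting between Python objects and their Java
(py4j) counterparts used by the Paimon catalog and table APIs."""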
import pyarrow as pa

from pypaimon import Schema
from pypaimon.py4j.java_gateway import get_gateway


def to_j_catalog_context(catalog_options: dict):
gateway = get_gateway()
j_options = gateway.jvm.Options(catalog_options)
return gateway.jvm.CatalogContext.create(j_options)
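
# Usage sketch (illustrative; the "warehouse" key and path are assumptions
# for demonstration -- any Paimon catalog options dict works):
#
#   j_context = to_j_catalog_context({"warehouse": "/tmp/paimon"})

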
def to_j_identifier(identifier: str):
return get_gateway().jvm.Identifier.fromString(identifier)
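
# Usage sketch (illustrative): Identifier.fromString expects the
# "database.table" form, e.g.
#
#   j_ident = to_j_identifier("default.my_table")

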
def to_paimon_schema(schema: Schema):
j_schema_builder = get_gateway().jvm.Schema.newBuilder()
if schema.partition_keys is not None:
j_schema_builder.partitionKeys(schema.partition_keys)
if schema.primary_keys is not None:
j_schema_builder.primaryKey(schema.primary_keys)
if schema.options is not None:
j_schema_builder.options(schema.options)
    if schema.comment is not None:
        j_schema_builder.comment(schema.comment)
for field in schema.pa_schema:
column_name = field.name
column_type = _to_j_type(column_name, field.type)
j_schema_builder.column(column_name, column_type)
return j_schema_builder.build()
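
# Usage sketch (illustrative; assumes the pypaimon Schema constructor
# accepts these keyword arguments, and the field names are made up):
#
#   pa_schema = pa.schema([("dt", pa.string()), ("id", pa.int64())])
#   schema = Schema(pa_schema, partition_keys=["dt"],
#                   primary_keys=["dt", "id"], options={"bucket": "4"})
#   j_schema = to_paimon_schema(schema)

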
def check_batch_write(j_table):
gateway = get_gateway()
bucket_mode = j_table.bucketMode()
    if (bucket_mode == gateway.jvm.BucketMode.HASH_DYNAMIC
            or bucket_mode == gateway.jvm.BucketMode.CROSS_PARTITION):
        raise TypeError("Writing to dynamic bucket or cross-partition tables is not supported.")
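
# Usage sketch (illustrative): call this before opening a batch write so
# unsupported bucket modes fail fast; newBatchWriteBuilder is the standard
# Paimon Java table API:
#
#   check_batch_write(j_table)
#   write_builder = j_table.newBatchWriteBuilder()

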
def _to_j_type(name, pa_type):
jvm = get_gateway().jvm
# int
if pa.types.is_int8(pa_type):
return jvm.DataTypes.TINYINT()
elif pa.types.is_int16(pa_type):
return jvm.DataTypes.SMALLINT()
elif pa.types.is_int32(pa_type):
return jvm.DataTypes.INT()
elif pa.types.is_int64(pa_type):
return jvm.DataTypes.BIGINT()
# float
elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
return jvm.DataTypes.FLOAT()
elif pa.types.is_float64(pa_type):
return jvm.DataTypes.DOUBLE()
# string
elif pa.types.is_string(pa_type):
return jvm.DataTypes.STRING()
# bool
elif pa.types.is_boolean(pa_type):
return jvm.DataTypes.BOOLEAN()
elif pa.types.is_null(pa_type):
print(f"WARN: The type of column '{name}' is null, "
"and it will be converted to string type by default. "
"Please check if the original type is string. "
f"If not, please manually specify the type of '{name}'.")
return jvm.DataTypes.STRING()
else:
raise ValueError(f'Found unsupported data type {str(pa_type)} for field {name}.')
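
# Mapping sketch (illustrative), derived directly from the branches above:
#
#   _to_j_type("id", pa.int32())          # -> DataTypes.INT()
#   _to_j_type("name", pa.string())       # -> DataTypes.STRING()
#   _to_j_type("ts", pa.timestamp("ms"))  # raises ValueError (unsupported)

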
def to_arrow_schema(j_row_type):
    # The Java side returns the schema encoded as an Arrow IPC stream;
    # reading the stream header is enough to recover the pyarrow schema.
schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_row_type)
schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes))
arrow_schema = schema_reader.schema
schema_reader.close()
return arrow_schema
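
# Usage sketch (illustrative): recovering the Arrow schema of a table's
# row type, assuming a py4j handle to a Paimon table:
#
#   arrow_schema = to_arrow_schema(j_table.rowType())

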
def serialize_java_object(java_obj) -> bytes:
gateway = get_gateway()
util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
try:
java_bytes = util.serializeObject(java_obj)
return bytes(java_bytes)
except Exception as e:
        raise RuntimeError(f"Java serialization failed: {e}") from e


def deserialize_java_object(bytes_data):
    gateway = get_gateway()
    cl = gateway.jvm.Thread.currentThread().getContextClassLoader()
    util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
    return util.deserializeObject(bytes_data, cl)
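
# Round-trip sketch (illustrative): the two helpers above are inverses,
# subject to the JVM's context class loader:
#
#   payload = serialize_java_object(j_obj)
#   j_obj_copy = deserialize_java_object(payload)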