in odps/tunnel/io/reader.py [0:0]
def _build_converter(self, odps_type, arrow_type=None):
    """Select the callable used to convert values of ``odps_type``.

    Composite types (array, map, struct) are handled recursively: a
    converter is built for every element / key / value / field type and
    combined into one callable.

    :param odps_type: ODPS type of the column or nested element.
    :param arrow_type: matching pyarrow type when known, ``None`` otherwise.
        It is only used to detect special physical layouts (struct-encoded
        timestamps and intervals, legacy binary decimals) and to find the
        arrow types of nested elements.
    :return: a one-argument callable. ``_reflective`` is returned when no
        dedicated conversion is selected for the type.
    """
    # imported lazily inside the method, as in the original code
    import pyarrow as pa

    # Decimal256Type may be absent from the installed pyarrow, hence the guard
    arrow_decimal_types = (pa.Decimal128Type,)
    if hasattr(pa, "Decimal256Type"):
        arrow_decimal_types += (pa.Decimal256Type,)

    if self._convert_ts and isinstance(odps_type, types.Datetime):
        return self._convert_datetime
    elif isinstance(odps_type, types.Timestamp):
        if isinstance(arrow_type, pa.StructType):
            return self._convert_struct_timestamp
        elif self._convert_ts:
            return self._convert_ts_timestamp
        else:
            return _reflective
    elif isinstance(odps_type, types.TimestampNTZ):
        if isinstance(arrow_type, pa.StructType):
            return functools.partial(self._convert_struct_timestamp, ntz=True)
        elif self._convert_ts:
            return functools.partial(self._convert_ts_timestamp, ntz=True)
        else:
            return _reflective
    elif (
        isinstance(odps_type, types.Decimal)
        and isinstance(arrow_type, pa.FixedSizeBinaryType)
        and not isinstance(arrow_type, arrow_decimal_types)
    ):
        # fixed-size binary that is not an arrow decimal type
        return convert_legacy_decimal_bytes
    elif isinstance(odps_type, types.IntervalDayTime) and isinstance(
        arrow_type, pa.StructType
    ):
        return self._convert_struct_timedelta
    elif isinstance(odps_type, types.Array):
        arrow_value_type = getattr(arrow_type, "value_type", None)
        sub_converter = self._build_converter(
            odps_type.value_type, arrow_value_type
        )
        if sub_converter is _reflective:
            # elements need no conversion, so neither does the list itself
            return _reflective
        return (
            lambda value: [sub_converter(x) for x in value]
            if value is not None
            else None
        )
    elif isinstance(odps_type, types.Map):
        arrow_key_type = getattr(arrow_type, "key_type", None)
        arrow_value_type = getattr(arrow_type, "item_type", None)
        key_converter = self._build_converter(odps_type.key_type, arrow_key_type)
        value_converter = self._build_converter(
            odps_type.value_type, arrow_value_type
        )
        dict_hook = OrderedDict if odps_type._use_ordered_dict else dict
        if key_converter is _reflective and value_converter is _reflective:
            # dict(None) / OrderedDict(None) raise TypeError, so a null map
            # must bypass the hook, exactly like the converting path below
            return lambda value: dict_hook(value) if value is not None else None
        return (
            lambda value: dict_hook(
                [(key_converter(k), value_converter(v)) for k, v in value]
            )
            if value is not None
            else None
        )
    elif isinstance(odps_type, types.Struct):
        field_converters = OrderedDict()
        for field_name, field_type in odps_type.field_types.items():
            arrow_field_type = None
            if arrow_type is not None:
                arrow_field_type = arrow_type[field_name].type
            field_converters[field_name] = self._build_converter(
                field_type, arrow_field_type
            )
        # no tuple type is passed when structs are configured to be dicts
        if options.struct_as_dict:
            tuple_type = None
        else:
            tuple_type = odps_type.namedtuple_type
        use_ordered_dict = odps_type._use_ordered_dict
        return functools.partial(
            self._convert_struct,
            field_converters=field_converters,
            tuple_type=tuple_type,
            use_ordered_dict=use_ordered_dict,
        )
    else:
        return _reflective