in odps/tunnel/io/reader.py [0:0]
def _read_field(self, data_type):
    """
    Read one value of ``data_type`` from ``self._reader``.

    Each raw value read directly in this method is also passed to the
    matching ``self._crc`` update call. Array, map and struct values are
    delegated to ``_read_array`` / ``_read_struct``.

    :param data_type: type descriptor (compared against ``types``) that
        selects how the value is decoded
    :return: the decoded value, or ``None`` for a DATETIME / TIMESTAMP
        value whose conversion overflows while
        ``options.tunnel.overflow_date_as_none`` is enabled
    :raises ImportError: for TIMESTAMP / INTERVAL_DAY_TIME when pandas is
        unavailable; raised only after the raw values have been consumed
    :raises DatetimeOverflowError: for an overflowing DATETIME / TIMESTAMP
        when ``options.tunnel.overflow_date_as_none`` is disabled
    :raises IOError: when ``data_type`` matches none of the handled types
    """
    # NOTE: the order of the checks below is kept deliberately; each
    # branch returns as soon as its value is decoded.
    if data_type == types.float_:
        number = self._reader.read_float()
        self._crc.update_float(number)
        return number

    if data_type == types.double:
        number = self._reader.read_double()
        self._crc.update_double(number)
        return number

    if data_type == types.boolean:
        flag = self._reader.read_bool()
        self._crc.update_bool(flag)
        return flag

    if data_type in types.integer_types:
        integer = self._reader.read_sint64()
        self._crc.update_long(integer)
        return integer

    # string and binary values are returned exactly as read_string gives them
    if data_type == types.string:
        text = self._reader.read_string()
        self._crc.update(text)
        return text

    if data_type == types.binary:
        blob = self._reader.read_string()
        self._crc.update(blob)
        return blob

    if data_type == types.datetime:
        raw_datetime = self._reader.read_sint64()
        self._crc.update_long(raw_datetime)
        try:
            return self._to_datetime(raw_datetime)
        except DatetimeOverflowError:
            # overflow is tolerated only when the tunnel option asks for it
            if options.tunnel.overflow_date_as_none:
                return None
            raise

    if data_type == types.date:
        raw_date = self._reader.read_sint64()
        self._crc.update_long(raw_date)
        return self._to_date(raw_date)

    if data_type == types.timestamp or data_type == types.timestamp_ntz:
        # TIMESTAMP_NTZ goes through the UTC converter
        if data_type == types.timestamp_ntz:
            convert = self._to_datetime_utc
        else:
            convert = self._to_datetime
        whole_part = self._reader.read_sint64()
        self._crc.update_long(whole_part)
        nanos = self._reader.read_sint32()
        self._crc.update_int(nanos)
        if pd is None:
            raise ImportError(
                "To use TIMESTAMP in pyodps, you need to install pandas."
            )
        try:
            # presumably seconds scaled to milliseconds for the converter;
            # the nanosecond remainder is added on top
            base = pd.Timestamp(convert(whole_part * 1000))
            remainder = pd.Timedelta(nanoseconds=nanos)
            return base + remainder
        except DatetimeOverflowError:
            if options.tunnel.overflow_date_as_none:
                return None
            raise

    if data_type == types.interval_day_time:
        seconds = self._reader.read_sint64()
        self._crc.update_long(seconds)
        nanos = self._reader.read_sint32()
        self._crc.update_int(nanos)
        if pd is None:
            raise ImportError(
                "To use INTERVAL_DAY_TIME in pyodps, you need to install pandas."
            )
        return pd.Timedelta(seconds=seconds, nanoseconds=nanos)

    if data_type == types.interval_year_month:
        months = self._reader.read_sint64()
        self._crc.update_long(months)
        return compat.Monthdelta(months)

    if data_type == types.json:
        raw_json = self._reader.read_string()
        # decode first: a malformed document raises before the crc update
        decoded = json.loads(raw_json)
        self._crc.update(raw_json)
        return decoded

    if isinstance(data_type, (types.Char, types.Varchar)):
        chars = self._reader.read_string()
        self._crc.update(chars)
        return chars

    if isinstance(data_type, types.Decimal):
        # the decimal is handed back in the form read_string returns it
        decimal_repr = self._reader.read_string()
        self._crc.update(decimal_repr)
        return decimal_repr

    if isinstance(data_type, types.Array):
        return self._read_array(data_type.value_type)

    if isinstance(data_type, types.Map):
        # a map is read as an array of keys followed by an array of values
        map_keys = self._read_array(data_type.key_type)
        map_values = self._read_array(data_type.value_type)
        return self._map_dict_hook(zip(map_keys, map_values))

    if isinstance(data_type, types.Struct):
        return self._read_struct(data_type)

    raise IOError("Unsupported type %s" % data_type)