in odps/tunnel/io/reader.py [0:0]
def _read_single_record(self):
    """Read and decode the next record from the tunnel protobuf stream.

    Field tags are consumed until ``TUNNEL_END_RECORD``. At that point the
    per-record CRC sent by the server is verified and folded into the
    stream-level CRC-of-CRCs (``self._crccrc``). If ``TUNNEL_META_COUNT`` is
    met instead, the stream trailer is validated and ``None`` is returned.
    The trailer consists of the row count, an optional server-metrics string
    with its own checksum, and the final CRC-of-CRCs.

    :return: the decoded ``Record``, or ``None`` when the stream trailer has
        been consumed or the configured read limit has already been reached
        (the latter also emits a ``RuntimeWarning``).
    :raises ChecksumError: if a record, metrics or stream checksum mismatches.
    :raises IOError: if the row count does not match the number of records
        read, the trailer layout is invalid, or a field tag exceeds the
        number of columns.
    """
    injected_error = self._injected_error
    if injected_error is not None and injected_error[0] == self._curr_cursor:
        # Keep a local reference: the attribute is cleared before raising so
        # the error fires only once, and indexing the cleared attribute would
        # raise TypeError instead of the intended exception.
        self._injected_error = None
        raise injected_error[1]
    if self._read_limit is not None and self.count >= self._read_limit:
        warnings.warn(
            "Number of lines read via tunnel already reaches the limitation.",
            RuntimeWarning,
        )
        return None
    record = Record(self._columns, max_field_size=(1 << 63) - 1)
    while True:
        index, _ = self._reader.read_field_number_and_wire_type()
        if index == 0:
            # Field number 0 is skipped without reading any value.
            continue
        if index == ProtoWireConstants.TUNNEL_END_RECORD:
            # End of record: compare our running CRC with the server's, then
            # fold it into the stream-level CRC-of-CRCs.
            checksum = utils.long_to_int(self._crc.getvalue())
            if int(self._reader.read_uint32()) != utils.int_to_uint(checksum):
                raise ChecksumError("Checksum invalid")
            self._crc.reset()
            self._crccrc.update_int(checksum)
            break
        if index == ProtoWireConstants.TUNNEL_META_COUNT:
            # Stream trailer: the server-reported row count must equal the
            # number of records this reader has produced.
            if self._attempt_row_count != self._reader.read_sint64():
                raise IOError("count does not match")
            idx_of_checksum, wire_type = (
                self._reader.read_field_number_and_wire_type()
            )
            if ProtoWireConstants.TUNNEL_META_CHECKSUM != idx_of_checksum:
                # Any other tag here must be a length-delimited server
                # metrics string, protected by its own CRC and terminated by
                # TUNNEL_END_METRICS.
                if wire_type != wire_format.WIRETYPE_LENGTH_DELIMITED:
                    raise IOError("Invalid stream data.")
                self._crc.update_int(idx_of_checksum)
                self._server_metrics_string = self._reader.read_string()
                self._crc.update(self._server_metrics_string)
                idx_of_checksum = self._reader.read_field_number_and_wire_type()[0]
                if idx_of_checksum != ProtoWireConstants.TUNNEL_END_METRICS:
                    raise IOError("Invalid stream data.")
                checksum = utils.long_to_int(self._crc.getvalue())
                self._crc.reset()
                if utils.int_to_uint(checksum) != int(self._reader.read_uint32()):
                    raise ChecksumError("Checksum invalid.")
                idx_of_checksum = self._reader.read_field_number_and_wire_type()[0]
            if ProtoWireConstants.TUNNEL_META_CHECKSUM != idx_of_checksum:
                raise IOError("Invalid stream data.")
            # Final check: CRC accumulated over every record checksum.
            if int(self._crccrc.getvalue()) != self._reader.read_uint32():
                raise ChecksumError("Checksum invalid.")
            return None
        if index > len(self._columns):
            raise IOError(
                "Invalid protobuf tag. Perhaps the datastream "
                "from server is crushed."
            )
        # Field numbers are 1-based column positions; the tag itself is part
        # of the per-record CRC.
        self._crc.update_int(index)
        col_idx = index - 1
        record[col_idx] = self._read_field(self._columns[col_idx].type)
    if self._append_partitions and self._partition_spec is not None:
        for key, value in self._partition_spec.items():
            try:
                record[key] = value
            except KeyError:
                # Best effort: partition columns absent from the record
                # schema are skipped.
                pass
    self._curr_cursor += 1
    self._attempt_row_count += 1
    return record