def _read_single_record()

in odps/tunnel/io/reader.py [0:0]


        def _read_single_record(self):
            if (
                self._injected_error is not None
                and self._injected_error[0] == self._curr_cursor
            ):
                self._injected_error = None
                raise self._injected_error[1]

            if self._read_limit is not None and self.count >= self._read_limit:
                warnings.warn(
                    "Number of lines read via tunnel already reaches the limitation.",
                    RuntimeWarning,
                )
                return None

            record = Record(self._columns, max_field_size=(1 << 63) - 1)

            while True:
                index, _ = self._reader.read_field_number_and_wire_type()

                if index == 0:
                    continue
                if index == ProtoWireConstants.TUNNEL_END_RECORD:
                    checksum = utils.long_to_int(self._crc.getvalue())
                    if int(self._reader.read_uint32()) != utils.int_to_uint(checksum):
                        raise ChecksumError("Checksum invalid")
                    self._crc.reset()
                    self._crccrc.update_int(checksum)
                    break

                if index == ProtoWireConstants.TUNNEL_META_COUNT:
                    if self._attempt_row_count != self._reader.read_sint64():
                        raise IOError("count does not match")

                    (
                        idx_of_checksum,
                        wire_type,
                    ) = self._reader.read_field_number_and_wire_type()
                    if ProtoWireConstants.TUNNEL_META_CHECKSUM != idx_of_checksum:
                        if wire_type != wire_format.WIRETYPE_LENGTH_DELIMITED:
                            raise IOError("Invalid stream data.")
                        self._crc.update_int(idx_of_checksum)

                        self._server_metrics_string = self._reader.read_string()
                        self._crc.update(self._server_metrics_string)

                        idx_of_checksum = (
                            self._reader.read_field_number_and_wire_type()[0]
                        )
                        if idx_of_checksum != ProtoWireConstants.TUNNEL_END_METRICS:
                            raise IOError("Invalid stream data.")
                        checksum = utils.long_to_int(self._crc.getvalue())
                        self._crc.reset()
                        if utils.int_to_uint(checksum) != int(
                            self._reader.read_uint32()
                        ):
                            raise ChecksumError("Checksum invalid.")

                        idx_of_checksum = (
                            self._reader.read_field_number_and_wire_type()[0]
                        )
                    if ProtoWireConstants.TUNNEL_META_CHECKSUM != idx_of_checksum:
                        raise IOError("Invalid stream data.")
                    if int(self._crccrc.getvalue()) != self._reader.read_uint32():
                        raise ChecksumError("Checksum invalid.")

                    return

                if index > len(self._columns):
                    raise IOError(
                        "Invalid protobuf tag. Perhaps the datastream "
                        "from server is crushed."
                    )

                self._crc.update_int(index)

                i = index - 1
                record[i] = self._read_field(self._columns[i].type)

            if self._append_partitions and self._partition_spec is not None:
                for k, v in self._partition_spec.items():
                    try:
                        record[k] = v
                    except KeyError:
                        # skip non-existing fields
                        pass

            self._curr_cursor += 1
            self._attempt_row_count += 1
            return record