in oss2/select_response.py [0:0]
def read_next_frame(self):
    """Read one frame from the select response stream and update reader state.

    Bytes are consumed from ``self.read_raw`` in this order:

    * 4 bytes  frame type (the first byte is zeroed before decoding)
    * 4 bytes  payload length
    * 4 bytes  header checksum (consumed, never verified here)
    * N bytes  payload, stored in ``self.payload``; its first 8 bytes are
      decoded into ``self.file_offset``
    * 4 bytes  payload checksum (compared against a CRC32 of the payload
      only for data frames, and only when ``self.enable_crc`` is truthy)

    Per frame type:

    * data frame: exposes ``payload[8:]`` as ``self.frame_data`` and sets
      ``self.frame_length`` / ``self.frame_off_set`` / ``self.check_sum_flag``.
    * continuous frame: resets the frame length/offset, no data.
    * end frame: decodes the status at ``payload[16:20]`` and an optional
      ``code.message`` error text from ``payload[20:]``; invokes
      ``self.callback`` (if set) and marks the stream finished.
    * meta-end / JSON-meta-end frame: additionally decodes ``self.splits``,
      ``self.rows`` and (non-JSON variant only) ``self.columns``; stores the
      status in ``self.final_status`` and marks the stream finished.

    Raises:
        SelectOperationClientError: the frame type is not one of the known
            frame type constants.
        InconsistentError: CRC checking is enabled and the data frame's
            checksum does not match the computed one.
        SelectOperationFailed: an end or meta-end frame carries a status
            outside the 2xx range.
    """

    def _decode(raw, fmt):
        # Shared decode step: copy into a mutable buffer, let the utils helper
        # adjust byte order in place, then unpack with the native-order format.
        buf = bytearray(raw)
        utils.change_endianness_if_needed(buf)
        return struct.unpack(fmt, bytes(buf))[0]

    def _split_error(payload, declared_length, start):
        # The error text begins at `start` and looks like b"<code>.<message>".
        # It is split on the first dot unless the dot is the final declared
        # byte; without a usable dot the whole text is the message.
        size = declared_length - start
        code = b''
        message = b''
        if size > 0:
            message = payload[start:start + size]
            dot = message.find(b'.')
            if 0 <= dot < size - 1:
                code = message[:dot]
                message = message[dot + 1:]
        return code, message

    # --- frame header -----------------------------------------------------
    frame_type = bytearray(self.read_raw(4))
    payload_length_val = _decode(self.read_raw(4), "I")
    self.read_raw(4)  # header checksum: must be consumed, but is not verified

    frame_type[0] = 0  # mask the version bit
    frame_type_val = _decode(frame_type, "I")

    known_frame_types = (
        SelectResponseAdapter._DATA_FRAME_TYPE,
        SelectResponseAdapter._CONTINIOUS_FRAME_TYPE,
        SelectResponseAdapter._END_FRAME_TYPE,
        SelectResponseAdapter._META_END_FRAME_TYPE,
        SelectResponseAdapter._JSON_META_END_FRAME_TYPE,
    )
    if frame_type_val not in known_frame_types:
        logger.warning("Unexpected frame type: {0}. RequestId:{1}. This could be due to the old version of client.".format(frame_type_val, self.request_id))
        raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))

    # --- payload ----------------------------------------------------------
    self.payload = self.read_raw(payload_length_val)
    self.file_offset = _decode(self.payload[0:8], "Q")

    if frame_type_val == SelectResponseAdapter._DATA_FRAME_TYPE:
        self.frame_length = payload_length_val - 8
        self.frame_off_set = 0
        self.check_sum_flag = 1
        self.frame_data = self.payload[8:]
        # The trailing checksum is always consumed, even when CRC is disabled,
        # so the stream stays aligned on the next frame.
        checksum_val = _decode(self.read_raw(4), "I")
        if self.enable_crc:
            crc32 = utils.Crc32()
            crc32.update(self.payload)
            checksum_calc = crc32.crc
            if checksum_val != checksum_calc:
                logger.warning("Incorrect checksum: Actual {0} and calculated {1}. RequestId:{2}".format(checksum_val, checksum_calc, self.request_id))
                raise InconsistentError("Incorrect checksum: Actual" + str(checksum_val) + ". Calculated:" + str(checksum_calc), self.request_id)

    elif frame_type_val == SelectResponseAdapter._CONTINIOUS_FRAME_TYPE:
        self.frame_length = self.frame_off_set = 0
        self.check_sum_flag = 1
        self.read_raw(4)  # payload checksum, consumed but not verified

    elif frame_type_val == SelectResponseAdapter._END_FRAME_TYPE:
        self.frame_off_set = 0
        # payload[8:16] is skipped here; only the status and error text are used.
        status = _decode(self.payload[16:20], "I")
        error_code, error_msg = _split_error(self.payload, payload_length_val, 20)

        # A failing end frame raises before the trailing checksum is consumed
        # and before the stream is marked finished.
        if status // 100 != 2:
            raise SelectOperationFailed(status, error_code, error_msg)
        self.frame_length = 0
        if self.callback is not None:
            self.callback(self.file_offset, self.content_length)
        self.read_raw(4)  # read the payload checksum
        self.frame_length = 0
        self.finished = 1

    elif (frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE
          or frame_type_val == SelectResponseAdapter._JSON_META_END_FRAME_TYPE):
        self.frame_off_set = 0
        # payload[8:16] is skipped here as well.
        status = _decode(self.payload[16:20], "I")
        self.splits = _decode(self.payload[20:24], "I")
        self.rows = _decode(self.payload[24:32], "Q")

        if frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
            # Only the non-JSON meta frame carries a column count, which
            # pushes the error text 4 bytes further into the payload.
            self.columns = _decode(self.payload[32:36], "I")
            error_index = 36
        else:
            error_index = 32

        error_code, error_msg = _split_error(self.payload, payload_length_val, error_index)

        self.read_raw(4)  # read the payload checksum
        self.final_status = status
        self.frame_length = 0
        self.finished = 1
        # Floor division is required: with true division on Python 3 a status
        # such as 206 yields 2.06, which would wrongly be reported as a failure.
        if status // 100 != 2:
            raise SelectOperationFailed(status, error_code, error_msg)