in src/main/java/com/aliyun/oss/model/SelectInputStream.java [99:178]
private void readFrame() throws IOException {
while (currentFrameOffset >= currentFramePayloadLength && !finished) {
if (!firstReadFrame) {
internalRead(currentFramePayloadChecksumBytes, 0, 4);
validateCheckSum(currentFramePayloadChecksumBytes, crc32);
}
firstReadFrame = false;
//advance to next frame
internalRead(currentFrameTypeBytes, 0, 4);
//first byte is version byte
if (currentFrameTypeBytes[0] != SELECT_VERSION) {
throw new SelectObjectException(SelectObjectException.INVALID_SELECT_VERSION, "Invalid select version found " + currentFrameTypeBytes[0] + ", expect: " + SELECT_VERSION, requestId);
}
internalRead(currentFramePayloadLengthBytes, 0, 4);
internalRead(currentFrameHeaderChecksumBytes, 0, 4);
internalRead(scannedDataBytes, 0, 8);
if (payloadCrcEnabled) {
crc32.update(scannedDataBytes);
}
currentFrameTypeBytes[0] = 0;
int type = ByteBuffer.wrap(currentFrameTypeBytes).getInt();
switch (type) {
case DATA_FRAME_MAGIC:
currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8;
currentFrameOffset = 0;
break;
case CONTINUOUS_FRAME_MAGIC:
//just break, continue
break;
case END_FRAME_MAGIC:
currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8;
byte[] totalScannedDataSizeBytes = new byte[8];
internalRead(totalScannedDataSizeBytes, 0, 8);
byte[] statusBytes = new byte[4];
internalRead(statusBytes, 0, 4);
if (payloadCrcEnabled) {
crc32.update(totalScannedDataSizeBytes);
crc32.update(statusBytes);
}
int status = ByteBuffer.wrap(statusBytes).getInt();
int errorMessageSize = (int)(currentFramePayloadLength - 12);
String error = "";
if (errorMessageSize > 0) {
byte[] errorMessageBytes = new byte[errorMessageSize];
internalRead(errorMessageBytes, 0, errorMessageSize);
error = new String(errorMessageBytes);
if (payloadCrcEnabled) {
crc32.update(errorMessageBytes);
}
}
finished = true;
currentFramePayloadLength = currentFrameOffset;
internalRead(currentFramePayloadChecksumBytes, 0, 4);
validateCheckSum(currentFramePayloadChecksumBytes, crc32);
if (status / 100 != 2) {
if (error.contains(".")) {
throw new SelectObjectException(error.split("\\.")[0], error.substring(error.indexOf(".") + 1), requestId);
} else {
// forward compatbility consideration
throw new SelectObjectException(error, error, requestId);
}
}
break;
default:
throw new SelectObjectException(SelectObjectException.INVALID_SELECT_FRAME, "Unsupported frame type " + type + " found", requestId);
}
//notify select progress
ProgressEventType eventType = ProgressEventType.SELECT_SCAN_EVENT;
if (finished) {
eventType = ProgressEventType.SELECT_COMPLETED_EVENT;
}
long scannedDataSize = ByteBuffer.wrap(scannedDataBytes).getLong();
if (scannedDataSize >= nextNotificationScannedSize || finished) {
publishSelectProgress(selectProgressListener, eventType, scannedDataSize);
nextNotificationScannedSize += DEFAULT_NOTIFICATION_THRESHOLD;
}
}
}