in tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java [240:336]
public int read(byte b[], int off, int len) throws IOException {
if(noMoreData()) {
return -1;
}
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
if(currentScanData == null) {
synchronized(dataQueue) {
if(dataQueue.isEmpty()) {
if(endOfStream) {
readNext(64 * 1024, true);
} else {
try {
dataQueue.wait();
if(noMoreData()) {
return -1;
}
} catch (InterruptedException e) {
}
}
}
if(!dataQueue.isEmpty() && currentScanIndex > 0) {
currentScanData = dataQueue.poll();
currentScanIndex = 0;
}
}
}
int numRemainBytes = currentScanData.length - currentScanIndex;
if(numRemainBytes > len) {
System.arraycopy(currentScanData.data, currentScanIndex, b, off, len);
currentScanIndex += len;
avaliableSize.addAndGet(0 - len);
pos += len;
totalReadBytesForFetch.addAndGet(len);
return len;
} else {
int offset = off;
int length = 0;
int numCopyBytes = numRemainBytes;
while(true) {
synchronized(dataQueue) {
if(numCopyBytes == 0 && eof && dataQueue.isEmpty()) {
return -1;
}
}
System.arraycopy(currentScanData.data, currentScanIndex, b, offset, numCopyBytes);
currentScanIndex += numCopyBytes;
offset += numCopyBytes;
length += numCopyBytes;
if(length >= len) {
break;
}
synchronized(dataQueue) {
if(dataQueue.isEmpty()) {
if(eof) {
break;
}
if(endOfStream) {
readNext(64 * 1024, true);
} else {
try {
dataQueue.wait();
} catch (InterruptedException e) {
}
}
}
if(eof && dataQueue.isEmpty()) {
break;
}
if(!dataQueue.isEmpty() && currentScanIndex > 0) {
currentScanData = dataQueue.poll();
currentScanIndex = 0;
}
if(currentScanData == null) {
break;
}
}
if(currentScanData.length >= (len - length)) {
numCopyBytes = (len - length);
} else {
numCopyBytes = currentScanData.length;
}
} //end of while
this.pos += length;
avaliableSize.addAndGet(0 - length);
totalReadBytesForFetch.addAndGet(length);
return length;
}
}