public int read()

in tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java [240:336]


	/**
	 * Reads up to {@code len} bytes into {@code b}, starting at {@code off},
	 * from the chunks queued in {@code dataQueue}.
	 *
	 * <p>The end-of-data check ({@code noMoreData()}) runs before the arguments
	 * are validated, so {@code -1} is returned without inspecting {@code b},
	 * {@code off} or {@code len} once no more data is available.
	 *
	 * <p>When the queue is empty the method either calls
	 * {@code readNext(64 * 1024, true)} (if {@code endOfStream} is set) or
	 * waits on {@code dataQueue}. On every successful return {@code pos},
	 * {@code avaliableSize} and {@code totalReadBytesForFetch} are adjusted by
	 * the number of bytes copied.
	 *
	 * <p>If the thread is interrupted while waiting, its interrupt status is
	 * restored. When nothing has been copied yet an
	 * {@link java.io.InterruptedIOException} is thrown; otherwise the bytes
	 * copied so far are returned as a short read.
	 *
	 * @param b   destination buffer
	 * @param off offset in {@code b} at which the first byte is stored
	 * @param len maximum number of bytes to read
	 * @return the number of bytes copied into {@code b} (may be less than
	 *         {@code len}), {@code 0} if {@code len} is zero, or {@code -1}
	 *         when no more data is available
	 * @throws NullPointerException      if {@code b} is {@code null}
	 * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
	 *         negative, or {@code len > b.length - off}
	 * @throws IOException               if the wait for data is interrupted
	 *         before any byte was copied
	 */
	public int read(byte[] b, int off, int len) throws IOException {
		if(noMoreData()) {
			return -1;
		}
		if (b == null) {
			throw new NullPointerException();
		} else if (off < 0 || len < 0 || len > b.length - off) {
			throw new IndexOutOfBoundsException();
		} else if (len == 0) {
			return 0;
		}
		if(currentScanData == null) {
			synchronized(dataQueue) {
				if(dataQueue.isEmpty()) {
					if(endOfStream) {
						readNext(64 * 1024, true);
					} else {
						try {
							// NOTE(review): wait() is not re-checked in a loop, so a wakeup
							// that is not accompanied by new data falls through with an
							// empty queue -- confirm the producer's notify conditions.
							dataQueue.wait();
							if(noMoreData()) {
								return -1;
							}
						} catch (InterruptedException e) {
							// Nothing has been copied yet: keep the interrupt status and
							// surface the interruption instead of continuing without data.
							Thread.currentThread().interrupt();
							java.io.InterruptedIOException interrupted =
									new java.io.InterruptedIOException("Interrupted while waiting for scan data");
							interrupted.initCause(e);
							throw interrupted;
						}
					}
				}
				// NOTE(review): the chunk is only taken when currentScanIndex > 0; if that
				// does not hold here, currentScanData stays null and the dereference
				// below fails -- confirm how currentScanIndex is initialised.
				if(!dataQueue.isEmpty() && currentScanIndex > 0) {
					currentScanData = dataQueue.poll();
					currentScanIndex = 0;
				}
			}
		}

		int numRemainBytes = currentScanData.length - currentScanIndex;
		if(numRemainBytes > len) {
			// The current chunk alone satisfies the request.
			System.arraycopy(currentScanData.data, currentScanIndex, b, off, len);
			currentScanIndex += len;
			avaliableSize.addAndGet(0 - len);
			pos += len;

			totalReadBytesForFetch.addAndGet(len);
			return len;
		} else {
			// Drain the rest of the current chunk, then keep pulling chunks until
			// len bytes are copied or the data runs out.
			int offset = off;
			int length = 0;
			int numCopyBytes = numRemainBytes;
			while(true) {
				synchronized(dataQueue) {
					if(numCopyBytes == 0 && eof && dataQueue.isEmpty()) {
						return -1;
					}
				}
				System.arraycopy(currentScanData.data, currentScanIndex, b, offset, numCopyBytes);
				currentScanIndex += numCopyBytes;
				offset += numCopyBytes;
				length += numCopyBytes;
				if(length >= len) {
					break;
				}
				synchronized(dataQueue) {
					if(dataQueue.isEmpty()) {
						if(eof) {
							break;
						}
						if(endOfStream) {
							readNext(64 * 1024, true);
						} else {
							try {
								dataQueue.wait();
							} catch (InterruptedException e) {
								// Keep the interrupt status. Without this exit the code below
								// would size the next copy against the already-consumed chunk.
								Thread.currentThread().interrupt();
								if(length == 0) {
									java.io.InterruptedIOException interrupted =
											new java.io.InterruptedIOException("Interrupted while waiting for scan data");
									interrupted.initCause(e);
									throw interrupted;
								}
								// Bytes were already copied: hand them back as a short read.
								break;
							}
						}
					}
					if(eof && dataQueue.isEmpty()) {
						break;
					}
					if(!dataQueue.isEmpty() && currentScanIndex > 0) {
						currentScanData = dataQueue.poll();
						currentScanIndex = 0;
					}
					if(currentScanData == null) {
						break;
					}
				}
				// NOTE(review): this sizing uses the whole chunk length, so it presumes a
				// fresh chunk was polled above (currentScanIndex == 0) -- confirm.
				if(currentScanData.length >= (len - length)) {
					numCopyBytes = (len - length);
				} else {
					numCopyBytes = currentScanData.length;
				}
			}  //end of while
			this.pos += length;
			avaliableSize.addAndGet(0 - length);

			totalReadBytesForFetch.addAndGet(length);
			return length;
		}
	}