stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java [151:241]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Reads up to {@code len} bytes into {@code b}, starting at index {@code off}.
     *
     * <p>Bytes are taken from the payload of the current record; once that payload is
     * drained the next record is fetched, until the request is satisfied or no further
     * record is returned.
     *
     * @param b destination buffer
     * @param off index in {@code b} at which the first byte is stored
     * @param len maximum number of bytes to read
     * @return the number of bytes stored into {@code b}; {@code -1} if no record was
     *         available and nothing was read; {@code 0} if {@code len} is not positive
     * @throws IOException if fetching the next record, or reading or closing a payload, fails
     */
    @Override
    public int read(byte[] b, final int off, final int len) throws IOException {
        int remaining = len;
        int numBytesRead = 0;
        while (remaining > 0) {
            if (null == currentRecord) {
                currentRecord = nextRecordStream(reader);
            }

            if (null == currentRecord) {
                // No more records: signal end-of-stream only when nothing was read at all,
                // otherwise hand back the partial result.
                if (numBytesRead == 0) {
                    return -1;
                }
                break;
            }

            int bytesLeft = currentRecord.payloadStream.available();
            if (bytesLeft <= 0) {
                // Current payload is drained; release it and move on to the next record.
                currentRecord.payloadStream.close();
                currentRecord = null;
                continue;
            }

            int numBytesToRead = Math.min(bytesLeft, remaining);
            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
            if (numBytes < 0) {
                // The payload signalled its end even though available() reported bytes.
                // Treat it as drained and drop it; retrying the same stream could loop
                // forever if it keeps reporting a positive available().
                currentRecord.payloadStream.close();
                currentRecord = null;
                continue;
            }
            numBytesRead += numBytes;
            remaining -= numBytes;
        }
        return numBytesRead;
    }

    /**
     * Skips over up to {@code n} bytes by walking the record payloads.
     *
     * <p>Whole records are discarded while the outstanding count exceeds what is left in
     * the current payload; the final record is skipped partially. The stream position
     * {@code pos} is derived from the record's transaction id, which this method uses as
     * the position of the end of that record.
     *
     * @param n number of bytes to skip
     * @return the number of bytes actually skipped: {@code 0} if {@code n} is not positive,
     *         and fewer than {@code n} if the end of the stream is reached first or the
     *         payload stream skips fewer bytes than requested
     * @throws IOException if fetching the next record, or skipping or closing a payload, fails
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }

        long remaining = n;
        while (true) {
            if (null == currentRecord) {
                currentRecord = nextRecordStream(reader);
            }

            if (null == currentRecord) { // end of stream
                return n - remaining;
            }

            int bytesLeft = currentRecord.payloadStream.available();
            long endPos = currentRecord.record.getTransactionId();
            if (remaining > bytesLeft) {
                // skip the whole record
                remaining -= bytesLeft;
                this.pos = endPos;
                // Release the discarded payload, as read() does for drained records.
                currentRecord.payloadStream.close();
                // Clear the reference first so a closed stream is never retained if the
                // fetch below throws.
                this.currentRecord = null;
                this.currentRecord = nextRecordStream(reader);
                continue;
            } else if (remaining == bytesLeft) {
                // The request ends exactly at the record boundary.
                this.pos = endPos;
                currentRecord.payloadStream.close();
                this.currentRecord = null;
                return n;
            } else {
                // The request ends inside this record. Honour the count the payload
                // stream reports instead of assuming the full amount was skipped.
                long skipped = currentRecord.payloadStream.skip(remaining);
                this.pos = endPos - currentRecord.payloadStream.available();
                return (n - remaining) + skipped;
            }
        }
    }

    /**
     * Estimates how many bytes lie between the current position and the cached end position.
     *
     * <p>When the cached end position shows nothing left (or has fallen behind the current
     * position), it is refreshed through {@code readEndPos()} before the estimate is computed.
     *
     * @return the estimated number of available bytes, never negative and capped at
     *         {@link Integer#MAX_VALUE}
     * @throws IOException declared by the overridden method
     */
    @Override
    public int available() throws IOException {
        if (lastPos - pos <= 0L) {
            // Cached end position is exhausted or stale; re-read it.
            lastPos = readEndPos();
        }
        long bytesAvailable = lastPos - pos;
        if (bytesAvailable <= 0L) {
            return 0;
        }
        // A plain (int) cast would wrap for differences above Integer.MAX_VALUE.
        return (int) Math.min(bytesAvailable, (long) Integer.MAX_VALUE);
    }

    /**
     * Reports whether this stream supports {@code mark} and {@code reset}.
     *
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        // mark/reset is not offered by this stream
        return false;
    }

    @Override
    public int read() throws IOException {
        byte[] data = new byte[1];
        int numBytes = read(data);
        if (numBytes <= 0) {
            return -1;
        }
        return data[0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java [139:229]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Reads up to {@code len} bytes into {@code b}, starting at index {@code off}.
     *
     * <p>Bytes are taken from the payload of the current record; once that payload is
     * drained the next record is fetched, until the request is satisfied or no further
     * record is returned.
     *
     * @param b destination buffer
     * @param off index in {@code b} at which the first byte is stored
     * @param len maximum number of bytes to read
     * @return the number of bytes stored into {@code b}; {@code -1} if no record was
     *         available and nothing was read; {@code 0} if {@code len} is not positive
     * @throws IOException if fetching the next record, or reading or closing a payload, fails
     */
    @Override
    public int read(byte[] b, final int off, final int len) throws IOException {
        int remaining = len;
        int numBytesRead = 0;
        while (remaining > 0) {
            if (null == currentRecord) {
                currentRecord = nextRecordStream(reader);
            }

            if (null == currentRecord) {
                // No more records: signal end-of-stream only when nothing was read at all,
                // otherwise hand back the partial result.
                if (numBytesRead == 0) {
                    return -1;
                }
                break;
            }

            int bytesLeft = currentRecord.payloadStream.available();
            if (bytesLeft <= 0) {
                // Current payload is drained; release it and move on to the next record.
                currentRecord.payloadStream.close();
                currentRecord = null;
                continue;
            }

            int numBytesToRead = Math.min(bytesLeft, remaining);
            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
            if (numBytes < 0) {
                // The payload signalled its end even though available() reported bytes.
                // Treat it as drained and drop it; retrying the same stream could loop
                // forever if it keeps reporting a positive available().
                currentRecord.payloadStream.close();
                currentRecord = null;
                continue;
            }
            numBytesRead += numBytes;
            remaining -= numBytes;
        }
        return numBytesRead;
    }

    /**
     * Skips over up to {@code n} bytes by walking the record payloads.
     *
     * <p>Whole records are discarded while the outstanding count exceeds what is left in
     * the current payload; the final record is skipped partially. The stream position
     * {@code pos} is derived from the record's transaction id, which this method uses as
     * the position of the end of that record.
     *
     * @param n number of bytes to skip
     * @return the number of bytes actually skipped: {@code 0} if {@code n} is not positive,
     *         and fewer than {@code n} if the end of the stream is reached first or the
     *         payload stream skips fewer bytes than requested
     * @throws IOException if fetching the next record, or skipping or closing a payload, fails
     */
    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }

        long remaining = n;
        while (true) {
            if (null == currentRecord) {
                currentRecord = nextRecordStream(reader);
            }

            if (null == currentRecord) { // end of stream
                return n - remaining;
            }

            int bytesLeft = currentRecord.payloadStream.available();
            long endPos = currentRecord.record.getTransactionId();
            if (remaining > bytesLeft) {
                // skip the whole record
                remaining -= bytesLeft;
                this.pos = endPos;
                // Release the discarded payload, as read() does for drained records.
                currentRecord.payloadStream.close();
                // Clear the reference first so a closed stream is never retained if the
                // fetch below throws.
                this.currentRecord = null;
                this.currentRecord = nextRecordStream(reader);
                continue;
            } else if (remaining == bytesLeft) {
                // The request ends exactly at the record boundary.
                this.pos = endPos;
                currentRecord.payloadStream.close();
                this.currentRecord = null;
                return n;
            } else {
                // The request ends inside this record. Honour the count the payload
                // stream reports instead of assuming the full amount was skipped.
                long skipped = currentRecord.payloadStream.skip(remaining);
                this.pos = endPos - currentRecord.payloadStream.available();
                return (n - remaining) + skipped;
            }
        }
    }

    /**
     * Estimates how many bytes lie between the current position and the cached end position.
     *
     * <p>When the cached end position shows nothing left (or has fallen behind the current
     * position), it is refreshed through {@code readEndPos()} before the estimate is computed.
     *
     * @return the estimated number of available bytes, never negative and capped at
     *         {@link Integer#MAX_VALUE}
     * @throws IOException declared by the overridden method
     */
    @Override
    public int available() throws IOException {
        if (lastPos - pos <= 0L) {
            // Cached end position is exhausted or stale; re-read it.
            lastPos = readEndPos();
        }
        long bytesAvailable = lastPos - pos;
        if (bytesAvailable <= 0L) {
            return 0;
        }
        // A plain (int) cast would wrap for differences above Integer.MAX_VALUE.
        return (int) Math.min(bytesAvailable, (long) Integer.MAX_VALUE);
    }

    /**
     * Reports whether this stream supports {@code mark} and {@code reset}.
     *
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        // mark/reset is not offered by this stream
        return false;
    }

    @Override
    public int read() throws IOException {
        byte[] data = new byte[1];
        int numBytes = read(data);
        if (numBytes <= 0) {
            return -1;
        }
        return data[0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



