public InputStream openInputStream()

in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java [446:562]


    /**
     * Opens a stream over the data stored in the chain of pages that starts at {@code p}.
     * <p>
     * Pages are loaded one at a time through {@code pageFile.readPage}. Each page's payload
     * starts at {@link Page#PAGE_HEADER_SIZE}; a page whose type is {@code Page.PAGE_END_TYPE}
     * terminates the stream, and the value returned by its {@code getNext()} is used as the end
     * index of the readable bytes. For every other page {@code getNext()} is used as the id of
     * the following page.
     * <p>
     * The returned stream supports {@code mark}/{@code reset}; the read limit passed to
     * {@code mark} is ignored.
     *
     * @param p the first page of the stream; it is populated from disk by this call
     * @return a stream over the page chain
     * @throws IOException  if the first page cannot be read
     * @throws EOFException if the first page is marked free
     */
    public InputStream openInputStream(final Page p) throws IOException {

        return new InputStream() {

            // Working buffer for the current page. 'offset' is the read cursor and 'length' is used
            // as the END INDEX of the readable data (not a byte count). Must be declared before
            // 'page' because the initializer of 'page' reads into this buffer.
            private final ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
            private Page page = readPage(p);
            private int pageCount = 1;

            // Snapshot taken by mark(); all null/zero until mark() has been called.
            private Page markPage;
            private ByteSequence markChunk;
            private int markPageCount;

            /**
             * Loads {@code target} from the page file into {@code chunk} and positions the
             * cursor just past the page header.
             *
             * @throws EOFException if the loaded page is marked free
             */
            private Page readPage(Page target) throws IOException {
                pageFile.readPage(target.getPageId(), chunk.getData());

                // Expose the whole buffer so the page header can be parsed.
                chunk.setOffset(0);
                chunk.setLength(pageFile.getPageSize());

                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
                target.read(in);

                // Payload starts after the header; on the last page getNext() gives its end index.
                chunk.setOffset(Page.PAGE_HEADER_SIZE);
                if (target.getType() == Page.PAGE_END_TYPE) {
                    chunk.setLength((int) (target.getNext()));
                }

                if (target.getType() == Page.PAGE_FREE_TYPE) {
                    throw new EOFException("Chunk stream does not exist, page: " + target.getPageId() + " is marked free");
                }

                return target;
            }

            @Override
            public int read() throws IOException {
                if (atEOF()) {
                    return -1;
                }
                return chunk.data[chunk.offset++] & 0xff;
            }

            /**
             * Reports whether the stream is exhausted, loading the next page when the current
             * one has been fully consumed and is not the end page.
             */
            private boolean atEOF() throws IOException {
                if (chunk.offset < chunk.length) {
                    return false;
                }
                if (page.getType() == Page.PAGE_END_TYPE) {
                    return true;
                }
                fill();
                return chunk.offset >= chunk.length;
            }

            /** Advances to the page referenced by the current page's next pointer. */
            private void fill() throws IOException {
                page = readPage(new Page(page.getNext()));
                pageCount++;
            }

            @Override
            public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
            }

            /**
             * Reads up to {@code len} bytes, continuing across page boundaries until the request
             * is satisfied or the end of the stream is reached.
             *
             * @return the number of bytes copied into {@code b}, or -1 if the stream was already
             *         at its end when called
             */
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                if (atEOF()) {
                    return -1;
                }
                int total = 0;
                // Test 'total < len' first so the next page is not fetched once the request is met.
                while (total < len && !atEOF()) {
                    // atEOF() returned false, so at least one byte is available in the chunk.
                    int n = Math.min(len - total, chunk.length - chunk.offset);
                    System.arraycopy(chunk.data, chunk.offset, b, off + total, n);
                    chunk.offset += n;
                    total += n;
                }
                return total;
            }

            /**
             * Skips up to {@code len} bytes, continuing across page boundaries.
             *
             * @return the number of bytes actually skipped; 0 if {@code len} is not positive or
             *         the stream is at its end
             */
            @Override
            public long skip(long len) throws IOException {
                long skipped = 0;
                while (skipped < len && !atEOF()) {
                    int n = (int) Math.min(len - skipped, (long) (chunk.length - chunk.offset));
                    chunk.offset += n;
                    skipped += n;
                }
                return skipped;
            }

            /** Returns the number of unread bytes left in the currently loaded page. */
            @Override
            public int available() {
                return chunk.length - chunk.offset;
            }

            @Override
            public boolean markSupported() {
                return true;
            }

            /**
             * Remembers the current position by snapshotting the current page, its buffer and
             * the cursor. The {@code markpos} read limit is ignored.
             */
            @Override
            public void mark(int markpos) {
                markPage = page;
                markPageCount = pageCount;
                byte[] data = new byte[pageFile.getPageSize()];
                System.arraycopy(chunk.getData(), 0, data, 0, data.length);
                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
            }

            /**
             * Returns to the position saved by the last {@link #mark(int)}.
             *
             * @throws IOException if {@code mark} has not been called on this stream
             */
            @Override
            public void reset() throws IOException {
                if (markChunk == null) {
                    throw new IOException("Stream has not been marked");
                }
                page = markPage;
                pageCount = markPageCount;
                // Copy the snapshot back instead of aliasing it: later reads move the cursor and
                // overwrite the working buffer, which must not disturb the saved mark.
                byte[] saved = markChunk.getData();
                System.arraycopy(saved, 0, chunk.getData(), 0, saved.length);
                chunk.setOffset(markChunk.getOffset());
                chunk.setLength(markChunk.getLength());
            }

        };
    }