in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java [446:562]
/**
 * Opens a stream over the chain of pages that starts at {@code p}.
 * <p>
 * Each page is loaded into a page-sized buffer; the bytes after the page
 * header are exposed as stream content. When the current page is exhausted
 * the page identified by {@code getNext()} is loaded, until a page of type
 * {@link Page#PAGE_END_TYPE} has been fully consumed.
 *
 * @param p the first page of the chain; it is populated from the page file
 *          as a side effect of opening the stream
 * @return a stream supporting {@code mark}/{@code reset}
 * @throws EOFException if the first page is marked free
 * @throws IOException if the first page cannot be read
 */
public InputStream openInputStream(final Page p) throws IOException {
    return new InputStream() {

        // NOTE: 'chunk' must be initialised before 'page', because readPage() fills it.
        // In this class chunk.offset is the read cursor and chunk.length is used as the
        // exclusive END index of valid data within the buffer (not a byte count).
        private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
        private Page page = readPage(p);
        private int pageCount = 1;

        // State captured by mark(); markPage == null means "never marked".
        private Page markPage;
        private ByteSequence markChunk;
        private int markPageCount;

        /**
         * Loads the given page into {@code chunk} and positions the cursor
         * just past the page header.
         *
         * @throws EOFException if the page is marked free
         */
        private Page readPage(Page page) throws IOException {
            // Read the page data
            pageFile.readPage(page.getPageId(), chunk.getData());

            // Expose the whole buffer while the header is parsed.
            chunk.setOffset(0);
            chunk.setLength(pageFile.getPageSize());

            DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
            page.read(in);

            chunk.setOffset(Page.PAGE_HEADER_SIZE);
            if (page.getType() == Page.PAGE_END_TYPE) {
                // For an END page the 'next' value is used as the end index of the data.
                chunk.setLength((int) (page.getNext()));
            }

            if (page.getType() == Page.PAGE_FREE_TYPE) {
                throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
            }

            return page;
        }

        @Override
        public int read() throws IOException {
            if (!atEOF()) {
                return chunk.data[chunk.offset++] & 0xff;
            } else {
                return -1;
            }
        }

        /**
         * Reports whether the stream is exhausted. As a side effect, loads the
         * next page when the current chunk is used up and the current page is
         * not an END page.
         */
        private boolean atEOF() throws IOException {
            if (chunk.offset < chunk.length) {
                return false;
            }
            if (page.getType() == Page.PAGE_END_TYPE) {
                return true;
            }
            fill();
            return chunk.offset >= chunk.length;
        }

        /** Advances to the page that follows the current one. */
        private void fill() throws IOException {
            page = readPage(new Page(page.getNext()));
            pageCount++;
        }

        @Override
        public int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * Reads up to {@code len} bytes, continuing across page boundaries.
         *
         * @return the number of bytes copied, or -1 if the stream was already
         *         at its end when called
         */
        @Override
        public int read(byte b[], int off, int len) throws IOException {
            if (atEOF()) {
                return -1;
            }
            int rc = 0;
            // Test 'rc < len' first so the next page is not loaded once the
            // request has been satisfied.
            while (rc < len && !atEOF()) {
                // !atEOF() guarantees at least one byte is left in the chunk.
                int count = Math.min(len - rc, chunk.length - chunk.offset);
                System.arraycopy(chunk.data, chunk.offset, b, off + rc, count);
                chunk.offset += count;
                rc += count;
            }
            return rc;
        }

        /**
         * Skips up to {@code len} bytes, continuing across page boundaries.
         *
         * @return the number of bytes actually skipped; 0 if {@code len} is
         *         not positive or the stream is at its end
         */
        @Override
        public long skip(long len) throws IOException {
            long rc = 0;
            while (rc < len && !atEOF()) {
                int count = (int) Math.min(len - rc, (long) (chunk.length - chunk.offset));
                chunk.offset += count;
                rc += count;
            }
            return rc;
        }

        /** Returns the bytes left in the currently loaded page only. */
        @Override
        public int available() {
            return chunk.length - chunk.offset;
        }

        @Override
        public boolean markSupported() {
            return true;
        }

        /**
         * Remembers the current position. The read limit is ignored: the
         * current page buffer is copied in full.
         */
        @Override
        public void mark(int markpos) {
            markPage = page;
            markChunk = copyChunk(chunk);
            markPageCount = pageCount;
        }

        /**
         * Returns to the position saved by the last {@link #mark(int)}.
         *
         * @throws IOException if {@code mark} has never been called
         */
        @Override
        public void reset() throws IOException {
            if (markPage == null) {
                throw new IOException("Stream has not been marked");
            }
            page = markPage;
            // Restore from a copy so that subsequent reads do not move the saved
            // position and reset() can be called again for the same mark.
            chunk = copyChunk(markChunk);
            pageCount = markPageCount;
        }

        /** Creates an independent page-sized copy of a chunk, preserving offset and length. */
        private ByteSequence copyChunk(ByteSequence source) {
            byte data[] = new byte[pageFile.getPageSize()];
            System.arraycopy(source.getData(), 0, data, 0, pageFile.getPageSize());
            return new ByteSequence(data, source.getOffset(), source.getLength());
        }
    };
}