in hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java [88:157]
public void writeTo(final IFrameWriter writer) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
synchronized (MaterializingPipelinedPartition.this) {
while (fRef == null && eos == false) {
MaterializingPipelinedPartition.this.wait();
}
}
IFileHandle fh = fRef == null ? null
: ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
try {
writer.open();
try {
if (fh != null) {
long offset = 0;
ByteBuffer buffer = ctx.allocateFrame();
boolean fail = false;
boolean done = false;
boolean flush = false;
while (!fail && !done) {
synchronized (MaterializingPipelinedPartition.this) {
if (flushRequest) {
flushRequest = false;
flush = true;
}
while (offset >= size && !eos && !failed && !flush) {
try {
MaterializingPipelinedPartition.this.wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
fail = failed;
done = eos && offset >= size;
}
if (fail) {
writer.fail();
} else if (!done) {
buffer.clear();
long readLen = ioManager.syncRead(fh, offset, buffer);
if (readLen < buffer.capacity()) {
throw new HyracksDataException("Premature end of file");
}
offset += readLen;
buffer.flip();
writer.nextFrame(buffer);
if (flush) {
writer.flush();
flush = false;
}
}
}
}
} finally {
writer.close();
}
} finally {
if (fh != null) {
ioManager.close(fh);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}