public void writeTo()

in hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java [88:157]


    /**
     * Submits a task to {@code executor} that replays this partition's backing file into
     * {@code writer}, one frame at a time, while the file may still be growing.
     * <p>
     * The task performs the following steps:
     * <ol>
     * <li>waits on this partition's monitor until {@code fRef} is set or {@code eos} is raised;</li>
     * <li>opens the file read-only (skipped when {@code fRef} is still {@code null}) and opens the
     * writer;</li>
     * <li>repeatedly waits until unread bytes are available ({@code offset < size}) or one of
     * {@code eos}, {@code failed}, {@code flushRequest} is set, then forwards the next frame and/or a
     * flush to the writer, or signals {@code writer.fail()} when {@code failed} is set;</li>
     * <li>closes the writer and then the file handle, whatever the outcome.</li>
     * </ol>
     * If reading or forwarding throws, {@code writer.fail()} is invoked before {@code writer.close()}.
     * Any exception raised inside the task is rethrown wrapped in a {@link RuntimeException}; when the
     * task was interrupted, the thread's interrupt status is restored first.
     *
     * @param writer
     *            the consumer that receives the frames; it is opened, fed and closed by the submitted
     *            task, not by the calling thread
     */
    public void writeTo(final IFrameWriter writer) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Block until a backing file exists or the producer finished without creating one.
                    synchronized (MaterializingPipelinedPartition.this) {
                        while (fRef == null && !eos) {
                            MaterializingPipelinedPartition.this.wait();
                        }
                    }
                    IFileHandle fh = fRef == null ? null
                            : ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
                                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                    try {
                        writer.open();
                        // Remembers that writer.fail() was already attempted so it is never called twice.
                        boolean failNotified = false;
                        try {
                            if (fh != null) {
                                long offset = 0;
                                ByteBuffer buffer = ctx.allocateFrame();
                                boolean fail = false;
                                boolean done = false;
                                while (!fail && !done) {
                                    boolean flush;
                                    boolean hasData;
                                    synchronized (MaterializingPipelinedPartition.this) {
                                        // Wait on the shared flushRequest flag itself so that a flush
                                        // requested while we are parked here is noticed on wake-up.
                                        while (offset >= size && !eos && !failed && !flushRequest) {
                                            MaterializingPipelinedPartition.this.wait();
                                        }
                                        // Consume the flush request under the lock.
                                        flush = flushRequest;
                                        flushRequest = false;
                                        fail = failed;
                                        hasData = offset < size;
                                        done = eos && !hasData;
                                    }
                                    if (fail) {
                                        failNotified = true;
                                        writer.fail();
                                    } else if (!done) {
                                        // Only read when there really are unread bytes; a bare flush
                                        // request must not trigger a read past the written data.
                                        if (hasData) {
                                            buffer.clear();
                                            long readLen = ioManager.syncRead(fh, offset, buffer);
                                            if (readLen < buffer.capacity()) {
                                                throw new HyracksDataException("Premature end of file");
                                            }
                                            offset += readLen;
                                            buffer.flip();
                                            writer.nextFrame(buffer);
                                        }
                                        if (flush) {
                                            writer.flush();
                                        }
                                    }
                                }
                            }
                        } catch (Exception e) {
                            // Tell the consumer that the stream is broken before it gets closed.
                            if (!failNotified) {
                                writer.fail();
                            }
                            throw e;
                        } finally {
                            writer.close();
                        }
                    } finally {
                        if (fh != null) {
                            ioManager.close(fh);
                        }
                    }
                } catch (Exception e) {
                    // Restore the interrupt status only here, after the writer and the file handle
                    // have been closed, so the cleanup above does not run with the flag set.
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException(e);
                }
            }
        });
    }