public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime()

in algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java [57:193]


    /**
     * Creates a push runtime that streams tuples through an external process.
     * <p>
     * On {@code open()} the runtime starts the process described by {@code command}. Each tuple received in
     * {@code nextFrame()} is printed as one text line to the process's standard input. A background thread feeds
     * the process's standard output to a tuple parser that writes to {@code writer}, and a second background
     * thread copies the process's standard error to {@code System.err}.
     * <p>
     * NOTE(review): {@code printerFactories}, {@code parserFactory}, {@code command} and {@code fieldDelimiter}
     * are members of the enclosing factory; {@code writer}, {@code tAccess}, {@code tRef} and
     * {@code initAccessAppendRef} are presumably inherited from the runtime base class. None of them are
     * declared in this method.
     *
     * @param ctx
     *            the task context handed to {@code initAccessAppendRef} and to the tuple parser factory
     * @return a runtime that pipes its input through the external process
     * @throws AlgebricksException
     *             declared by the signature; presumably raised while creating the printers
     */
    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
            throws AlgebricksException {
        // One printer per output field, created once and shared by every frame this runtime processes.
        final IPrinter[] printers = new IPrinter[printerFactories.length];
        for (int i = 0; i < printerFactories.length; i++) {
            printers[i] = printerFactories[i].createPrinter();
        }

        return new AbstractOneInputOneOutputOneFramePushRuntime() {

            /**
             * Parses the process's standard output and forwards the result to {@code writer}.
             * A parse failure is recorded instead of being thrown, because an exception thrown from
             * {@code run()} on a plain thread never reaches the operator; {@code close()} reports it.
             */
            final class ForwardScriptOutput implements Runnable {

                private final InputStream inStream;
                private final ITupleParser parser;
                // Written by the forwarding thread, read by close() after join().
                private volatile HyracksDataException failure;

                public ForwardScriptOutput(ITupleParser parser, InputStream inStream) {
                    this.parser = parser;
                    this.inStream = inStream;
                }

                @Override
                public void run() {
                    try {
                        parser.parse(inStream, writer);
                    } catch (HyracksDataException e) {
                        failure = e;
                    } finally {
                        try {
                            inStream.close();
                        } catch (Exception ignored) {
                            // Best effort: the stream is no longer needed, and a parse failure (if any)
                            // has already been recorded above.
                        }
                    }
                }

                /** Returns the failure raised while parsing, or {@code null} if parsing completed. */
                HyracksDataException getFailure() {
                    return failure;
                }
            }

            /**
             * Copies an input stream, line by line, to a print stream. The input stream is closed when the
             * copy ends; the print stream is only flushed, because it is owned by the caller.
             */
            final class DumpInStreamToPrintStream implements Runnable {

                private final BufferedReader reader;
                private final PrintStream printStream;

                public DumpInStreamToPrintStream(InputStream inStream, PrintStream printStream) {
                    this.reader = new BufferedReader(new InputStreamReader(inStream));
                    this.printStream = printStream;
                }

                @Override
                public void run() {
                    String s;
                    try {
                        while ((s = reader.readLine()) != null) {
                            printStream.println(s);
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } finally {
                        try {
                            reader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // Do NOT close the target: open() passes System.err, and closing it would
                        // silence standard error for the whole JVM.
                        printStream.flush();
                    }
                }

            }

            private Process process;
            // Text channel to the process's standard input.
            private PrintStream ps;
            private boolean first = true;
            private ForwardScriptOutput scriptOutput;
            private Thread outputPipe;
            private Thread dumpStderr;

            /**
             * Starts the external process and the two threads that drain its standard output and
             * standard error.
             *
             * @throws HyracksDataException
             *             wrapping any {@link IOException} raised while starting the process
             */
            @Override
            public void open() throws HyracksDataException {
                if (first) {
                    first = false;
                    initAccessAppendRef(ctx);
                }

                try {
                    ITupleParser parser = parserFactory.createTupleParser(ctx);
                    process = Runtime.getRuntime().exec(command);
                    ps = new PrintStream(process.getOutputStream());
                    scriptOutput = new ForwardScriptOutput(parser, process.getInputStream());
                    outputPipe = new Thread(scriptOutput);
                    outputPipe.start();
                    DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
                            System.err);
                    dumpStderr = new Thread(disps);
                    dumpStderr.start();
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
            }

            /**
             * Prints every tuple of the frame to the process's standard input, one line per tuple.
             * Each field, including the last one, is followed by {@code fieldDelimiter}.
             */
            @Override
            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                tAccess.reset(buffer);
                int nTuple = tAccess.getTupleCount();
                for (int t = 0; t < nTuple; t++) {
                    tRef.reset(tAccess, t);
                    for (int i = 0; i < printers.length; i++) {
                        printers[i].print(buffer.array(), tRef.getFieldStart(i), tRef.getFieldLength(i), ps);
                        ps.print(fieldDelimiter);
                        if (i == printers.length - 1) {
                            ps.print('\n');
                        }
                    }
                }
            }

            /**
             * Closes the process's standard input, waits for the process and both drain threads to finish,
             * and then closes the next operator in the chain.
             *
             * @throws HyracksDataException
             *             if the wait is interrupted, the process exits with a non-zero status, or parsing
             *             the process's output failed
             */
            @Override
            public void close() throws HyracksDataException {
                // first close the printer printing to the process
                ps.close();
                int ret = 0;
                // then wait for the process to finish

                try {
                    ret = process.waitFor();
                    outputPipe.join();
                    dumpStderr.join();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new HyracksDataException(e);
                }
                HyracksDataException parseFailure = scriptOutput.getFailure();
                if (ret != 0) {
                    HyracksDataException exitFailure = new HyracksDataException("Process exit value: " + ret);
                    if (parseFailure != null) {
                        exitFailure.addSuppressed(parseFailure);
                    }
                    throw exitFailure;
                }
                if (parseFailure != null) {
                    // The process exited normally but its output could not be parsed.
                    throw new HyracksDataException(parseFailure);
                }
                // close the following operator in the chain
                super.close();
            }

            /** Flushes the text buffered for the process's standard input. */
            @Override
            public void flush() throws HyracksDataException {
                ps.flush();
            }
        };
    }