public IOperatorNodePushable createPushRuntime()

in hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java [74:270]
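
Builds the map-side push runtime. For each input split, the runtime runs the
Hadoop mapper, collects the map output into Hyracks frames, sorts each split's
output (a "block") with an external sort, optionally applies the job's
combiner, and pushes the merged, block-tagged result to the downstream writer.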


    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
                    throws HyracksDataException {
        final HadoopHelper helper = new HadoopHelper(config);
        final Configuration conf = helper.getConfiguration();
        final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
        final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
        final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
        final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
        final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);

        final int framesLimit = helper.getSortFrameLimit(ctx);
        final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();

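        // Adapts Hadoop's RecordWriter API to Hyracks frames: the map output for
        // one input split (a "block") is buffered into frames and fed to an
        // external sort run generator, then merged and flushed per block in
        // sortAndFlushBlock().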
        class SortingRecordWriter extends RecordWriter<K2, V2> {
            private final ArrayTupleBuilder tb;
            private final IFrame frame;
            private final FrameTupleAppender fta;
            private ExternalSortRunGenerator runGen;
            private int blockId;

            public SortingRecordWriter() throws HyracksDataException {
                tb = new ArrayTupleBuilder(2);
                frame = new VSizeFrame(ctx);
                fta = new FrameTupleAppender(frame);
            }

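            // Starts a new block: each input split gets a fresh run generator so
            // its map output is sorted independently of other splits.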
            public void initBlock(int blockId) throws HyracksDataException {
                runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
                        helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
                this.blockId = blockId;
            }

            @Override
            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
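                // Nothing to release here; flushing happens in sortAndFlushBlock().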
            }

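            // Serializes (key, value) into a two-field tuple and appends it to the
            // current frame; when the frame fills up, the frame is handed to the
            // run generator and the append is retried on the emptied frame.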
            @Override
            public void write(K2 key, V2 value) throws IOException, InterruptedException {
                DataOutput dos = tb.getDataOutput();
                tb.reset();
                key.write(dos);
                tb.addFieldEndOffset();
                value.write(dos);
                tb.addFieldEndOffset();
                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                    runGen.nextFrame(frame.getBuffer());
                    fta.reset(frame, true);
                    if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                        throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
                                + frame.getBuffer().capacity() + ")");
                    }
                }
            }

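            // Flushes the last partial frame to the run generator, then merges the
            // sorted runs and pushes the result downstream.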
            public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
                if (fta.getTupleCount() > 0) {
                    runGen.nextFrame(frame.getBuffer());
                    fta.reset(frame, true);
                }
                runGen.close();
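                // Re-tags every sorted tuple with this block's id as a third field
                // so that downstream operators can tell the blocks apart.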
                IFrameWriter delegatingWriter = new IFrameWriter() {
                    private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
                    private final FrameTupleAccessor fta = new FrameTupleAccessor(
                            helper.getMapOutputRecordDescriptorWithoutExtraFields());
                    private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);

                    @Override
                    public void open() throws HyracksDataException {
                    }

                    @Override
                    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                        fta.reset(buffer);
                        int n = fta.getTupleCount();
                        for (int i = 0; i < n; ++i) {
                            tb.reset();
                            tb.addField(fta, i, 0);
                            tb.addField(fta, i, 1);
                            try {
                                tb.getDataOutput().writeInt(blockId);
                            } catch (IOException e) {
                                throw new HyracksDataException(e);
                            }
                            tb.addFieldEndOffset();
                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                appender.flush(writer, true);
                                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                    throw new IllegalStateException(
                                            "Record does not fit in an empty frame");
                                }
                            }
                        }
                    }

                    @Override
                    public void close() throws HyracksDataException {
                        appender.flush(writer, true);
                    }

                    @Override
                    public void fail() throws HyracksDataException {
                        // No-op: failure handling is done by the enclosing operator,
                        // which calls writer.fail() in initialize().
                    }
                };
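                // If the job defines a combiner, interpose a ReduceWriter that runs
                // the combiner over each key group before the block-tagged output is
                // written.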
                if (helper.hasCombiner()) {
                    Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
                    TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
                    TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
                    final IFrameWriter outputWriter = delegatingWriter;
                    RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
                        private final FrameTupleAppender fta = new FrameTupleAppender(new VSizeFrame(ctx));
                        private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);

                        {
                            outputWriter.open();
                        }

                        @Override
                        public void write(K2 key, V2 value) throws IOException, InterruptedException {
                            DataOutput dos = tb.getDataOutput();
                            tb.reset();
                            key.write(dos);
                            tb.addFieldEndOffset();
                            value.write(dos);
                            tb.addFieldEndOffset();
                            if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                fta.flush(outputWriter, true);
                                if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                                    throw new IllegalStateException(
                                            "Record does not fit in an empty frame");
                                }
                            }
                        }

                        @Override
                        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                            fta.flush(outputWriter, true);
                        }
                    };
                    delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
                            new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
                            helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
                            ctaskAttemptContext);
                }
                IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
                for (int i = 0; i < comparatorFactories.length; ++i) {
                    comparators[i] = comparatorFactories[i].createBinaryComparator();
                }
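                // Merge the generated runs and push the sorted block through the
                // (possibly combiner-wrapped) writer.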
                ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
                        new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
                        framesLimit, delegatingWriter);
                merger.process();
            }
        }

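        // Source operator: iterates over the input splits assigned to this
        // partition, runs the mapper on each, and sorts/flushes one block per split.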
        return new AbstractUnaryOutputSourceOperatorNodePushable() {
            @SuppressWarnings("unchecked")
            @Override
            public void initialize() throws HyracksDataException {
                try {
                    writer.open();
                    SortingRecordWriter recordWriter = new SortingRecordWriter();
                    InputSplit split = null;
                    int blockId = 0;
                    while ((split = isp.next()) != null) {
                        try {
                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
                                    taskAttemptContext);
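                            // Initialize the reader with this class's classloader as
                            // the thread context classloader; Hadoop may load job
                            // classes reflectively during initialization.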
                            ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                            try {
                                Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
                                recordReader.initialize(split, taskAttemptContext);
                            } finally {
                                Thread.currentThread().setContextClassLoader(ctxCL);
                            }
                            recordWriter.initBlock(blockId);
                            Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
                                    recordReader, recordWriter, null, null, split);
                            mapper.run(mCtx);
                            recordReader.close();
                            recordWriter.sortAndFlushBlock(writer);
                            ++blockId;
                        } catch (IOException | InterruptedException e) {
                            throw new HyracksDataException(e);
                        }
                    }
                } catch (Throwable th) {
                    writer.fail();
                    throw th;
                } finally {
                    writer.close();
                }
            }
        };
    }