public IOperatorNodePushable createPushRuntime()

in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java [360:557]


        /*
         * Creates the probe-side runtime of the hybrid hash join for one partition.
         *
         * The returned operator looks up the BuildAndPartitionTaskState that was registered under the
         * BUILD_AND_PARTITION_ACTIVITY_ID task of the same partition and then, per incoming probe frame:
         *   - joins the whole frame directly with state.joiner when state.memoryForHashtable == memsize - 2
         *     (NOTE(review): this looks like the "build side fully resident" case - confirm against the
         *     build activity);
         *   - otherwise hashes each tuple and either joins it through the resident joiner or appends it to a
         *     per-partition buffer that is spilled to a probe run file when full.
         * On close() the buffered tuples are flushed, and every partition that has a probe run file is joined
         * against its build run file with a fresh InMemoryHashJoin.
         *
         * ctx                 task context used for frames, state lookup and workspace files
         * recordDescProvider  supplies the record descriptors of the probe (rd0) and build (rd1) inputs
         * partition           partition index used to locate the build-phase task state
         * nPartitions         divisor applied to (inputsize0 * factor) when hashing probe tuples
         *
         * Throws HyracksDataException if the frames or evaluators required by the runtime cannot be created.
         */
        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
                        throws HyracksDataException {
            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
            for (int i = 0; i < comparatorFactories.length; ++i) {
                comparators[i] = comparatorFactories[i].createBinaryComparator();
            }
            // Null writers are only materialized for left-outer joins; otherwise the array stays null.
            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
            if (isLeftOuter) {
                for (int i = 0; i < nullWriterFactories1.length; i++) {
                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                }
            }
            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
                    : predEvaluatorFactory.createPredicateEvaluator());

            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                private BuildAndPartitionTaskState state;
                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(rd0);
                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                        hashFunctionFactories);
                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                        hashFunctionFactories);
                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();

                // appender targets the per-partition spill buffers; ftap targets inBuffer (resident probing).
                private final FrameTupleAppender appender = new FrameTupleAppender();
                private final FrameTupleAppender ftap = new FrameTupleAppender();
                private final IFrame inBuffer = new VSizeFrame(ctx);
                private final IFrame outBuffer = new VSizeFrame(ctx);
                private RunFileWriter[] buildWriters;
                private RunFileWriter[] probeWriters;
                private IFrame[] bufferForPartitions;

                @Override
                public void open() throws HyracksDataException {
                    writer.open();
                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                    buildWriters = state.fWriters;
                    probeWriters = new RunFileWriter[state.nPartitions];
                    bufferForPartitions = new IFrame[state.nPartitions];
                    for (int i = 0; i < state.nPartitions; i++) {
                        bufferForPartitions[i] = new VSizeFrame(ctx);
                    }
                    appender.reset(outBuffer, true);
                    ftap.reset(inBuffer, true);
                }

                @Override
                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                    if (state.memoryForHashtable == memsize - 2) {
                        // No per-tuple routing needed: probe the whole frame against the build-phase joiner.
                        state.joiner.join(buffer, writer);
                        return;
                    }
                    accessorProbe.reset(buffer);
                    int tupleCount0 = accessorProbe.getTupleCount();
                    for (int i = 0; i < tupleCount0; ++i) {
                        if (state.memoryForHashtable == 0) {
                            // Nothing is resident: every probe tuple goes to its partition buffer.
                            appendToPartition(hpcProbe.partition(accessorProbe, i, state.nPartitions), i);
                            continue;
                        }
                        int entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
                        if (entry < state.memoryForHashtable) {
                            // Resident bucket: collect into inBuffer and probe whenever it fills up.
                            while (!ftap.append(accessorProbe, i)) {
                                state.joiner.join(inBuffer.getBuffer(), writer);
                                ftap.reset(inBuffer, true);
                            }
                        } else {
                            appendToPartition(entry % state.nPartitions, i);
                        }
                    }
                }

                /**
                 * Appends probe tuple {@code tupleIndex} of {@code accessorProbe} to the buffer of partition
                 * {@code partitionId}; whenever the buffer is full it is written to that partition's probe
                 * run file, reset, and the append is retried.
                 */
                private void appendToPartition(int partitionId, int tupleIndex) throws HyracksDataException {
                    IFrame target = bufferForPartitions[partitionId];
                    boolean clearFirst = false;
                    while (true) {
                        appender.reset(target, clearFirst);
                        if (appender.append(accessorProbe, tupleIndex)) {
                            return;
                        }
                        write(partitionId, target.getBuffer());
                        target.reset();
                        clearFirst = true;
                    }
                }

                @Override
                public void close() throws HyracksDataException {
                    try {
                        // Probe whatever is still pending in inBuffer, then finish the build-phase joiner.
                        state.joiner.join(inBuffer.getBuffer(), writer);
                        state.joiner.closeJoin(writer);
                        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
                                .createPartitioner();
                        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
                                .createPartitioner();
                        if (state.memoryForHashtable != memsize - 2) {
                            // Flush the partially filled partition buffers and close the probe run files.
                            for (int i = 0; i < state.nPartitions; i++) {
                                ByteBuffer buf = bufferForPartitions[i].getBuffer();
                                accessorProbe.reset(buf);
                                if (accessorProbe.getTupleCount() > 0) {
                                    write(i, buf);
                                }
                                closeWriter(i);
                            }

                            inBuffer.reset();
                            final int tableSize;
                            if (state.memoryForHashtable == 0) {
                                tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
                            } else {
                                tableSize = (int) (memsize * recordsPerFrame * factor);
                            }
                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
                            for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
                                RunFileWriter buildWriter = buildWriters[partitionid];
                                RunFileWriter probeWriter = probeWriters[partitionid];
                                // Skip partitions with no probe data, or with no build data unless left-outer.
                                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                    continue;
                                }
                                table.reset();
                                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
                                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
                                        nullWriters1, table, predEvaluator);

                                if (buildWriter != null) {
                                    RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
                                    buildReader.open();
                                    try {
                                        while (buildReader.nextFrame(inBuffer)) {
                                            // Each build frame is copied because inBuffer is reused below.
                                            ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
                                            FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
                                            joiner.build(copyBuffer);
                                            inBuffer.reset();
                                        }
                                    } finally {
                                        // Close even when reading/building fails so the reader is not leaked.
                                        buildReader.close();
                                    }
                                }

                                // probe
                                RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
                                probeReader.open();
                                try {
                                    while (probeReader.nextFrame(inBuffer)) {
                                        joiner.join(inBuffer.getBuffer(), writer);
                                        inBuffer.reset();
                                    }
                                } finally {
                                    // Close even when probing fails so the reader is not leaked.
                                    probeReader.close();
                                }
                                joiner.closeJoin(writer);
                            }
                        }
                    } finally {
                        writer.close();
                    }
                }

                /** Closes the probe run file writer of partition {@code i}, if one was ever created. */
                private void closeWriter(int i) throws HyracksDataException {
                    RunFileWriter probeWriter = probeWriters[i];
                    if (probeWriter != null) {
                        probeWriter.close();
                    }
                }

                /**
                 * Writes frame {@code head} to the probe run file of partition {@code i}, lazily creating and
                 * opening that run file on first use.
                 */
                private void write(int i, ByteBuffer head) throws HyracksDataException {
                    RunFileWriter probeWriter = probeWriters[i];
                    if (probeWriter == null) {
                        FileReference file = ctx
                                .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
                        probeWriter = new RunFileWriter(file, ctx.getIOManager());
                        probeWriter.open();
                        probeWriters[i] = probeWriter;
                    }
                    probeWriter.nextFrame(head);
                }

                @Override
                public void fail() throws HyracksDataException {
                    writer.fail();
                }
            };
            return op;
        }